Application Objects
These object classes manage communication between a simulator and broker using the MQTT messaging protocol.
Utilities
Utilities monitor and report on application connections, modes, and time statuses.
- class ConnectionConfig(username: str = None, password: str = None, host: str = None, rabbitmq_port: int = None, keycloak_port: int = None, keycloak_realm: str = None, client_id: str = None, client_secret_key: str = None, virtual_host: str = None, is_tls: bool = True, env_file: str = None, yaml_file: str = None)
Bases:
objectConnection configuration.
The configuration settings to establish a connection to the broker, including authentication for the user and identification of the server.
- __init__(username: str = None, password: str = None, host: str = None, rabbitmq_port: int = None, keycloak_port: int = None, keycloak_realm: str = None, client_id: str = None, client_secret_key: str = None, virtual_host: str = None, is_tls: bool = True, env_file: str = None, yaml_file: str = None)
Initializes a new connection configuration.
- Parameters:
username (str) – client username, provided by NOS-T operator
password (str) – client password, provided by NOS-T operator
host (str) – broker hostname
rabbitmq_port (int) – RabbitMQ broker port number
keycloak_port (int) – Keycloak IAM port number
keycloak_realm (str) – Keycloak realm name
client_id (str) – Keycloak client ID
client_secret_key (str) – Keycloak client secret key
virtual_host (str) – RabbitMQ virtual host
is_tls (bool) – True, if the connection uses TLS
env_file (str) – Path to the .env file
yaml_file (str) – Path to the YAML configuration file
- class ShutDownObserver(app: Application)
Bases:
ObserverObserver that shuts down an application after scenario termination.
- app
application to be shut down after termination
- Type:
Application
- __init__(app: Application)
Initializes a new shut down observer.
- Parameters:
app (
Application) – application to be shut down after termination
- class TimeStatusPublisher(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)
Bases:
ScenarioTimeIntervalPublisherPublishes time status messages for an application at a regular interval.
- app
application to publish time status messages
- Type:
Application
- class ModeStatusObserver(app: Application)
Bases:
ObserverObserver that publishes mode status messages for an application.
- app
application to publish mode status messages
- Type:
Application
- __init__(app: Application)
Initializes a new mode status observer.
- on_change(source: object, property_name: str, old_value: object, new_value: object) None
Publishes a mode status message in response to a mode transition.
- stop_application()
Stops the application by closing the channel and connection if they are open.
Publishers
Publishers define patterns for publishing messages on regularly spaced time-intervals, which are useful for updating states or simple time-status messages.
- class ScenarioTimeIntervalPublisher(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)
-
Publishes messages at a regular interval (scenario time).
Provides the simulation with time status messages, also refered to as ‘heartbeat messages’, or ‘simulation time statuses’.
- app
application to publish messages
- Type:
Application
- time_status_step
scenario duration between time status messages
- Type:
timedelta
- __init__(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)
Initializes a new scenario time interval publisher.
- Parameters:
app (
Application) – application to publish messagestime_status_step (
timedelta) – scenario duration between time status messagestime_status_init (
datetime) – scenario time for first time status message
- class WallclockTimeIntervalPublisher(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)
-
Publishes messages at a regular interval (wallclock time).
- app
application to publish messages
- Type:
Application
- time_status_step
wallclock duration between time status messages
- Type:
timedelta
- __init__(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)
Initializes a new wallclock time interval publisher.
- Parameters:
app (
Application) – application to publish messagestime_status_step (
timedelta) – wallclock duration between time status messagestime_status_init (
datetime) – wallclock time for first time status message
- on_change(source, property_name, old_value, new_value)
Publishes a message after a designated interval of scenario time.
Applications
These applications serve as templates or wrappers of basic MQTT client functionality and synchronization for simulation.
- class Application(app_name: str, app_description: str = None)
Bases:
objectBase class for a member application.
This object class defines the main functionality of a NOS-T application which can be modified for user needs.
- simulator
Application simulator – calls on the simulator.py class for functionality
- Type:
Simulator
- client
Application MQTT client
- Type:
Client
- time_status_step
Scenario duration between time status messages
- Type:
timedelta
- acknowledge_message(delivery_tag)
Acknowledge the message delivery from RabbitMQ by sending a Basic.Ack RPC method for the delivery tag.
- Parameters:
delivery_tag (str) – The delivery tag of the message to acknowledge
- add_message_callback(app_name: str, app_topic: str, user_callback: Callable)
Add callback for a topic, supporting wildcards (* and #) in routing keys. (* matches exactly one word, # matches zero or more words)
- add_on_channel_close_callback()
This method tells pika to call the on_channel_closed method if RabbitMQ unexpectedly closes the channel.
- close_channel()
Call to close the channel with RabbitMQ cleanly by issuing the Channel.Close RPC command.
- close_connection()
This method is invoked by pika when the connection to RabbitMQ is closed. This method is called when the application is shutting down or when the connection is closed unexpectedly.
- create_routing_key(app_name: str, topic: str)
Creates a routing key for the application. The routing key is used to bind the queue to the exchange.
- delete_exchange(unique_exchanges)
Deletes the exchanges from RabbitMQ.
- Parameters:
unique_exchanges (dict) – dictionary of unique exchanges
- delete_queue(configs, app_name)
Deletes the queues from RabbitMQ.
- new_access_token(refresh_token=None)
Obtains a new access token and refresh token from Keycloak. If a refresh token is provided, the access token is refreshed using the refresh token. Otherwise, the access token is obtained using the username and password provided in the configuration.
- Parameters:
refresh_token (str) – refresh token (optional)
- on_cancelok(_unused_frame, userdata)
This method is invoked by pika when RabbitMQ acknowledges the cancellation of a consumer. At this point we will close the channel. This will invoke the on_channel_closed method once the channel has been closed, which will in-turn close the connection.
- Parameters:
_unused_frame (
pika.frame.Method) – The Basic.CancelOk frameuserdata (str|unicode) – Extra user data (consumer tag)
- on_channel_closed(channel, reason)
Invoked by pika when RabbitMQ unexpectedly closes the channel. Determines whether to close the connection or just prepare for reconnection.
- Parameters:
channel (
pika.channel.Channel) – channel objectreason (Exception) – exception representing reason for channel closure
- on_channel_open(channel)
Callback function for when the channel is opened.
- Parameters:
channel (
pika.channel.Channel) – channel object
- on_connection_closed(connection, reason)
Invoked by pika when RabbitMQ unexpectedly closes the connection. Determines whether to close the connection or just prepare for reconnection.
- Parameters:
connection (
pika.connection.Connection) – connection objectreason (Exception) – exception representing reason for loss of connection
- on_connection_error(connection, error)
Callback function for when a connection error occurs.
- Parameters:
connection (
pika.connection.Connection) – connection objecterror (Exception) – exception representing reason for loss of connection
- on_connection_open(connection)
This method is invoked by pika when the connection to RabbitMQ has been established. At this point we can create a channel and start consuming messages.
- Parameters:
connection (
pika.connection.Connection) – connection object
- ready() None
Signals the application is ready to initialize scenario execution. Publishes a
ReadyStatusmessage to the topic prefix.app_name.status.ready.
- reconnect()
Reconnect to RabbitMQ by reinitializing the connection with refreshed credentials.
- routing_key_matches_pattern(routing_key, pattern)
Check if a routing key matches a wildcard pattern.
- send_message(app_name, app_topics, payload: str) None
Sends a message to the broker. If the connection is down, the message is queued for later delivery when the connection is restored.
- set_wallclock_offset(host='pool.ntp.org', retry_delay_s: int = 5, max_retry: int = 5) None
Issues a Network Time Protocol (NTP) request to determine the system clock offset.
- shut_down() None
Shuts down the application by stopping the background event loop and disconnecting from the broker.
- start_token_refresh_thread()
Starts a background thread to refresh the access token periodically.
- start_up(prefix: str, config: ConnectionConfig, set_offset: bool = None, time_status_step: timedelta = None, time_status_init: datetime = None, shut_down_when_terminated: bool = None) None
Starts up the application to prepare for scenario execution. Connects to the message broker and starts a background event loop by establishing the simulation prefix, the connection configuration, and the intervals for publishing time status messages.
- Parameters:
prefix (str) – messaging namespace (prefix)
config (
ConnectionConfig) – connection configurationset_offset (bool) – True, if the system clock offset shall be set using a NTP request prior to execution
time_status_step (
timedelta) – scenario duration between time status messagestime_status_init (
datetime) – scenario time for first time status messageshut_down_when_terminated (bool) – True, if the application should shut down when the simulation is terminated
- stop_application()
Cleanly shutdown the connection to RabbitMQ by stopping the consumer with RabbitMQ, cleaning up resources, and stopping all background threads.
- stop_consuming()
Tell RabbitMQ that you would like to stop consuming by sending the Basic.Cancel RPC command.
- stop_loop()
Stop the IO loop
- update_connection_credentials(access_token)
Updates the connection credentials with the new access token.
- Parameters:
access_token (str) – new access token
- class ManagedApplication(app_name: str, app_description: str = None)
Bases:
ApplicationManaged NOS-T Application.
This object class defines the basic functionality for a NOS-T application that utilizes an external Manager to command simulator execution.
- simulator
simulator
- Type:
Simulator
- client
MQTT client
- Type:
Client
- time_status_step
scenario duration between time status messages
- Type:
timedelta
- time_step
scenario time step used in execution
- Type:
timedelta
- on_manager_init(ch, method, properties, body) None
Callback function for the managed application to respond to an initilize command sent from the manager. Parses the scenario start/end times and signals ready.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_manager_start(ch, method, properties, body) None
Callback function for the managed application to respond to a start command sent from the manager. Parses the scenario start/end time, wallclock epoch, and time scale factor and executes the simulator in a background thread.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_manager_stop(ch, method, properties, body) None
Callback function for the managed application (‘self’) to respond to a stop command sent from the manager. Parses the end time and updates the simulator.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_manager_update(ch, method, properties, body) None
Callback function for the managed application (‘self’) to respond to an update command sent from the manager. Parses the time scaling factor and scenario update time and updates the simulator.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- shut_down() None
Shuts down the application by stopping the background event loop and disconnecting the application from the broker.
- start_up(prefix: str, config: ConnectionConfig, set_offset: bool = None, time_status_step: timedelta = None, time_status_init: datetime = None, shut_down_when_terminated: bool = None, time_step: timedelta = None, manager_app_name: str = None) None
Starts up the application by connecting to message broker, starting a background event loop, subscribing to manager events, and registering callback functions.
- Parameters:
prefix (str) – execution namespace (prefix)
config (
ConnectionConfig) – connection configurationset_offset (bool) – True, if the system clock offset shall be set using a NTP request prior to execution
time_status_step (
timedelta) – scenario duration between time status messagestime_status_init (
datetime) – scenario time for first time status messageshut_down_when_terminated (bool) – True, if the application should shut down when the simulation is terminated
time_step (
timedelta) – scenario time step used in execution (Default: 1 second)manager_app_name (str) – manager application name (Default: manager)
- class TimeScaleUpdate(time_scale_factor: float, sim_update_time: datetime)
Bases:
objectProvides a scheduled update to the simulation time scale factor by sending a message at the designated sim_update_time to change the time_scale_factor to the indicated value.
- class Manager
Bases:
ApplicationNOS-T Manager Application.
This object class defines a manager to orchestrate test run executions.
- simulator
Application simulator
- Type:
Simulator
- client
Application MQTT client
- Type:
Client
- time_step
Scenario time step used in execution
- Type:
timedelta
- time_status_step
Scenario duration between time status messages
- Type:
timedelta
- __init__()
Initializes a new manager.
- establish_exchange()
Establishes the exchange for the manager application.
- execute_test_plan(*args, **kwargs) None
Starts the test plan execution in a background thread.
- Parameters:
*args – Positional arguments to be passed to the test plan execution.
**kwargs – Keyword arguments to be passed to the test plan execution.
- init(sim_start_time: datetime, sim_stop_time: datetime, required_apps: List[str] = []) None
Publishes an initialize command to initialize a test run execution.
- on_app_ready_status(ch, method, properties, body) None
Callback to handle a message containing an application ready status.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_app_time_status(ch, method, properties, body) None
Callback to handle a message containing an application time status.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- start(sim_start_time: datetime, sim_stop_time: datetime, start_time: datetime = None, time_step: timedelta = datetime.timedelta(seconds=1), time_scale_factor: float = 1.0, time_status_step: timedelta = None, time_status_init: datetime = None) None
Command to start a test run execution by starting the simulator execution with all necessary parameters and publishing a start command, which can be received by the connected applications.
- Parameters:
sim_start_time (
datetime) – Scenario time at which to start executionsim_stop_time (
datetime) – Scenario time at which to stop executionstart_time (
datetime) – Wallclock time at which to start execution (default: now)time_step (
timedelta) – Scenario time step used in execution (default: 1 second)time_scale_factor (float) – Scenario seconds per wallclock second (default: 1.0)
time_status_step (
timedelta) – Scenario duration between time status messagestime_status_init (
datetime) – Scenario time of first time status message
- start_up(prefix: str, config: ConnectionConfig, set_offset: bool = None, time_status_step: timedelta = None, time_status_init: datetime = None, shut_down_when_terminated: bool = None, time_step: timedelta = None, manager_app_name: str = None) None
Starts up the application by connecting to message broker, starting a background event loop, subscribing to manager events, and registering callback functions.
- Parameters:
prefix (str) – execution namespace (prefix)
config (
ConnectionConfig) – connection configurationset_offset (bool) – True, if the system clock offset shall be set using a NTP request prior to execution
time_status_step (
timedelta) – scenario duration between time status messagestime_status_init (
datetime) – scenario time for first time status messageshut_down_when_terminated (bool) – True, if the application should shut down when the simulation is terminated
time_step (
timedelta) – scenario time step used in execution (Default: 1 second)manager_app_name (str) – manager application name (Default: manager)
- stop(sim_stop_time: datetime) None
Command to stop a test run execution by updating the execution end time and publishing a stop command.
- Parameters:
sim_stop_time (
datetime) – Scenario time at which to stop execution.
- class LoggerApplication(app_name: str = 'logger', app_description: str = None)
Bases:
ApplicationLogger NOS-T Application.
This object class defines the basic functionality for a NOS-T application that subscribes to a specified topic and logs all messages to file.
- simulator
Application simulator defined in the Simulator class
- Type:
Simulator
- log_file
Current log file
- Type:
File
- __init__(app_name: str = 'logger', app_description: str = None)
Initializes a new logging application.
- on_log_message(ch, method, properties, body)
Callback function to log a message received by the logger application.
- Parameters:
ch – The channel object
method – The method frame
properties – The message properties
body – The message body
- shut_down() None
Shuts down the application by stopping the background event loop and disconnecting from the message broker.
- start_up(prefix: str, config: ConnectionConfig, set_offset: bool = None, time_status_step: timedelta = None, time_status_init: datetime = None, shut_down_when_terminated: bool = None, time_step: timedelta = None, manager_app_name: str = None, log_app: str = '+', log_topic: str = '#', log_dir: str = '.') None
Starts up the logger application by connecting to message broker, starting a background event loop, subscribing to manager events, and registering callback functions.
- Parameters:
prefix (str) – The test run namespace (prefix)
config (
ConnectionConfig) – The connection configurationset_offset (bool) – True, if the system clock offset shall be set
time_status_step (
timedelta) – Time interval for status messagestime_status_init (
datetime) – Initial time for status messagesshut_down_when_terminated (bool) – True, if the application shall shut down when terminated
time_step (
timedelta) – Time step for the applicationmanager_app_name (str) – Manager application name
log_app (str) – Application name to be logged (default: “+”)
log_topic (str) – Topic to be logged (default: “#”)
log_dir (str) – Directory to write log files (default: “.”)