Application Objects
These object classes manage communication between a simulator and broker using the AMQP messaging protocol.
Utilities
Utilities monitor and report on application connections, modes, and time statuses.
- class ConnectionConfig(username: str = None, password: str = None, host: str = None, rabbitmq_port: int = None, keycloak_port: int = None, keycloak_realm: str = None, client_id: str = None, client_secret_key: str = None, virtual_host: str = None, is_tls: bool = True, env_file: str = None, yaml_file: str = None)
Bases:
objectConnection configuration.
The configuration settings to establish a connection to the broker, including authentication for the user and identification of the server.
- __init__(username: str = None, password: str = None, host: str = None, rabbitmq_port: int = None, keycloak_port: int = None, keycloak_realm: str = None, client_id: str = None, client_secret_key: str = None, virtual_host: str = None, is_tls: bool = True, env_file: str = None, yaml_file: str = None)
Initializes a new connection configuration.
- Parameters:
username (str) – client username, provided by NOS-T operator
password (str) – client password, provided by NOS-T operator
host (str) – broker hostname
rabbitmq_port (int) – RabbitMQ broker port number
keycloak_port (int) – Keycloak IAM port number
keycloak_realm (str) – Keycloak realm name
client_id (str) – Keycloak client ID
client_secret_key (str) – Keycloak client secret key
virtual_host (str) – RabbitMQ virtual host
is_tls (bool) – True, if the connection uses TLS
env_file (str) – Path to the .env file
yaml_file (str) – Path to the YAML configuration file
- class ShutDownObserver(app: Application)
Bases:
ObserverObserver that shuts down an application after scenario termination.
- app
application to be shut down after termination
- Type:
Application
- __init__(app: Application)
Initializes a new shut down observer.
- Parameters:
app (
Application) – application to be shut down after termination
- class TimeStatusPublisher(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)
Bases:
ScenarioTimeIntervalPublisherPublishes time status messages for an application at a regular interval.
- app
application to publish time status messages
- Type:
Application
- class ModeStatusObserver(app: Application)
Bases:
ObserverObserver that publishes mode status messages for an application.
- app
application to publish mode status messages
- Type:
Application
- __init__(app: Application)
Initializes a new mode status observer.
- on_change(source: object, property_name: str, old_value: object, new_value: object) None
Publishes a mode status message in response to a mode transition.
- stop_application()
Stops the application by closing the channel and connection if they are open.
Publishers
Publishers define patterns for publishing messages on regularly spaced time-intervals, which are useful for updating states or simple time-status messages.
- class ScenarioTimeIntervalPublisher(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)
-
Publishes messages at a regular interval (scenario time).
Provides the simulation with time status messages, also refered to as ‘heartbeat messages’, or ‘simulation time statuses’.
- app
application to publish messages
- Type:
Application
- time_status_step
scenario duration between time status messages
- Type:
timedelta
- __init__(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)
Initializes a new scenario time interval publisher.
- Parameters:
app (
Application) – application to publish messagestime_status_step (
timedelta) – scenario duration between time status messagestime_status_init (
datetime) – scenario time for first time status message
- class WallclockTimeIntervalPublisher(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)
-
Publishes messages at a regular interval (wallclock time).
- app
application to publish messages
- Type:
Application
- time_status_step
wallclock duration between time status messages
- Type:
timedelta
- __init__(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)
Initializes a new wallclock time interval publisher.
- Parameters:
app (
Application) – application to publish messagestime_status_step (
timedelta) – wallclock duration between time status messagestime_status_init (
datetime) – wallclock time for first time status message
- on_change(source, property_name, old_value, new_value)
Publishes a message after a designated interval of scenario time.
Applications
These applications serve as templates or wrappers of basic AMQP client functionality and synchronization for simulation.
- class Application(app_name: str, app_description: str = None, setup_signal_handlers: bool = True)
Bases:
objectBase class for a member application.
This object class defines the main functionality of a NOS-T application which can be modified for user needs.
- simulator
Application simulator – calls on the simulator.py class for functionality
- Type:
Simulator
- client
Application MQTT client
- Type:
Client
- time_status_step
Scenario duration between time status messages
- Type:
timedelta
- __init__(app_name: str, app_description: str = None, setup_signal_handlers: bool = True)
Initializes a new application.
- acknowledge_message(delivery_tag)
Acknowledge the message delivery from RabbitMQ by sending a Basic.Ack RPC method for the delivery tag.
- Parameters:
delivery_tag (str) – The delivery tag of the message to acknowledge
- add_message_callback(app_name: str, app_topic: str, user_callback: Callable)
Add callback for a topic, supporting wildcards (* and #) in routing keys. (* matches exactly one word, # matches zero or more words)
- add_on_channel_close_callback()
This method tells pika to call the on_channel_closed method if RabbitMQ unexpectedly closes the channel.
- close_channel()
Call to close the channel with RabbitMQ cleanly by issuing the Channel.Close RPC command.
- close_connection()
This method is invoked by pika when the connection to RabbitMQ is closed. This method is called when the application is shutting down or when the connection is closed unexpectedly.
- configure_file_logging(log_dir: str = None, log_filename: str = None, log_level: str = None, max_bytes: int = None, backup_count: int = None, log_format: str = None)
Configures file logging for the application.
- Parameters:
log_dir (str) – Directory where log files will be stored
log_filename (str) – Name of the log file. If None, a timestamped filename will be used
log_level (str) – Logging level (e.g., ‘DEBUG’, ‘INFO’, ‘WARNING’, ‘ERROR’, ‘CRITICAL’)
max_bytes (int) – Maximum file size in bytes before rotating
backup_count (int) – Number of backup files to keep
log_format (str) – Log message format
- create_routing_key(app_name: str, topic: str)
Creates a routing key for the application. The routing key is used to bind the queue to the exchange.
- delete_exchange(unique_exchanges)
Deletes the exchanges from RabbitMQ.
- Parameters:
unique_exchanges (dict) – dictionary of unique exchanges
- delete_queue(configs, app_name)
Deletes the queues from RabbitMQ.
- establish_exchange()
Establishes the exchange for the application.
- new_access_token(refresh_token=None, otp=None)
Obtains a new access token and refresh token from Keycloak. If a refresh token is provided, the access token is refreshed using the refresh token. Otherwise, the access token is obtained using either: - User authentication: username and password (with optional OTP) - Service account: client credentials only
The authentication mode is automatically detected based on whether username/password are provided.
OTP/TOTP Handling: - If OTP is required but not provided, the user will be prompted to enter it interactively - OTP requirement is intelligently detected from Keycloak error responses - If credentials are incorrect, a clear error is raised without prompting for OTP
- on_cancelok(_unused_frame, userdata)
This method is invoked by pika when RabbitMQ acknowledges the cancellation of a consumer. At this point we will close the channel. This will invoke the on_channel_closed method once the channel has been closed, which will in-turn close the connection.
- Parameters:
_unused_frame (
pika.frame.Method) – The Basic.CancelOk frameuserdata (str|unicode) – Extra user data (consumer tag)
- on_channel_closed(channel, reason)
Invoked by pika when RabbitMQ unexpectedly closes the channel. Determines whether to close the connection or just prepare for reconnection.
- Parameters:
channel (
pika.channel.Channel) – channel objectreason (Exception) – exception representing reason for channel closure
- on_channel_open(channel)
Callback function for when the channel is opened.
- Parameters:
channel (
pika.channel.Channel) – channel object
- on_connection_closed(connection, reason)
Invoked by pika when RabbitMQ unexpectedly closes the connection. Determines whether to close the connection or just prepare for reconnection.
- Parameters:
connection (
pika.connection.Connection) – connection objectreason (Exception) – exception representing reason for loss of connection
- on_connection_error(connection, error)
Callback function for when a connection error occurs.
- Parameters:
connection (
pika.connection.Connection) – connection objecterror (Exception) – exception representing reason for loss of connection
- on_connection_open(connection)
This method is invoked by pika when the connection to RabbitMQ has been established. At this point we can create a channel and start consuming messages.
- Parameters:
connection (
pika.connection.Connection) – connection object
- ready() None
Signals the application is ready to initialize scenario execution. Publishes a
ReadyStatusmessage to the topic prefix.app_name.status.ready.
- reconnect()
Reconnect to RabbitMQ by reinitializing the connection with refreshed credentials.
- request_resume(sim_resume_time: datetime = None, tolerance: timedelta = None) None
Request a resume from the manager with optional scenario time and tolerance.
- Parameters:
sim_resume_time (
datetime, optional) – Scenario time at which to resume. If None, resume immediately.tolerance (
timedelta, optional) – Time tolerance for resume. If not provided, uses default from configuration_parameters[‘resume_tolerance’] if available. If tolerance is None (not provided and not in config), resume immediately.
- routing_key_matches_pattern(routing_key, pattern)
Check if a routing key matches a wildcard pattern.
- send_message(app_name, app_topics, payload: str) None
Sends a message to the broker. If the connection is down, the message is queued for later delivery when the connection is restored.
- shut_down() None
Shuts down the application by stopping the background event loop and disconnecting from the broker.
- start_token_refresh_thread()
Starts a background thread to refresh the access token periodically.
- start_up(prefix: str, config: ConnectionConfig, set_offset: bool = True, time_status_step: timedelta = None, time_status_init: datetime = None, shut_down_when_terminated: bool = False, access_token: str = None, refresh_token: str = None) None
Starts up the application to prepare for scenario execution. Connects to the message broker and starts a background event loop by establishing the simulation prefix, the connection configuration, and the intervals for publishing time status messages.
Three Keycloak authentication modes are supported: 1. User authentication: username + password in ConnectionConfig credentials. 2. Service account: client_id + client_secret_key in ConnectionConfig credentials. 3. Pre-acquired token: access_token + refresh_token passed here directly.
Use this mode when tokens were obtained elsewhere (e.g., forwarded from a frontend login). The refresh thread will keep the session alive by calling the Keycloak refresh endpoint using the configured client_id. For public clients, no client_secret_key is required.
- Parameters:
prefix (str) – messaging namespace (prefix)
config (
ConnectionConfig) – connection configurationset_offset (bool) – True, if the system clock offset shall be set using a NTP request prior to execution
time_status_step (
timedelta) – scenario duration between time status messagestime_status_init (
datetime) – scenario time for first time status messageshut_down_when_terminated (bool) – True, if the application should shut down when the simulation is terminated
access_token (str) – pre-acquired Keycloak access token (optional; requires refresh_token)
refresh_token (str) – pre-acquired Keycloak refresh token (required when access_token is provided)
- start_wallclock_refresh_thread()
Starts a background thread to refresh the wallclock offset periodically.
- stop_application()
Cleanly shutdown the connection to RabbitMQ by stopping the consumer with RabbitMQ, cleaning up resources, and stopping all background threads.
- stop_consuming()
Tell RabbitMQ that you would like to stop consuming by sending the Basic.Cancel RPC command.
- stop_loop()
Stop the IO loop
- update_connection_credentials(access_token)
Updates the connection credentials with the new access token.
- Parameters:
access_token (str) – new access token
- class ManagedApplication(app_name: str, app_description: str = None, setup_signal_handlers: bool = True)
Bases:
ApplicationManaged NOS-T Application.
This object class defines the basic functionality for a NOS-T application that utilizes an external Manager to command simulator execution.
- simulator
simulator
- Type:
Simulator
- client
MQTT client
- Type:
Client
- time_status_step
scenario duration between time status messages
- Type:
timedelta
- time_step
scenario time step used in execution
- Type:
timedelta
- __init__(app_name: str, app_description: str = None, setup_signal_handlers: bool = True)
Initializes a new managed application.
- on_manager_freeze(ch, method, properties, body) None
Callback function for the managed application (‘self’) to respond to a freeze command sent from the manager. Parses the freeze duration and simulation freeze time and updates the simulator.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_manager_init(ch, method, properties, body) None
Callback function for the managed application to respond to an initilize command sent from the manager. Parses the scenario start/end times and signals ready.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_manager_resume(ch, method, properties, body) None
Callback function for the managed application (‘self’) to respond to a resume command sent from the manager. Resumes the simulator execution.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_manager_start(ch, method, properties, body) None
Callback function for the managed application to respond to a start command sent from the manager. Parses the scenario start/end time, wallclock epoch, and time scale factor and executes the simulator in a background thread.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_manager_stop(ch, method, properties, body) None
Callback function for the managed application (‘self’) to respond to a stop command sent from the manager. Parses the end time and updates the simulator.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_manager_update(ch, method, properties, body) None
Callback function for the managed application (‘self’) to respond to an update command sent from the manager. Parses the time scaling factor and scenario update time and updates the simulator.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- request_freeze(freeze_duration: timedelta = None, sim_freeze_time: datetime = None) None
Request a freeze from the manager.
- Parameters:
freeze_duration (
timedelta, optional) – Duration for which to freeze execution. If None, creates an indefinite freeze.sim_freeze_time (
datetime, optional) – Scenario time at which to freeze execution. If None, freezes immediately.
- request_update(time_scale_factor: float, sim_update_time: datetime = None) None
Request a time scale factor update from the manager.
- shut_down() None
Shuts down the application by stopping the background event loop and disconnecting the application from the broker.
- start_up(prefix: str, config: ConnectionConfig, set_offset: bool = True, time_status_step: timedelta = None, time_status_init: datetime = None, shut_down_when_terminated: bool = False, time_step: timedelta = None, manager_app_name: str = None, access_token: str = None, refresh_token: str = None) None
Starts up the application by connecting to message broker, starting a background event loop, subscribing to manager events, and registering callback functions.
- Parameters:
prefix (str) – execution namespace (prefix)
config (
ConnectionConfig) – connection configurationset_offset (bool) – True, if the system clock offset shall be set using a NTP request prior to execution
time_status_step (
timedelta) – scenario duration between time status messagestime_status_init (
datetime) – scenario time for first time status messageshut_down_when_terminated (bool) – True, if the application should shut down when the simulation is terminated
time_step (
timedelta) – scenario time step used in execution (Default: 1 second)manager_app_name (str) – manager application name (Default: manager)
access_token (str) – pre-acquired Keycloak access token (optional; requires refresh_token)
refresh_token (str) – pre-acquired Keycloak refresh token (required when access_token is provided)
- class Manager(app_name: str = 'manager', app_description: str = None, setup_signal_handlers: bool = True)
Bases:
ApplicationNOS-T Manager Application.
This object class defines a manager to orchestrate test run executions.
- simulator
Application simulator
- Type:
Simulator
- client
Application MQTT client
- Type:
Client
- time_step
Scenario time step used in execution
- Type:
timedelta
- time_status_step
Scenario duration between time status messages
- Type:
timedelta
- __init__(app_name: str = 'manager', app_description: str = None, setup_signal_handlers: bool = True)
Initializes a new manager.
- execute_test_plan(*args, **kwargs) None
Starts the test plan execution in a background thread.
- Parameters:
*args – Positional arguments to be passed to the test plan execution.
**kwargs – Keyword arguments to be passed to the test plan execution.
- freeze(freeze_duration: timedelta = None, sim_freeze_time: datetime = None, resume_time: datetime = None) None
Command to freeze a test run execution by updating the execution freeze duration and publishing a freeze command.
- Parameters:
freeze_duration (
timedelta, optional) – Duration for which to freeze execution. If None, creates an indefinite freeze.sim_freeze_time (
datetime, optional) – Scenario time at which to freeze execution. If None, freezes immediately.
- init(sim_start_time: datetime, sim_stop_time: datetime, required_apps: List[str] = []) None
Publishes an initialize command to initialize a test run execution.
- on_app_ready_status(ch, method, properties, body) None
Callback to handle a message containing an application ready status.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_app_time_status(ch, method, properties, body) None
Callback to handle a message containing an application time status.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_freeze_request(ch, method, properties, body) None
Callback to handle freeze requests from managed applications.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_resume_request(ch, method, properties, body) None
Callback to handle resume requests from managed applications.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- on_update_request(ch, method, properties, body) None
Callback to handle update requests from managed applications.
- Parameters:
ch (
pika.channel.Channel) – The channel object used to communicate with the RabbitMQ server.method (
pika.spec.Basic.Deliver) – Delivery-related information such as delivery tag, exchange, and routing key.properties (
pika.BasicProperties) – Message properties including content type, headers, and more.body (bytes) – The actual message body sent, containing the message payload.
- start(sim_start_time: datetime, sim_stop_time: datetime, start_time: datetime = None, time_step: timedelta = datetime.timedelta(seconds=1), time_scale_factor: float = 1.0, time_status_step: timedelta = None, time_status_init: datetime = None) None
Command to start a test run execution by starting the simulator execution with all necessary parameters and publishing a start command, which can be received by the connected applications.
- Parameters:
sim_start_time (
datetime) – Scenario time at which to start executionsim_stop_time (
datetime) – Scenario time at which to stop executionstart_time (
datetime) – Wallclock time at which to start execution (default: now)time_step (
timedelta) – Scenario time step used in execution (default: 1 second)time_scale_factor (float) – Scenario seconds per wallclock second (default: 1.0)
time_status_step (
timedelta) – Scenario duration between time status messagestime_status_init (
datetime) – Scenario time of first time status message
- start_up(prefix: str, config: ConnectionConfig, set_offset: bool = True, time_status_step: timedelta = None, time_status_init: datetime = None, shut_down_when_terminated: bool = False, access_token: str = None, refresh_token: str = None) None
Starts up the application by connecting to message broker, starting a background event loop, subscribing to manager events, and registering callback functions.
- Parameters:
prefix (str) – execution namespace (prefix)
config (
ConnectionConfig) – connection configurationset_offset (bool) – True, if the system clock offset shall be set using a NTP request prior to execution
time_status_step (
timedelta) – scenario duration between time status messagestime_status_init (
datetime) – scenario time for first time status messageshut_down_when_terminated (bool) – True, if the application should shut down when the simulation is terminated
access_token (str) – pre-acquired Keycloak access token (optional; requires refresh_token)
refresh_token (str) – pre-acquired Keycloak refresh token (required when access_token is provided)
- stop(sim_stop_time: datetime) None
Command to stop a test run execution by updating the execution end time and publishing a stop command.
- Parameters:
sim_stop_time (
datetime) – Scenario time at which to stop execution.
- class LoggerApplication(app_name: str = 'logger', app_description: str = None)
Bases:
ApplicationLogger NOS-T Application.
This object class defines the basic functionality for a NOS-T application that subscribes to a specified topic and logs all messages to file.
- simulator
Application simulator defined in the Simulator class
- Type:
Simulator
- log_file
Current log file
- Type:
File
- __init__(app_name: str = 'logger', app_description: str = None)
Initializes a new logging application.
- on_log_message(ch, method, properties, body)
Callback function to log a message received by the logger application.
- Parameters:
ch – The channel object
method – The method frame
properties – The message properties
body – The message body
- shut_down() None
Shuts down the application by stopping the background event loop and disconnecting from the message broker.
- start_up(prefix: str, config: ConnectionConfig, set_offset: bool = None, time_status_step: timedelta = None, time_status_init: datetime = None, shut_down_when_terminated: bool = None, time_step: timedelta = None, manager_app_name: str = None, log_app: str = '+', log_topic: str = '#', log_dir: str = '.') None
Starts up the logger application by connecting to message broker, starting a background event loop, subscribing to manager events, and registering callback functions.
- Parameters:
prefix (str) – The test run namespace (prefix)
config (
ConnectionConfig) – The connection configurationset_offset (bool) – True, if the system clock offset shall be set
time_status_step (
timedelta) – Time interval for status messagestime_status_init (
datetime) – Initial time for status messagesshut_down_when_terminated (bool) – True, if the application shall shut down when terminated
time_step (
timedelta) – Time step for the applicationmanager_app_name (str) – Manager application name
log_app (str) – Application name to be logged (default: “+”)
log_topic (str) – Topic to be logged (default: “#”)
log_dir (str) – Directory to write log files (default: “.”)