Application Objects

These object classes manage communication between a simulator and broker using the MQTT messaging protocol.

Utilities

Utilities monitor and report on application connections, modes, and time statuses.

class ConnectionConfig(host: str, port: int, cert: str, key: str)

Bases: object

Connection configuration.

The configuration settings to establish a connection to the broker, including authentication for the user and identification of the server.

host

broker hostname

Type:

str

port

broker port number

Type:

int

cert

filepath to client certificate

Type:

str

key

filepath to client private key

Type:

str

__init__(host: str, port: int, cert: str, key: str)

Initializes a new connection configuration.

Parameters:
  • host (str) – broker hostname

  • port (int) – broker port number

  • cert (str) – filepath to client certificate

  • key (str) – filepath to client private key
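As a minimal sketch of how the four settings fit together, the stand-in dataclass below mirrors the documented fields. It is not the library class, and the hostname, port, and file paths are placeholder values chosen for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConnectionConfigSketch:
    """Hypothetical stand-in mirroring the documented ConnectionConfig fields."""

    host: str  # broker hostname
    port: int  # broker port number
    cert: str  # filepath to client certificate
    key: str  # filepath to client private key


# Placeholder values; substitute the details of your own broker.
config = ConnectionConfigSketch(
    host="broker.example.com",
    port=8883,
    cert="certs/client.crt",
    key="certs/client.key",
)
print(f"{config.host}:{config.port}")
```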

class ShutDownObserver(app: Application)

Bases: Observer

Observer that shuts down an application after scenario termination.

app

application to be shut down after termination

Type:

Application

__init__(app: Application)

Initializes a new shut down observer.

Parameters:

app (Application) – application to be shut down after termination

on_change(source: object, property_name: str, old_value: object, new_value: object) → None

Shuts down the application in response to a transition to the TERMINATED mode.

Parameters:
  • source (object) – observable that triggered the change

  • property_name (str) – name of the changed property

  • old_value (object) – old value of the named property

  • new_value (object) – new value of the named property
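The observer behavior described above can be sketched with stand-in classes. This is an illustration only: the Mode enum, the FakeApp class, and the property name "mode" are assumptions, and only the TERMINATED mode is named in the text.

```python
from enum import Enum


class Mode(Enum):
    """Hypothetical execution modes; only TERMINATED is named in the text above."""

    EXECUTING = "EXECUTING"
    TERMINATED = "TERMINATED"


class FakeApp:
    """Stand-in application that records whether shut_down() was called."""

    def __init__(self) -> None:
        self.is_shut_down = False

    def shut_down(self) -> None:
        self.is_shut_down = True


class ShutDownObserverSketch:
    """Calls app.shut_down() when the observed mode becomes TERMINATED."""

    def __init__(self, app: FakeApp) -> None:
        self.app = app

    def on_change(self, source: object, property_name: str,
                  old_value: object, new_value: object) -> None:
        # Assumption: the mode is reported under a property named "mode".
        if property_name == "mode" and new_value == Mode.TERMINATED:
            self.app.shut_down()


app = FakeApp()
observer = ShutDownObserverSketch(app)
observer.on_change(None, "mode", None, Mode.EXECUTING)  # no effect
shut_down_before = app.is_shut_down
observer.on_change(None, "mode", Mode.EXECUTING, Mode.TERMINATED)
```

After the second call, app.is_shut_down is True, while the earlier transition to EXECUTING leaves the application running.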

class TimeStatusPublisher(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)

Bases: ScenarioTimeIntervalPublisher

Publishes time status messages for an application at a regular interval.

app

application to publish time status messages

Type:

Application

publish_message() → None

Publishes a time status message.

class ModeStatusObserver(app: Application)

Bases: Observer

Observer that publishes mode status messages for an application.

app

application to publish mode status messages

Type:

Application

__init__(app: Application)

Initializes a new mode status observer.

on_change(source: object, property_name: str, old_value: object, new_value: object) → None

Publishes a mode status message in response to a mode transition.

Parameters:
  • source (object) – observable that triggered the change

  • property_name (str) – name of the changed property

  • old_value (object) – old value of the named property

  • new_value (object) – new value of the named property


Publishers

Publishers define patterns for publishing messages at regularly spaced time intervals, which is useful for state updates or simple time status messages.

class ScenarioTimeIntervalPublisher(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)

Bases: Observer, ABC

Publishes messages at a regular interval (scenario time).

Provides the simulation with time status messages, also referred to as ‘heartbeat messages’ or ‘simulation time statuses’.

app

application to publish messages

Type:

Application

time_status_step

scenario duration between time status messages

Type:

timedelta

time_status_init

scenario time for first time status message

Type:

datetime

__init__(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)

Initializes a new scenario time interval publisher.

Parameters:
  • app (Application) – application to publish messages

  • time_status_step (timedelta) – scenario duration between time status messages

  • time_status_init (datetime) – scenario time for first time status message

on_change(source: object, property_name: str, old_value: object, new_value: object) → None

Publishes a message after a designated interval of scenario time.

Parameters:
  • source (object) – observable that triggered the change

  • property_name (str) – name of the changed property

  • old_value (object) – old value of the named property

  • new_value (object) – new value of the named property

abstract publish_message() → None

Publishes a message.
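The split between on_change and the abstract publish_message can be illustrated with a small stand-in. This is a sketch under stated assumptions, not the library's implementation: the class names, the property name "time", and the catch-up loop (one message per elapsed interval) are all hypothetical.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta


class IntervalPublisherSketch(ABC):
    """Hypothetical stand-in: publishes once per elapsed interval of scenario time."""

    def __init__(self, step: timedelta, init: datetime) -> None:
        self.step = step
        self.next_publish = init  # scenario time of the next message

    def on_change(self, source: object, property_name: str,
                  old_value: object, new_value: object) -> None:
        # Assumption: scenario time changes arrive under the name "time".
        if property_name != "time":
            return
        # Catch up if the time jumped across several intervals at once.
        while self.next_publish <= new_value:
            self.publish_message()
            self.next_publish += self.step

    @abstractmethod
    def publish_message(self) -> None:
        """Publishes a message."""


class CountingPublisher(IntervalPublisherSketch):
    """Concrete subclass that counts messages instead of sending them."""

    def __init__(self, step: timedelta, init: datetime) -> None:
        super().__init__(step, init)
        self.count = 0

    def publish_message(self) -> None:
        self.count += 1


start = datetime(2024, 1, 1)
publisher = CountingPublisher(timedelta(seconds=10), start)
for seconds in range(0, 31):  # scenario time advances one second per tick
    publisher.on_change(None, "time", None, start + timedelta(seconds=seconds))
print(publisher.count)  # prints 4 (messages at 0, 10, 20, and 30 seconds)
```

A subclass only needs to supply publish_message; the timing logic stays in the base class.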

class WallclockTimeIntervalPublisher(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)

Bases: Observer, ABC

Publishes messages at a regular interval (wallclock time).

app

application to publish messages

Type:

Application

time_status_step

wallclock duration between time status messages

Type:

timedelta

time_status_init

wallclock time for first time status message

Type:

datetime

__init__(app: Application, time_status_step: timedelta = None, time_status_init: datetime = None)

Initializes a new wallclock time interval publisher.

Parameters:
  • app (Application) – application to publish messages

  • time_status_step (timedelta) – wallclock duration between time status messages

  • time_status_init (datetime) – wallclock time for first time status message

on_change(source, property_name, old_value, new_value)

Publishes a message after a designated interval of wallclock time.

Parameters:
  • source (object) – observable that triggered the change

  • property_name (str) – name of the changed property

  • old_value (object) – old value of the named property

  • new_value (object) – new value of the named property

abstract publish_message() → None

Publishes a message.


Applications

These applications serve as templates or wrappers of basic MQTT client functionality and synchronization for simulation.

class Application(app_name: str, app_description: str | None = None)

Bases: object

Base class for a member application.

This object class defines the main functionality of a NOS-T application which can be modified for user needs.

prefix

The test run namespace (prefix)

Type:

str

simulator

Application simulator – calls on the simulator.py class for functionality

Type:

Simulator

client

Application MQTT client

Type:

Client

app_name

Test run application name

Type:

str

app_description

Test run application description (optional)

Type:

str

time_status_step

Scenario duration between time status messages

Type:

timedelta

time_status_init

Scenario time of first time status message

Type:

datetime

__init__(app_name: str, app_description: str | None = None)

Initializes a new application.

Parameters:
  • app_name (str) – application name

  • app_description (str) – application description (optional)

add_message_callback(app_name: str, app_topic: str, callback: Callable) → None

Adds a message callback bound to an application name and topic prefix/app_name/app_topic.

Parameters:
  • app_name (str) – application name

  • app_topic (str) – application topic

  • callback (Callable) – callback function

ready() → None

Signals the application is ready to initialize scenario execution. Publishes a ReadyStatus message to the topic prefix/app_name/status/ready.

remove_message_callback(app_name: str, app_topic: str) → None

Removes a message callback for application name and topic prefix/app_name/app_topic.

Parameters:
  • app_name (str) – The application name

  • app_topic (str) – The application topic

send_message(app_topic: str, payload: str) → None

Sends a message with payload payload to the topic prefix/app_name/app_topic.

Parameters:
  • app_topic (str) – application topic

  • payload (str) – message payload (JSON-encoded string)
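The topic layout prefix/app_name/app_topic and the JSON-encoded payload can be sketched as follows. The helper build_topic, the example names, and the payload fields are hypothetical and are not part of the documented interface.

```python
import json


def build_topic(prefix: str, app_name: str, app_topic: str) -> str:
    """Joins the three parts into the documented prefix/app_name/app_topic form."""
    return "/".join((prefix, app_name, app_topic))


# Hypothetical names and payload fields, for illustration only.
topic = build_topic("demo", "satellite", "location")
payload = json.dumps({"latitude": 12.5, "longitude": -45.0})
print(topic)  # prints demo/satellite/location
```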

set_wallclock_offset(host='pool.ntp.org', retry_delay_s: int = 5, max_retry: int = 5) → None

Issues a Network Time Protocol (NTP) request to determine the system clock offset.

Parameters:
  • host (str) – NTP host (default: ‘pool.ntp.org’)

  • retry_delay_s (int) – number of seconds to wait before retrying

  • max_retry (int) – maximum number of retries allowed
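The retry parameters suggest a loop of the following shape. This sketch avoids any network access by taking the offset query as an injected function; treating max_retry as the total number of attempts and signalling failure with OSError are assumptions, not documented behavior.

```python
import time
from datetime import timedelta
from typing import Callable, Optional


def query_offset_with_retry(
    query: Callable[[], float],
    retry_delay_s: float = 5,
    max_retry: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[timedelta]:
    """Tries query() up to max_retry times, pausing between failed attempts."""
    for attempt in range(max_retry):
        try:
            return timedelta(seconds=query())
        except OSError:
            # Assumption: a failed request surfaces as an OSError (e.g. a timeout).
            if attempt < max_retry - 1:
                sleep(retry_delay_s)
    return None  # every attempt failed


attempts = []


def flaky_query() -> float:
    """Simulated time query that fails twice, then reports a 0.25 s offset."""
    attempts.append(1)
    if len(attempts) < 3:
        raise OSError("simulated timeout")
    return 0.25


offset = query_offset_with_retry(flaky_query, retry_delay_s=0, sleep=lambda s: None)
```

Here the third attempt succeeds, so offset is a timedelta of 0.25 seconds.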

shut_down() → None

Shuts down the application by stopping the background event loop and disconnecting from the broker.

start_up(prefix: str, config: ConnectionConfig, set_offset: bool = True, time_status_step: timedelta | None = None, time_status_init: datetime | None = None, shut_down_when_terminated: bool = False) → None

Starts up the application to prepare for scenario execution. Connects to the message broker and starts a background event loop by establishing the simulation prefix, the connection configuration, and the intervals for publishing time status messages.

Parameters:
  • prefix (str) – messaging namespace (prefix)

  • config (ConnectionConfig) – connection configuration

  • set_offset (bool) – True, if the system clock offset shall be set using an NTP request prior to execution

  • time_status_step (timedelta) – scenario duration between time status messages

  • time_status_init (datetime) – scenario time for first time status message

  • shut_down_when_terminated (bool) – True, if the application should shut down when the simulation is terminated

class ManagedApplication(app_name: str, app_description: str | None = None)

Bases: Application

Managed NOS-T Application.

This object class defines the basic functionality for a NOS-T application that utilizes an external Manager to command simulator execution.

prefix

execution namespace (prefix)

Type:

str

simulator

simulator

Type:

Simulator

client

MQTT client

Type:

Client

app_name

application name

Type:

str

app_description

application description

Type:

str

time_status_step

scenario duration between time status messages

Type:

timedelta

time_status_init

scenario time of first time status message

Type:

datetime

time_step

scenario time step used in execution

Type:

timedelta

__init__(app_name: str, app_description: str | None = None)

Initializes a new managed application.

Parameters:
  • app_name (str) – application name

  • app_description (str) – application description

on_manager_init(client: Client, userdata: object, message: MQTTMessage) → None

Callback function for the managed application to respond to an initialize command sent from the manager. Parses the scenario start/end times and signals ready.

Parameters:
  • client (paho.mqtt.client.Client) – client instance for this callback

  • userdata (object) – private user data as set in the client

  • message (paho.mqtt.client.MQTTMessage) – MQTT message

on_manager_start(client: Client, userdata: object, message: MQTTMessage) → None

Callback function for the managed application to respond to a start command sent from the manager. Parses the scenario start/end time, wallclock epoch, and time scale factor and executes the simulator in a background thread.

Parameters:
  • client (paho.mqtt.client.Client) – client instance for this callback

  • userdata (object) – private user data as set in the client

  • message (paho.mqtt.client.MQTTMessage) – MQTT message

on_manager_stop(client: Client, userdata: object, message: MQTTMessage) → None

Callback function for the managed application to respond to a stop command sent from the manager. Parses the end time and updates the simulator.

Parameters:
  • client (paho.mqtt.client.Client) – client instance for this callback

  • userdata (object) – private user data as set in the client

  • message (paho.mqtt.client.MQTTMessage) – MQTT message

on_manager_update(client: Client, userdata: object, message: MQTTMessage) → None

Callback function for the managed application to respond to an update command sent from the manager. Parses the time scaling factor and scenario update time and updates the simulator.

Parameters:
  • client (paho.mqtt.client.Client) – client instance for this callback

  • userdata (object) – private user data as set in the client

  • message (paho.mqtt.client.MQTTMessage) – MQTT message

shut_down() → None

Shuts down the application by stopping the background event loop and disconnecting the application from the broker.

start_up(prefix: str, config: ConnectionConfig, set_offset: bool = True, time_status_step: timedelta | None = None, time_status_init: datetime | None = None, shut_down_when_terminated: bool = False, time_step: timedelta = datetime.timedelta(seconds=1), manager_app_name: str = 'manager') → None

Starts up the application by connecting to the message broker, starting a background event loop, subscribing to manager events, and registering callback functions.

Parameters:
  • prefix (str) – execution namespace (prefix)

  • config (ConnectionConfig) – connection configuration

  • set_offset (bool) – True, if the system clock offset shall be set using an NTP request prior to execution

  • time_status_step (timedelta) – scenario duration between time status messages

  • time_status_init (datetime) – scenario time for first time status message

  • shut_down_when_terminated (bool) – True, if the application should shut down when the simulation is terminated

  • time_step (timedelta) – scenario time step used in execution (Default: 1 second)

  • manager_app_name (str) – manager application name (Default: manager)

class TimeScaleUpdate(time_scale_factor: float, sim_update_time: datetime)

Bases: object

Provides a scheduled update to the simulation time scale factor by sending a message at the designated sim_update_time to change the time_scale_factor to the indicated value.

time_scale_factor

scenario seconds per wallclock second

Type:

float

sim_update_time

scenario time that the update will occur

Type:

datetime

__init__(time_scale_factor: float, sim_update_time: datetime)

Instantiates a new time scale update.

Parameters:
  • time_scale_factor (float) – scenario seconds per wallclock second

  • sim_update_time (datetime) – scenario time that the update will occur
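Because time_scale_factor is defined as scenario seconds per wallclock second, a constant factor implies a linear mapping from wallclock time to scenario time. The function below is a sketch of that arithmetic only; its name and arguments are hypothetical, and it does not handle a factor that changes partway through a run.

```python
from datetime import datetime, timedelta


def scenario_time(sim_start: datetime, wallclock_start: datetime,
                  wallclock_now: datetime, time_scale_factor: float) -> datetime:
    """Maps a wallclock instant to scenario time under a constant scale factor."""
    elapsed_wallclock = wallclock_now - wallclock_start
    return sim_start + elapsed_wallclock * time_scale_factor


sim_start = datetime(2024, 1, 1, 0, 0)
wallclock_start = datetime(2024, 6, 1, 12, 0)
wallclock_now = wallclock_start + timedelta(minutes=2)

# With a factor of 60, two wallclock minutes cover two scenario hours.
result = scenario_time(sim_start, wallclock_start, wallclock_now, 60.0)
print(result)  # prints 2024-01-01 02:00:00
```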

class Manager

Bases: Application

NOS-T Manager Application.

This object class defines a manager to orchestrate test run executions.

prefix

The test run namespace (prefix)

Type:

str

simulator

Application simulator

Type:

Simulator

client

Application MQTT client

Type:

Client

time_step

Scenario time step used in execution

Type:

timedelta

time_status_step

Scenario duration between time status messages

Type:

timedelta

time_status_init

Scenario time of first time status message

Type:

datetime

app_name

Test run application name

Type:

str

app_description

Test run application description (optional)

Type:

str

required_apps_status

Ready status for all required applications

Type:

dict

__init__()

Initializes a new manager.

execute_test_plan(sim_start_time: datetime, sim_stop_time: datetime, start_time: datetime | None = None, time_step: timedelta = datetime.timedelta(seconds=1), time_scale_factor: float = 1.0, time_scale_updates: List[TimeScaleUpdate] = [], time_status_step: timedelta | None = None, time_status_init: datetime | None = None, command_lead: timedelta = datetime.timedelta(0), required_apps: List[str] = [], init_retry_delay_s: int = 5, init_max_retry: int = 5) → None

A comprehensive command to start a test run execution.

Publishes an initialize message, a start message, zero or more update messages, and a stop message in one condensed JSON script, for testing purposes or for consistent test-case runs.

Parameters:
  • sim_start_time (datetime) – scenario time at which to start execution

  • sim_stop_time (datetime) – scenario time at which to stop execution

  • start_time (datetime) – wallclock time at which to start execution (default: now)

  • time_step (timedelta) – scenario time step used in execution (default: 1 second)

  • time_scale_factor (float) – scenario seconds per wallclock second (default: 1.0)

  • time_scale_updates (list(TimeScaleUpdate)) – list of scheduled time scale updates (default: [])

  • time_status_step (timedelta) – scenario duration between time status messages

  • time_status_init (datetime) – scenario time of first time status message

  • command_lead (timedelta) – wallclock lead time between command and action (default: 0 seconds)

  • required_apps (list(str)) – list of application names required to continue with the execution

  • init_retry_delay_s (int) – number of seconds to wait between initialization commands while waiting for required applications

  • init_max_retry (int) – number of initialization commands while waiting for required applications before continuing to execution
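The roles of required_apps, init_retry_delay_s, and init_max_retry can be sketched as a wait loop. This is an illustration under assumptions rather than the manager's actual code: the function name, the injected publish_init callable, and the status dictionary (standing in for required_apps_status) are hypothetical.

```python
import time
from typing import Callable, Dict, List


def wait_for_required_apps(
    publish_init: Callable[[], None],
    status: Dict[str, bool],
    required_apps: List[str],
    init_retry_delay_s: float = 5,
    init_max_retry: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Re-sends the initialize command until all required apps report ready."""
    for _ in range(init_max_retry):
        publish_init()
        sleep(init_retry_delay_s)
        if all(status.get(app, False) for app in required_apps):
            return True
    return False  # retries exhausted; the caller may continue regardless


status = {"satellite": False, "ground": False}
sent = []


def fake_publish_init() -> None:
    """Simulates one app becoming ready after each initialize command."""
    sent.append("init")
    if len(sent) == 1:
        status["satellite"] = True
    else:
        status["ground"] = True


all_ready = wait_for_required_apps(
    fake_publish_init, status, ["satellite", "ground"],
    init_retry_delay_s=0, sleep=lambda s: None,
)
```

In this run both applications are ready after the second initialize command, so the loop returns True without using the remaining retries.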

init(sim_start_time: datetime, sim_stop_time: datetime, required_apps: List[str] = []) → None

Publishes an initialize command to initialize a test run execution.

Parameters:
  • sim_start_time (datetime) – Earliest possible scenario start time

  • sim_stop_time (datetime) – Latest possible scenario end time

  • required_apps (list(str)) – List of required apps

on_app_ready_status(client: Client, userdata: object, message: MQTTMessage) → None

Callback to handle a message containing an application ready status.

on_app_time_status(client: Client, userdata: object, message: MQTTMessage) → None

Callback to handle a message containing an application time status.

start(sim_start_time: datetime, sim_stop_time: datetime, start_time: datetime | None = None, time_step: timedelta = datetime.timedelta(seconds=1), time_scale_factor: float = 1.0, time_status_step: timedelta | None = None, time_status_init: datetime | None = None) → None

Command to start a test run execution by starting the simulator execution with all necessary parameters and publishing a start command, which can be received by the connected applications.

Parameters:
  • sim_start_time (datetime) – Scenario time at which to start execution

  • sim_stop_time (datetime) – Scenario time at which to stop execution

  • start_time (datetime) – Wallclock time at which to start execution (default: now)

  • time_step (timedelta) – Scenario time step used in execution (default: 1 second)

  • time_scale_factor (float) – Scenario seconds per wallclock second (default: 1.0)

  • time_status_step (timedelta) – Scenario duration between time status messages

  • time_status_init (datetime) – Scenario time of first time status message

stop(sim_stop_time: datetime) → None

Command to stop a test run execution by updating the execution end time and publishing a stop command.

Parameters:

sim_stop_time (datetime) – Scenario time at which to stop execution.

update(time_scale_factor: float, sim_update_time: datetime) → None

Command to update the time scaling factor for a test run execution by updating the execution time scale factor, and publishing an update command.

Parameters:
  • time_scale_factor (float) – scenario seconds per wallclock second

  • sim_update_time (datetime) – scenario time at which to update

class LoggerApplication(app_name: str = 'logger', app_description: str | None = None)

Bases: Application

Logger NOS-T Application.

This object class defines the basic functionality for a NOS-T application that subscribes to a specified topic and logs all messages to file.

prefix

The test run namespace (prefix)

Type:

str

simulator

Application simulator defined in the Simulator class

Type:

Simulator

client

Application MQTT client

Type:

Client

app_name

Logger application name (default: logger)

Type:

str

app_description

Logger application description (optional)

Type:

str

log_app

Application name to be logged (default: “+”)

Type:

str

log_topic

Topic to be logged (default: “#”)

Type:

str

log_dir

Directory to write log files (default: “.”)

Type:

str

log_file

Current log file

Type:

File

__init__(app_name: str = 'logger', app_description: str | None = None)

Initializes a new logging application.

Parameters:
  • app_name (str) – application name (default: “logger”)

  • app_description (str) – application description (optional)
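The defaults “+” and “#” for log_app and log_topic are the standard MQTT single-level and multi-level wildcards. The matcher below is a simplified sketch of how such a filter selects topics; it assumes the logger subscribes to a filter of the form prefix/log_app/log_topic, and it ignores the special handling of topics that begin with “$”.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Checks a topic against an MQTT filter using the + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard matches everything from here on
        if i >= len(topic_levels):
            return False  # the topic is shorter than the filter
        if level != "+" and level != topic_levels[i]:
            return False  # a literal level differs
    return len(filter_levels) == len(topic_levels)


# Hypothetical prefix "demo" combined with the default log_app and log_topic.
default_filter = "demo/+/#"
print(topic_matches(default_filter, "demo/satellite/location"))   # prints True
print(topic_matches(default_filter, "other/satellite/location"))  # prints False
```

Narrowing log_app to a single application name, for example "demo/satellite/#", would restrict the log to that application's topics.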

on_log_connect(client: Client, userdata: object, flags: dict, rc: int) → None

Callback function that opens the log file when the MQTT client connects to the broker.

Parameters:
  • client (paho.mqtt.client.Client) – client instance for this callback

  • userdata (object) – private user data as set in the client

  • flags (dict) – response flags sent by the broker

  • rc (int) – connection result

on_log_disconnect(client: Client, userdata: object, rc: int) → None

Callback function that closes the log file when the MQTT client disconnects from the broker.

Parameters:
  • client (paho.mqtt.client.Client) – The client instance for this callback

  • userdata (object) – The private user data as set in the client

  • rc (int) – The connection result

on_log_message(client: Client, userdata: object, message: MQTTMessage)

Callback function to log a message received by the logger application.

Parameters:
  • client (paho.mqtt.client.Client) – client instance for this callback

  • userdata (object) – private user data as set in the client

  • message (paho.mqtt.client.MQTTMessage) – MQTT message

shut_down() → None

Shuts down the application by stopping the background event loop and disconnecting from the message broker.

start_up(prefix: str, config: ConnectionConfig, set_offset: bool = True, log_app: str = '+', log_topic: str = '#', log_dir: str = '.') → None

Starts up the logger application by connecting to the message broker, starting a background event loop, subscribing to manager events, and registering callback functions.

Parameters:
  • prefix (str) – The test run namespace (prefix)

  • config (ConnectionConfig) – The connection configuration

  • set_offset (bool) – True, if the system clock offset shall be set