Delay

Unmanaged application that monitors heartbeat message time delays

The application listens to the status/time topics of the specified applications and records the wallclock time reported in each message. It compares that time against its own wallclock time at the moment the broker delivers the message.

query_nist(host='pool.ntp.org', retry_delay_s=5, max_retry=5)

Contacts an NTP server to determine the system clock offset, which is necessary for synchronizing and accurately recording time_status message delays.

Parameters:
  • host (str) – the NTP host (default: ‘pool.ntp.org’).

  • retry_delay_s (int) – the number of seconds to wait before retrying.

  • max_retry (int) – the maximum number of retries allowed.

Returns:

offset

time difference (offset) between the system wallclock and the NTP server reference

Return type:

timedelta
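For reference, here is a minimal sketch of the two ideas behind query_nist. It assumes nothing about the function's actual implementation. The first piece is the standard NTP offset estimate from four timestamps. The second is a retry loop that matches the retry_delay_s and max_retry parameters (one initial attempt plus up to max_retry retries). The names ntp_offset, query_with_retry, and fetch_offset are hypothetical. fetch_offset stands in for the real NTP request.

```python
import time
from datetime import timedelta
from typing import Callable, Optional


def ntp_offset(t0: float, t1: float, t2: float, t3: float) -> timedelta:
    """Standard NTP clock offset estimate ((t1 - t0) + (t2 - t3)) / 2.

    t0: client transmit time, t1: server receive time,
    t2: server transmit time, t3: client receive time (all in seconds).
    A positive offset means the local clock is behind the server.
    """
    return timedelta(seconds=((t1 - t0) + (t2 - t3)) / 2)


def query_with_retry(
    fetch_offset: Callable[[], timedelta],
    retry_delay_s: float = 5,
    max_retry: int = 5,
) -> timedelta:
    """Call fetch_offset once, then retry up to max_retry more times on failure.

    fetch_offset is a hypothetical callable standing in for the actual NTP
    request; it should return the offset as a timedelta or raise on failure.
    """
    last_error: Optional[Exception] = None
    for attempt in range(max_retry + 1):
        try:
            return fetch_offset()
        except Exception as err:  # error types differ between NTP clients
            last_error = err
            if attempt < max_retry:
                time.sleep(retry_delay_s)
    raise RuntimeError(
        f"NTP query failed after {max_retry + 1} attempts"
    ) from last_error
```

Injecting the request as a callable keeps the retry policy testable offline. In practice it would wrap whichever NTP client the application uses.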

post_processing(names, msg_dictionaries, msg_periodicity=None, msg_size=None, wallclock_offset=None)

At the end of the simulation, calculates the delay of each time status message in microseconds. The delay is the difference between the publish wallclock datetime reported in the message and the system wallclock datetime at receipt, corrected by wallclock_offset from the NTP server. Converts the resulting list of delays into a DataFrame and calculates summary statistics (mean, median, mode, standard deviation, and skew) of the delay distribution.

Parameters:
  • names (list) – list of names (str) of applications that were monitored for delays

  • msg_dictionaries (dict) – dict mapping each application name to a list of dictionaries with common keys, one per message, holding that message's time delay information

  • msg_periodicity (float) – duration between successive time status messages (typically reported by application at beginning of simulation)

  • msg_size (int) – number of characters in the message, counting one character as one byte (typically reported by application at beginning of simulation)

  • wallclock_offset (timedelta) – recorded system offset as determined from request to NTP server

Returns:

app_df_dict

dict mapping each application name (str) to a DataFrame of message time delays for that application

stats_df_dict

dict mapping each application name (str) to summary statistics of the message delays for that application

Return type:

dict, dict
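Here is a minimal pandas sketch of the per-message delay and summary statistics described for post_processing. It rests on several assumptions. Each record carries timeStamp (publish time) and sysTimeNST (raw local receipt time). msg_dictionaries maps application names to record lists. wallclock_offset is NTP server time minus local time, so it is added to the receipt time. delay_frame, delay_stats, and post_process_sketch are hypothetical names, not the module's API.

```python
from datetime import timedelta

import pandas as pd


def delay_frame(records: list[dict], wallclock_offset: timedelta) -> pd.DataFrame:
    """Per-message DataFrame with an added delay_us column (microseconds)."""
    df = pd.DataFrame(records)
    # Assumption: offset = server - local, so corrected receipt = local + offset.
    receipt = pd.to_datetime(df["sysTimeNST"]) + wallclock_offset
    published = pd.to_datetime(df["timeStamp"])
    df["delay_us"] = (receipt - published) / pd.Timedelta(microseconds=1)
    return df


def delay_stats(df: pd.DataFrame) -> pd.DataFrame:
    """One-row DataFrame of summary statistics for the delay distribution."""
    d = df["delay_us"]
    return pd.DataFrame(
        [
            {
                "mean": d.mean(),
                "median": d.median(),
                "mode": d.mode().iloc[0],  # first mode if several are tied
                "std": d.std(),
                "skew": d.skew(),
            }
        ]
    )


def post_process_sketch(msg_dictionaries: dict, wallclock_offset: timedelta):
    """Return (app_df_dict, stats_df_dict) keyed by application name."""
    app_df_dict = {
        name: delay_frame(records, wallclock_offset)
        for name, records in msg_dictionaries.items()
    }
    stats_df_dict = {name: delay_stats(df) for name, df in app_df_dict.items()}
    return app_df_dict, stats_df_dict
```

Dividing a timedelta Series by a one-microsecond Timedelta converts the delays to floats in microseconds. This keeps the statistics unit-consistent.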

class HeartbeatDelayRecorder(names)

Records the delay of each time_status message, defined as the difference between the timestamp published in the message and the system clock time at which the subscribed message is received from the event broker

Parameters:

names (list) – list of names (str) for each application to be monitored

msg_dictionaries

dictionary initialized with one key per application name, each mapped to an empty list

Type:

dict

sim_end_time

datetime corresponding to end of simulation (initialized to None), to be updated on receipt of stop message from manager

Type:

datetime

_wallclock_offset

result of query_nist, called when the class is initialized: the timedelta between the system wallclock and the NTP server reference

Type:

timedelta

msg_periodicity

duration between successive time status messages (initialized to None), to be updated on receipt of heartbeat settings message

Type:

float

msg_size

number of characters in the message, counting one character as one byte (initialized to None), to be updated on receipt of heartbeat settings message

Type:

int
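The sketch below shows the initial state listed above, independent of any broker. It also shows the per-message record that the on_message listing in this section appends. DelayRecorderState, offset_fn, and record are hypothetical names. offset_fn stands in for the query_nist call so the sketch runs offline. The receipt time is passed in rather than read from get_wallclock_time.

```python
from datetime import datetime, timedelta
from typing import Callable, Optional


class DelayRecorderState:
    """Hypothetical, broker-free stand-in for HeartbeatDelayRecorder's state."""

    def __init__(self, names: list[str], offset_fn: Callable[[], timedelta]):
        self.names = names
        # One empty list of message records per monitored application.
        self.msg_dictionaries: dict[str, list[dict]] = {name: [] for name in names}
        self.sim_end_time: Optional[datetime] = None
        self.msg_periodicity: Optional[float] = None
        self.msg_size: Optional[int] = None
        # The documented class calls query_nist() here; offset_fn replaces it.
        self._wallclock_offset: timedelta = offset_fn()

    def record(
        self, application: str, published: datetime, payload: str, received: datetime
    ) -> dict:
        """Append one record using the same keys as the on_message listing."""
        records = self.msg_dictionaries[application]
        entry = {
            "msgId": len(records),  # sequential id per application
            "timeStamp": published,
            "sysTimeNST": received,
            "msgSize": len(payload),  # characters in the decoded payload
        }
        records.append(entry)
        return entry
```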

This unmanaged application monitors time status updates for any managed application specified by the user. It records the difference between the wallclock time at which a message was published (included in the message payload) and the wallclock time at which it was received (read from the local clock and corrected by the offset from the initial NTP query made by query_nist). The following code demonstrates how this unmanaged application connects to the message broker, sets up subscriptions to different topics, and adds callback functions without using the NOS-T tools library.

            print("Heartbeat application settings detected")
            self.msg_periodicity = float(data["periodicity"])
            self.msg_size = float(data["messageBytes"])

        for i, application in enumerate(self.names):
            if topic == f"{PREFIX}.status.{application}.time":
                msgId = len(self.msg_dictionaries[application])
                timeStamp = pd.to_datetime(data["properties"]["time"])
                if self.sim_end_time is not None:
                    scenarioTime = pd.to_datetime(data["properties"]["simTime"])
                    if scenarioTime >= self.sim_end_time:
                        if self.channel:
                            self.channel.stop_consuming()
                        print(
                            "Simulation has passed the specified end time. Beginning post-processing:"
                        )
                        self.app_df_dict, self.stats_df_dict = post_processing(
                            self.names,
                            self.msg_dictionaries,
                            self.msg_periodicity,
                            self.msg_size,
                            self._wallclock_offset,
                        )

                        os.makedirs("delay_data_collection", exist_ok=True)

                        self.app_df_dict["heartbeat"].to_csv(
                            f"delay_data_collection/logs_msgFreq_{self.msg_periodicity}_msgSize_{self.msg_size}.csv"
                        )
                        # Append this run's statistics; write the header only when the file is new
                        stats_path = "delay_data_collection/doeStats.csv"
                        self.stats_df_dict["heartbeat"].to_csv(
                            stats_path,
                            mode="a",
                            index=False,
                            header=not exists(stats_path),
                        )

                        print("Post-processing complete, exiting application.")
                        sys.exit()
                msgSize = len(msg_payload)
                self.msg_dictionaries[application].append(
                    {
                        "msgId": msgId,
                        "timeStamp": timeStamp,
                        "sysTimeNST": self.get_wallclock_time(),
                        "msgSize": msgSize,
                    }
                )


if __name__ == "__main__":
    # initialize the HeartbeatDelayRecorder with list of application names (APPS) from delay_config_files/config.py
    hbdr = HeartbeatDelayRecorder(APPS)

    # Note that these are loaded from a .env file in current working directory
    env = dotenv_values(".env")
    HOST = env["HOST"]
    # dotenv values are strings; pika expects an integer port
    PORT = int(env["PORT"])
    USERNAME = env["USERNAME"]
    PASSWORD = env["PASSWORD"]

    # Set up RabbitMQ connection
    credentials = pika.PlainCredentials(USERNAME, PASSWORD)
    parameters = pika.ConnectionParameters(
        host=HOST,
        credentials=credentials,
        port=PORT,
    )

    connection = pika.BlockingConnection(parameters)
    channel = hbdr.channel = connection.channel()

    # Declare exchange - assuming a topic exchange
    exchange_name = PREFIX
    channel.exchange_declare(
        exchange=exchange_name, exchange_type="topic", durable=True, auto_delete=True
    )

    # Create a queue for this consumer
    result = channel.queue_declare("", exclusive=True)
    queue_name = result.method.queue

    # Binding patterns
    routing_keys = [
        f"{PREFIX}.manager.start",
        f"{PREFIX}.manager.stop",
        f"{PREFIX}.heartbeat.settings",
    ]

    # Add application specific bindings
    for application in hbdr.names:
        routing_keys.append(f"{PREFIX}.status.{application}.time")

    # Bind queue to all routing keys
    for routing_key in routing_keys:
        channel.queue_bind(
            exchange=exchange_name, queue=queue_name, routing_key=routing_key
        )

    # Set up consumer
    channel.basic_consume(
        queue=queue_name, on_message_callback=hbdr.on_message, auto_ack=True
    )

    print("Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()
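With pika's BlockingChannel.basic_consume, the registered on_message callback is called as callback(channel, method, properties, body). The routing key (the NOS-T topic) is available as method.routing_key, and body holds the raw payload bytes. The following is a hedged sketch of how such a callback might turn those arguments into the topic and data used in the listing and then handle the heartbeat settings message. The PREFIX value "nost" is a hypothetical placeholder, and parse_delivery and apply_settings are illustrative names. The settings keys periodicity and messageBytes come from the listing.

```python
import json

PREFIX = "nost"  # hypothetical; the real value comes from the configuration


def parse_delivery(method, body: bytes) -> tuple[str, dict]:
    """Return (topic, data) from pika's method frame and raw message body."""
    return method.routing_key, json.loads(body.decode("utf-8"))


def apply_settings(recorder, topic: str, data: dict) -> bool:
    """Store heartbeat settings on the recorder; return True if handled."""
    if topic == f"{PREFIX}.heartbeat.settings":
        recorder.msg_periodicity = float(data["periodicity"])
        recorder.msg_size = float(data["messageBytes"])  # mirrors the listing
        return True
    return False
```

Keeping the parsing separate from the broker connection lets the topic dispatch be exercised with plain objects in place of pika frames.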