Delay
Unmanaged application that monitors heartbeat message time delays
Application listens for status/time topics of specified applications and records reported wallclock time for comparison against own wallclock time when message subscription received from the broker.
- query_nist(host='pool.ntp.org', retry_delay_s=5, max_retry=5)
Contacts an NTP server to determine the system clock offset, which is necessary for synchronizing and accurately recording time_status message delays.
- Parameters:
- Returns:
- offset
time duration measuring delay between system wallclock and NTP server reference
- Return type:
timedelta
- post_processing(names, msg_dictionaries, msg_periodicity=None, msg_size=None, wallclock_offset=None)
At end of simulation, calculates time difference between reported wallclock publish
datetimeand system wallclock receiptdatetime(with wallclock_offset from NTP server accounted for) in microseconds for each time status message. Converts finallistof these measured time delays into aDataFrameand calculates summary statistics for mean, median, mode, standard deviation, and skew of time delay distributions.- Parameters:
names (list) – list of names (str) of applications that were monitored for delays
msg_dictionaries (list) – list of dictionaries with common keys representing each messages time delay information
msg_periodicity (float) – duration between successive time status messages (typically reported by application at beginning of simulation)
msg_size (int) – number of characters in the message where one char equals one byte (typically reported by application at beginning of simulation)
wallclock_offset (
timedelta) – recorded system offset as determined from request to NTP server
- Returns:
- Return type:
- class HeartbeatDelayRecorder(names)
Records delays in time_status messages between published timestamp and system clock receipt time for subscribed message from event broker
- Parameters:
names (list) – list of names (str) for each application to be monitored
- msg_dictionaries
initialized dictionary with keys corresponding to
namesand empty lists for values- Type:
- sim_end_time
datetimecorresponding to end of simulation (initialized to None), to be updated on receipt of stop message from manager- Type:
- _wallclock_offset
query_nistmethod called when class initialized, returningtimedeltabetween system wallclock and NTP server reference- Type:
timedelta
- msg_periodicity
duration between successive time status messages (initialized to None), to be updated on receipt of heartbeat settings message
- Type:
This unmanged application monitors time status updates for any managed application specified by the user. The application records the difference between wallclock time when message published (included in message payload) and wallclock time when message received (checked against local clock with offset from initial NIST query). The following code demonstrates how this unmanaged application connects to the message broker, set up subscriptions to different topics, and add callback functions without using the NOS-T tools library.
# Note that these are loaded from a .env file in current working directory
credentials = dotenv_values(".env")
HOST, PORT = credentials["HOST"], int(credentials["PORT"])
USERNAME, PASSWORD = credentials["USERNAME"], credentials["PASSWORD"]
# build the MQTT client
CLIENT = mqtt.Client()
# set client username and password
CLIENT.username_pw_set(username=USERNAME, password=PASSWORD)
# set tls certificate
CLIENT.tls_set()
# connect to MQTT server on port 8883
CLIENT.connect(HOST, PORT)
# subscribe to the correct topics
subList = [(f"{PREFIX}/manager/start",0), (f"{PREFIX}/manager/stop",0), (f"{PREFIX}/heartbeat/settings",0)]
for i, application in enumerate(hbdr.names):
subList.append((f"{PREFIX}/status/{application}/time",0))
CLIENT.subscribe(subList)
# add on_message as a callback function
CLIENT.on_message = hbdr.on_message
CLIENT.loop_forever()