Delay
Unmanaged application that monitors heartbeat message time delays
The application listens on the status/time topics of the specified applications. When a subscribed message arrives from the broker, it records the wallclock time reported in the message for comparison against its own wallclock time at receipt.
- query_nist(host='pool.ntp.org', retry_delay_s=5, max_retry=5)
Contacts an NTP server to determine the system clock offset, which is necessary for synchronizing and accurately recording time_status message delays.
- Parameters:
- Returns:
- offset
time duration measuring delay between system wallclock and NTP server reference
- Return type:
timedelta
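The retry behaviour implied by the retry_delay_s and max_retry parameters can be sketched as follows. This is a minimal illustration, not the library's implementation: the function name query_offset and the injected fetch_offset_s callable are hypothetical, and raising RuntimeError once the retries are exhausted is an assumption about the failure behaviour.

```python
import time
from datetime import timedelta


def query_offset(fetch_offset_s, retry_delay_s=5, max_retry=5, sleep=time.sleep):
    """Return the clock offset as a timedelta, retrying on failure.

    fetch_offset_s is a hypothetical callable that returns the offset in
    seconds or raises on failure. With the third-party ntplib package it
    could be: lambda: ntplib.NTPClient().request("pool.ntp.org").offset
    """
    last_error = None
    for attempt in range(1, max_retry + 1):
        try:
            return timedelta(seconds=fetch_offset_s())
        except Exception as error:  # error types vary by client library
            last_error = error
            if attempt < max_retry:
                sleep(retry_delay_s)  # wait before the next attempt
    # Assumption: give up loudly once every attempt has failed.
    raise RuntimeError(f"no NTP response after {max_retry} attempts") from last_error
```

Passing the fetch function and the sleep function as arguments keeps the sketch testable without network access; a real caller would supply an NTP client call and leave sleep at its default.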
- post_processing(names, msg_dictionaries, msg_periodicity=None, msg_size=None, wallclock_offset=None)
At the end of the simulation, calculates the time difference in microseconds between the reported wallclock publish datetime and the system wallclock receipt datetime (with wallclock_offset from the NTP server accounted for) for each time status message. Converts the final list of these measured time delays into a DataFrame and calculates summary statistics (mean, median, mode, standard deviation, and skew) of the time delay distributions.
- Parameters:
names (list) – list of names (str) of applications that were monitored for delays
msg_dictionaries (list) – list of dictionaries with common keys representing each message's time delay information
msg_periodicity (float) – duration between successive time status messages (typically reported by application at beginning of simulation)
msg_size (int) – number of characters in the message where one char equals one byte (typically reported by application at beginning of simulation)
wallclock_offset (timedelta) – recorded system offset as determined from request to NTP server
- Returns:
- Return type:
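The delay calculation described for post_processing can be illustrated with the standard library alone. This is a sketch under stated assumptions, not the function itself: the record keys published and received and the helper names delays_us and summarize are hypothetical, the offset is assumed to be added to the local receipt time, and population formulas are used for the standard deviation and skew, so the values can differ slightly from pandas' sample-based defaults.

```python
from datetime import datetime, timedelta
from statistics import mean, median, mode, pstdev


def delays_us(records, wallclock_offset=timedelta(0)):
    """Return one publish-to-receipt delay per record, in microseconds."""
    delays = []
    for record in records:
        # Assumption: the NTP offset corrects the local receipt time.
        corrected_receipt = record["received"] + wallclock_offset
        delay = corrected_receipt - record["published"]
        delays.append(delay / timedelta(microseconds=1))
    return delays


def summarize(delays):
    """Summary statistics of a delay distribution (population formulas)."""
    mu = mean(delays)
    sigma = pstdev(delays)
    # Skew as the mean cubed z-score; defined as 0 for a constant series.
    skew = 0.0 if sigma == 0 else mean(((d - mu) / sigma) ** 3 for d in delays)
    return {
        "mean": mu,
        "median": median(delays),
        "mode": mode(delays),
        "std": sigma,
        "skew": skew,
    }


if __name__ == "__main__":
    start = datetime(2024, 1, 1, 12, 0, 0)
    sample = [
        {"published": start, "received": start + timedelta(microseconds=us)}
        for us in (500, 1500, 1500, 2500)
    ]
    print(summarize(delays_us(sample, timedelta(microseconds=500))))
```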
- class HeartbeatDelayRecorder(names)
Records the delay of each time_status message, measured between the timestamp published in the message and the system clock time at which the subscribed message is received from the event broker
- Parameters:
names (list) – list of names (str) for each application to be monitored
- msg_dictionaries
dictionary initialized with keys corresponding to names and empty lists for values
- Type:
dict
- sim_end_time
datetime corresponding to end of simulation (initialized to None), to be updated on receipt of stop message from manager
- Type:
datetime
- _wallclock_offset
timedelta between system wallclock and NTP server reference, returned by the query_nist method, which is called when the class is initialized
- Type:
timedelta
- msg_periodicity
duration between successive time status messages (initialized to None), to be updated on receipt of heartbeat settings message
- Type:
float
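How these attributes fit together can be shown with a stripped-down stand-in for the class. This is an illustrative sketch only: DelayRecorderSketch and its record method are hypothetical, the NTP offset is passed in instead of being queried, and the record keys mirror those used in the message callback shown later on this page.

```python
from datetime import datetime, timedelta, timezone


class DelayRecorderSketch:
    """Hypothetical stand-in that mirrors the documented attributes."""

    def __init__(self, names, wallclock_offset=timedelta(0)):
        self.names = list(names)
        # One empty list of message records per monitored application.
        self.msg_dictionaries = {name: [] for name in self.names}
        self.sim_end_time = None      # set when the manager stop message arrives
        self.msg_periodicity = None   # set when heartbeat settings arrive
        self._wallclock_offset = wallclock_offset

    def get_wallclock_time(self):
        """Local UTC time corrected by the stored offset (assumed sign)."""
        return datetime.now(timezone.utc) + self._wallclock_offset

    def record(self, application, published, payload):
        """Append one delay record for a received time status message."""
        entry = {
            "msgId": len(self.msg_dictionaries[application]),
            "timeStamp": published,
            "sysTimeNST": self.get_wallclock_time(),
            "msgSize": len(payload),
        }
        self.msg_dictionaries[application].append(entry)
        return entry
```

Using the current list length as msgId gives each application its own zero-based message counter without any extra state.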
This unmanaged application monitors time status updates for any managed application specified by the user. The application records the difference between the wallclock time when a message was published (included in the message payload) and the wallclock time when the message was received (checked against the local clock, with the offset from the initial NIST query applied). The following code demonstrates how this unmanaged application connects to the message broker, sets up subscriptions to different topics, and adds callback functions without using the NOS-T tools library.
# Excerpt from the message callback. The next three lines belong to the
# branch that handles the heartbeat settings message.
print("Heartbeat application settings detected")
self.msg_periodicity = float(data["periodicity"])
self.msg_size = float(data["messageBytes"])

# Match the topic against each monitored application's time status topic
for i, application in enumerate(self.names):
    if topic == f"{PREFIX}.status.{application}.time":
        msgId = len(self.msg_dictionaries[application])
        timeStamp = pd.to_datetime(data["properties"]["time"])
        if self.sim_end_time is not None:
            scenarioTime = pd.to_datetime(data["properties"]["simTime"])
            if scenarioTime >= self.sim_end_time:
                if self.channel:
                    self.channel.stop_consuming()
                print(
                    "Simulation has passed the specified end time. Beginning post-processing:"
                )
                self.app_df_dict, self.stats_df_dict = post_processing(
                    self.names,
                    self.msg_dictionaries,
                    self.msg_periodicity,
                    self.msg_size,
                    self._wallclock_offset,
                )
                if not os.path.exists("delay_data_collection"):
                    os.makedirs("delay_data_collection")
                self.app_df_dict["heartbeat"].to_csv(
                    f"delay_data_collection/logs_msgFreq_{self.msg_periodicity}_msgSize_{self.msg_size}.csv"
                )
                # Write the header only when the statistics file is first created
                if exists("delay_data_collection/doeStats.csv"):
                    self.stats_df_dict["heartbeat"].to_csv(
                        "delay_data_collection/doeStats.csv",
                        mode="a",
                        index=False,
                        header=False,
                    )
                else:
                    self.stats_df_dict["heartbeat"].to_csv(
                        "delay_data_collection/doeStats.csv",
                        mode="a",
                        index=False,
                        header=True,
                    )
                print("Post-processing complete, exiting application.")
                sys.exit()
        # Record the published timestamp and the local receipt time
        msgSize = len(msg_payload)
        self.msg_dictionaries[application].append(
            {
                "msgId": msgId,
                "timeStamp": timeStamp,
                "sysTimeNST": self.get_wallclock_time(),
                "msgSize": msgSize,
            }
        )
if __name__ == "__main__":
    # initialize the HeartbeatDelayRecorder with list of application names (APPS) from delay_config_files/config.py
    hbdr = HeartbeatDelayRecorder(APPS)

    # Note that these are loaded from a .env file in current working directory
    credentials = dotenv_values(".env")
    HOST = credentials["HOST"]
    PORT = credentials["PORT"]
    USERNAME = credentials["USERNAME"]
    PASSWORD = credentials["PASSWORD"]

    # Set up RabbitMQ connection
    credentials = pika.PlainCredentials(USERNAME, PASSWORD)
    parameters = pika.ConnectionParameters(
        host=HOST,
        credentials=credentials,
        port=PORT,
    )
    connection = pika.BlockingConnection(parameters)
    channel = hbdr.channel = connection.channel()

    # Declare exchange - assuming a topic exchange
    exchange_name = PREFIX
    channel.exchange_declare(
        exchange=exchange_name, exchange_type="topic", durable=True, auto_delete=True
    )

    # Create a queue for this consumer
    result = channel.queue_declare("", exclusive=True)
    queue_name = result.method.queue

    # Binding patterns
    routing_keys = [
        f"{PREFIX}.manager.start",
        f"{PREFIX}.manager.stop",
        f"{PREFIX}.heartbeat.settings",
    ]

    # Add application specific bindings
    for application in hbdr.names:
        routing_keys.append(f"{PREFIX}.status.{application}.time")

    # Bind queue to all routing keys
    for routing_key in routing_keys:
        channel.queue_bind(
            exchange=exchange_name, queue=queue_name, routing_key=routing_key
        )

    # Set up consumer
    channel.basic_consume(
        queue=queue_name, on_message_callback=hbdr.on_message, auto_ack=True
    )
    print("Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()
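The post-processing branch of the callback appends summary statistics to doeStats.csv and writes the header only when the file does not exist yet. A minimal standard-library sketch of that append-with-header-once pattern follows. The helper name append_stats_row and the column names are hypothetical, and the csv module stands in for the pandas to_csv calls used in the listing.

```python
import csv
import os
import tempfile


def append_stats_row(path, row):
    """Append one row of statistics, writing the header only for a new file."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    write_header = not os.path.exists(path)  # check before opening in append mode
    with open(path, "a", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(row))
        if write_header:
            writer.writeheader()
        writer.writerow(row)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "delay_data_collection", "doeStats.csv")
        append_stats_row(target, {"mean_us": 2000.0, "median_us": 1900.0})
        append_stats_row(target, {"mean_us": 2100.0, "median_us": 2050.0})
        with open(target, newline="") as handle:
            print(handle.read())
```

Computing the header flag once replaces the duplicated if/else branches, which differ only in the header argument.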