Science Event Publisher
Unmanaged application that publishes the utility values for random “science events”.
This application publishes random “science events” for testing other NOS-T applications. The message payload contains the current time, a random location, and the science utility function value at that time step.
The following code is used to set user credentials from a .env file and connect to the broker.
# Note that these are loaded from a .env file in current working directory
credentials = dotenv_values(".env")
HOST, PORT = credentials["HOST"], int(credentials["PORT"])
USERNAME, PASSWORD = credentials["USERNAME"], credentials["PASSWORD"]
# Connection parameters for RabbitMQ
pika_credentials = pika.PlainCredentials(USERNAME, PASSWORD)
parameters = pika.ConnectionParameters(HOST, PORT, "/", pika_credentials)
# Establish connection to RabbitMQ
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Declare an exchange
exchange_name = "science_event"
channel.exchange_declare(exchange=exchange_name, exchange_type="topic")
Next, a loop is created which randomly triggers science events. When triggered, these science events occur at a random location with a deterministic utility function. The code then publishes the utility score for each time step until it reaches zero.
try:
while True:
# event trigger
eventRand = random.randint(100)
if eventRand <= 99: # the percent chance an event will occur
# event location
eventLat = random.uniform(-180, 180)
eventLon = random.uniform(-90, 90)
# loop for utility function
for i in range(10):
# science utility function
utility = -((i / 10) ** 2) + 1
currentTime = datetime.now()
currentTime = currentTime.strftime("%H:%M:%S")
# publish event utility message
eventMessage = {
"time": currentTime,
"latitude": eventLat,
"longitude": eventLon,
"utility": utility,
}
routing_key = "BCtest.AIAA.eventUtility"
message_body = json.dumps(eventMessage)
channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body=message_body,
)
print(eventMessage)
# wait for next utility step
time.sleep(1)
# time step between possible events
next_step = datetime.now() + timedelta(seconds=5)
# allows application to be stopped with GUI every second
while datetime.now() < next_step:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping publisher...")
finally:
connection.close()
print("Connection closed")