Creating a NOS-T Publisher-Consumer Example
This example demonstrates how to create a simple publisher-consumer system using the NOS-T library and shows how messages flow through the broker from a publisher to a consumer.
Prerequisites
NOS-T installed (as described in Installation).
RabbitMQ broker running (as described in RabbitMQ Broker on Local Host).
Basic understanding of NOS-T messaging concepts.
Configuration File
First, create a file named sos.yaml with the following content:
info:
  title: Novel Observing Strategies Testbed (NOS-T) YAML Configuration
  version: '1.0.0'
  description: Version-controlled AsyncAPI document for RabbitMQ event broker with Keycloak authentication within NOS-T
servers:
  rabbitmq:
    keycloak_authentication: False
    host: "localhost"
    port: 5672
    tls: False
    virtual_host: "/"
    message_expiration: "60000" # in milliseconds, message expiration time
    delivery_mode: 2 # 1=transient, 2=durable
    content_type: "text/plain"
    heartbeat: 30 # in seconds
    connection_attempts: 3
    retry_delay: 5 # in seconds
  keycloak:
    host: "nost.smce.nasa.gov"
    port: 8443
    tls: True
    token_refresh_interval: 10 # in seconds
    realm: "NOS-T"
execution:
  general:
    prefix: sos
  manager:
    sim_start_time: "2019-03-01T23:59:59+00:00"
    sim_stop_time: "2019-03-10T23:59:59+00:00"
    start_time:
    time_step: "0:00:01"
    time_scale_factor: 288 # 1 simulation day = 5 wallclock minutes
    time_scale_updates: []
    time_status_step: "0:00:01" # 1 second * time scale factor
    time_status_init: "2019-03-01T23:59:59+00:00"
    command_lead: "0:00:05"
    required_apps:
      - manager
      - planner
      - appender
      - simulator
    init_retry_delay_s: 5
    init_max_retry: 5
    set_offset: True
    shut_down_when_terminated: False
  managed_application:
    time_scale_factor: 288 # 1 simulation day = 5 wallclock minutes
    time_step: "0:00:01" # 1 second * time scale factor
    set_offset: True
    time_status_step: "0:00:10" # 10 seconds * time scale factor
    time_status_init: "2019-03-01T00:00:00+00:00"
    shut_down_when_terminated: False
    manager_app_name: "manager"
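To make the time-scaling comments concrete, the arithmetic behind time_scale_factor: 288 can be checked with a few lines of standard-library Python. This is only a worked calculation using the values from the configuration file, not a call into NOS-T; the helper name wallclock_duration is hypothetical.

```python
from datetime import datetime, timedelta

TIME_SCALE_FACTOR = 288  # value taken from sos.yaml


def wallclock_duration(sim_duration: timedelta, scale: float = TIME_SCALE_FACTOR) -> timedelta:
    """Wallclock time needed to cover a simulated duration (hypothetical helper)."""
    return sim_duration / scale


# One simulated day at a scale factor of 288
one_day = wallclock_duration(timedelta(days=1))

# The full scenario between sim_start_time and sim_stop_time
sim_start = datetime.fromisoformat("2019-03-01T23:59:59+00:00")
sim_stop = datetime.fromisoformat("2019-03-10T23:59:59+00:00")
full_run = wallclock_duration(sim_stop - sim_start)

print(one_day)   # 0:05:00
print(full_run)  # 0:45:00
```

With these settings, the nine simulated days between the start and stop times would take about 45 minutes of wallclock time.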
Then, create a .env file with the following content:
USERNAME="admin"
PASSWORD="admin"
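Before starting the applications, it can help to confirm that the .env file defines both keys. The sketch below parses simple KEY="value" lines using only the standard library. It illustrates the file format and is not how NOS-T itself reads the file; the function name parse_env is hypothetical.

```python
def parse_env(text: str) -> dict:
    """Parse simple KEY=value lines, skipping blanks and # comments (hypothetical helper)."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop surrounding whitespace and optional quotes around the value
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


sample = 'USERNAME="admin"\nPASSWORD="admin"\n'
env = parse_env(sample)
missing = [key for key in ("USERNAME", "PASSWORD") if not env.get(key)]
print(missing)  # [] when both keys are present
```

To check a real file, the same function could be given the text of the .env file, for example via pathlib.Path(".env").read_text().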
Creating a Publisher
Create a file named nost_publisher.py with the following content:
from nost_tools.config import ConnectionConfig
from nost_tools.managed_application import ManagedApplication
import time
import logging
import random

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Load connection configuration from YAML
config = ConnectionConfig(yaml_file="sos.yaml")

# Define application name
NAME = "publisher"

# Create the managed application
app = ManagedApplication(NAME)

# Start up the application
app.start_up(
    config.rc.simulation_configuration.execution_parameters.general.prefix,
    config
)

# Send messages in a loop
try:
    message_count = 0
    logger.info("Starting to publish messages. Press CTRL+C to stop.")
    while True:
        message_count += 1
        message = f"This is test message #{message_count} with value: {random.random():.4f}"
        # Send a message
        app.send_message(
            app_name="publisher",
            app_topics="test",
            payload=message
        )
        logger.info(f"Published message: {message}")
        time.sleep(2)  # Publish a message every 2 seconds
except KeyboardInterrupt:
    logger.info("Stopping publisher...")
finally:
    # Clean shutdown would go here
    logger.info("Publisher stopped")
Creating a Consumer
Create a file named nost_consumer.py with the following content:
from nost_tools.config import ConnectionConfig
from nost_tools.managed_application import ManagedApplication
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def callback(ch, method, properties, body):
    """Process received messages"""
    body = body.decode("utf-8")
    logger.info(f"Received message: {body}")


# Load connection configuration from YAML
config = ConnectionConfig(yaml_file="sos.yaml")

# Define application name
NAME = "observer1"

# Create the managed application
app = ManagedApplication(NAME)

# Start up the application
app.start_up(
    config.rc.simulation_configuration.execution_parameters.general.prefix,
    config
)

# Register callback for messages from publisher
app.add_message_callback(
    app_name="publisher",
    app_topic="test",
    user_callback=callback
)

logger.info("Consumer started. Waiting for messages. Press CTRL+C to stop.")
try:
    # Keep application running
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    logger.info("Consumer stopped")
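The callback logic can be exercised without a broker by calling it directly with a bytes payload. The sketch below is a variant that tries to decode JSON and falls back to plain text. The helper decode_body is hypothetical, and passing None for the first three arguments is only a stand-in for the objects the messaging layer would normally supply.

```python
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def decode_body(body: bytes):
    """Return parsed JSON if the payload is valid JSON, else the decoded text."""
    text = body.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def callback(ch, method, properties, body):
    """Same signature as the consumer callback; logs the decoded payload."""
    logger.info("Received message: %r", decode_body(body))


# Direct calls with stand-in arguments; no broker is involved
callback(None, None, None, b"This is test message #1 with value: 0.1234")
callback(None, None, None, json.dumps({"count": 2, "value": 0.5}).encode("utf-8"))
```

Keeping the decoding in a separate function makes it easy to test payload handling before connecting the callback to a running broker.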
Running the Example
First, make sure you have RabbitMQ running (as described in the RabbitMQ Broker on Local Host guide).
Open two terminal windows.
In the first terminal, start the consumer:
python3 nost_consumer.py
In the second terminal, start the publisher:
python3 nost_publisher.py
Observe the messages being received in the consumer terminal.
Understanding the NOS-T Implementation
This example demonstrates several key NOS-T concepts:
Connection Configuration: The ConnectionConfig class loads broker settings from a YAML file.
Managed Application: The ManagedApplication class handles connection management and message routing.
Message Callbacks: The consumer registers callbacks that are triggered when messages arrive.
Topics: Messages are published with specific topics that consumers can subscribe to.
Payload Handling: Messages can carry arbitrary string payloads.
Unlike the direct pika implementation, NOS-T abstracts away many messaging details, making the code more concise and focused on the application logic.
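The relationship between topics and callbacks can be pictured with a small in-memory stand-in. This toy class is not part of NOS-T and involves no broker; ToyBroker and its methods are hypothetical names, used only to illustrate that a callback fires when the application name and topic of a published message match a registration.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory illustration of topic-based callback dispatch (not NOS-T)."""

    def __init__(self):
        # Maps (app_name, app_topic) to the list of registered callbacks
        self._callbacks = defaultdict(list)

    def add_message_callback(self, app_name, app_topic, user_callback):
        self._callbacks[(app_name, app_topic)].append(user_callback)

    def send_message(self, app_name, app_topic, payload):
        """Deliver payload to matching callbacks; return how many were invoked."""
        matched = self._callbacks.get((app_name, app_topic), [])
        for cb in matched:
            cb(payload)
        return len(matched)


received = []
broker = ToyBroker()
broker.add_message_callback("publisher", "test", received.append)

delivered = broker.send_message("publisher", "test", "hello")
dropped = broker.send_message("publisher", "other", "ignored")
print(received, delivered, dropped)  # ['hello'] 1 0
```

The second message reaches no callback because its topic differs from the registered one, which mirrors the most common reason a consumer appears to receive nothing.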
Troubleshooting
If you encounter issues:
Configuration errors: Ensure your sos.yaml file is correctly formatted and contains valid broker details.
Connection refused: Ensure your RabbitMQ broker is running. Check with docker ps.
Authentication failed: Verify that the username and password in the .env file match your RabbitMQ configuration.
No messages received: Check that the application names and topics match between publisher and consumer.
You can also check the RabbitMQ management interface at http://localhost:15672/ to view exchanges, queues, and message flows.
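For the "Connection refused" case, a quick reachability check can narrow down whether the broker ports are open at all. The sketch below uses only the standard library socket module. The function name port_is_open is hypothetical, and the ports are the ones that appear in sos.yaml and in the management URL; a successful TCP connection shows only that something is listening, not that authentication will succeed.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Ports taken from sos.yaml (AMQP) and the management interface URL
    for name, port in (("AMQP", 5672), ("management UI", 15672)):
        state = "reachable" if port_is_open("localhost", port) else "not reachable"
        print(f"{name} port {port}: {state}")
```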
Next Steps
Try adding multiple consumers with different callbacks
Experiment with different message payloads (JSON, XML, etc.)
Implement more complex routing patterns using different topics
Explore other NOS-T features like time synchronization and simulation control