Creating a NOS-T Publisher-Consumer Example
This example demonstrates how to create a simple publisher-consumer system using the NOS-T library. The example showcases how messages flow through the broker from publishers to consumers using NOS-T.
Prerequisites
NOS-T installed (as described in Installation).
RabbitMQ broker running (as described in RabbitMQ Broker on Local Host).
Basic understanding of NOS-T messaging concepts.
Configuration File
First, create a file named sos.yaml with the following content:
info:
title: Novel Observing Strategies Testbed (NOS-T) YAML Configuration
version: '1.0.0'
description: Version-controlled AsyncAPI document for RabbitMQ event broker
servers:
rabbitmq:
keycloak_authentication: False
host: "localhost"
port: 5672
tls: False
virtual_host: "/"
message_expiration: "60000" # in milliseconds, message expiration time
delivery_mode: 2 # 1=transient, 2=durable
content_type: "text/plain"
heartbeat: 30 # in seconds
connection_attempts: 3
retry_delay: 5 # in seconds
execution:
general:
prefix: sos
Then, create a .env file with the following content. The required environment variables depend on your authentication mode (see Authentication Modes for details):
# Basic Auth (localhost development) - username and password only
USERNAME="admin"
PASSWORD="admin"
# Keycloak User Account - add these for Keycloak authentication
# CLIENT_ID="your-client-id"
# CLIENT_SECRET_KEY="your-client-secret"
# Keycloak Service Account - client credentials only (no username/password)
# CLIENT_ID="your-client-id"
# CLIENT_SECRET_KEY="your-client-secret"
Creating a Publisher
Create a file named nost_publisher.py with the following content:
import logging
import random
import time
from nost_tools.application import Application
from nost_tools.configuration import ConnectionConfig
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
# Load connection configuration from YAML
config = ConnectionConfig(yaml_file="sos.yaml")
# Define application name
NAME = "publisher"
# Create the managed application
app = Application(NAME)
# Start up the application
app.start_up(
config.rc.simulation_configuration.execution_parameters.general.prefix, config
)
# Send messages in a loop
try:
message_count = 0
logger.info("Starting to publish messages. Press CTRL+C to stop.")
while True:
message_count += 1
message = (
f"This is test message #{message_count} with value: {random.random():.4f}"
)
# Send a message
app.send_message(app_name=NAME, app_topics="test", payload=message)
logger.info(f"Published message: {message}")
time.sleep(2) # Publish a message every 2 seconds
except KeyboardInterrupt:
logger.info("Stopping publisher...")
finally:
# Clean shutdown would go here
logger.info("Publisher stopped")
Creating a Consumer
Create a file named nost_consumer.py with the following content:
import logging
import time
from nost_tools.application import Application
from nost_tools.configuration import ConnectionConfig
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
def callback(ch, method, properties, body):
"""Process received messages"""
body = body.decode("utf-8")
logger.info(f"Received message: {body}")
# Load connection configuration from YAML
config = ConnectionConfig(yaml_file="sos.yaml")
# Define application name
NAME = "consumer"
# Create the managed application
app = Application(NAME)
# Start up the application
app.start_up(
config.rc.simulation_configuration.execution_parameters.general.prefix, config
)
# Register callback for messages from publisher
app.add_message_callback(app_name="publisher", app_topic="test", user_callback=callback)
logger.info("Consumer started. Waiting for messages. Press CTRL+C to stop.")
try:
# Keep application running
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Consumer stopped")
Running the Example
First, make sure you have RabbitMQ running (as described in the RabbitMQ Broker on Local Host guide).
Open two terminal windows.
In the first terminal, start the consumer:
python3 nost_consumer.pyIn the second terminal, start the publisher:
python3 nost_publisher.pyObserve the messages being received in the consumer terminal.
Understanding the NOS-T Implementation
This example demonstrates several key NOS-T concepts:
Connection Configuration: The
ConnectionConfigclass loads broker settings from a YAML file.Managed Application: The
ManagedApplicationclass handles connection management and message routing.Message Callbacks: The consumer registers callbacks that are triggered when messages arrive.
Topics: Messages are published with specific topics that consumers can subscribe to.
Payload Handling: Messages can carry arbitrary string payloads.
Unlike the direct pika implementation, NOS-T abstracts away many messaging details, making the code more concise and focused on the application logic.
Troubleshooting
If you encounter issues:
Configuration errors: Ensure your
sos.yamlfile is correctly formatted and contains valid broker details.Connection refused: Ensure your RabbitMQ broker is running. Check with
docker ps.Authentication failed: Verify the username and password in the YAML file match your RabbitMQ configuration.
No messages received: Check that the application names and topics match between publisher and consumer.
You can also check the RabbitMQ management interface at http://localhost:15672/ to view exchanges, queues, and message flows.
Next Steps
Try adding multiple consumers with different callbacks
Experiment with different message payloads (JSON, XML, etc.)
Implement more complex routing patterns using different topics
Explore other NOS-T features like time synchronization and simulation control