Creating a Simple Publisher-Consumer Example
After setting up your RabbitMQ broker, you can test its functionality by creating a simple publisher-consumer system. This example demonstrates how messages flow through the broker from publishers to consumers.
Prerequisites
NOS-T installed (as described in Installation.)
RabbitMQ broker running (as described in RabbitMQ Broker on Local Host.)
Basic understanding of messaging concepts
Creating a Publisher
Create a file named publisher.py with the following content:
#!/usr/bin/env python
import json
import time
from datetime import datetime
import pika
# Connection parameters
credentials = pika.PlainCredentials("admin", "admin")
parameters = pika.ConnectionParameters("localhost", 5672, "/", credentials)
# Establish connection to RabbitMQ
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Declare an exchange
exchange_name = "example"
channel.exchange_declare(exchange=exchange_name, exchange_type="topic")
# Publish messages
try:
message_count = 0
print("Starting to publish messages. Press CTRL+C to stop.")
while True:
message_count += 1
timestamp = datetime.now().isoformat()
message = {
"sequence": message_count,
"timestamp": timestamp,
"data": f"Test message {message_count}",
}
routing_key = "example.data"
message_body = json.dumps(message)
channel.basic_publish(
exchange=exchange_name, routing_key=routing_key, body=message_body
)
print(f"Published message {message_count}: {message_body}")
time.sleep(2) # Publish a message every 2 seconds
except KeyboardInterrupt:
print("Stopping publisher...")
finally:
connection.close()
print("Connection closed")
Creating a Consumer
Create a file named consumer.py with the following content:
#!/usr/bin/env python
import json
import pika
# Connection parameters
credentials = pika.PlainCredentials("admin", "admin")
parameters = pika.ConnectionParameters("localhost", 5672, "/", credentials)
# Establish connection to RabbitMQ
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Declare the same exchange as the publisher
exchange_name = "example"
channel.exchange_declare(exchange=exchange_name, exchange_type="topic")
# Create a queue with a random name
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
# Bind the queue to the exchange with a routing key
binding_key = "example.*"
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=binding_key)
print(f"Subscribed to {exchange_name} with binding key {binding_key}")
print("Waiting for messages. To exit press CTRL+C")
# Define a callback function to be called when a message is received
def callback(ch, method, properties, body):
try:
message = json.loads(body)
print(
f"Received message {message['sequence']}: {message['data']} (sent at {message['timestamp']})"
)
except json.JSONDecodeError:
print(f"Received message (non-JSON): {body}")
# Set up the consumer
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
# Start consuming messages
channel.start_consuming()
Running the Example
Open two terminal windows.
In the first terminal, start the consumer:
>>> python3 consumer.py Subscribed to example with binding key example.* Waiting for messages. To exit press CTRL+C
In the second terminal, start the publisher:
>>> python3 publisher.py Starting to publish messages. Press CTRL+C to stop. Published message 1: {"sequence": 1, "timestamp": "2026-03-27T15:54:36.831856", "data": "Test message 1"} Published message 2: {"sequence": 2, "timestamp": "2026-03-27T15:54:38.833948", "data": "Test message 2"} ...
Observe the messages being received in the consumer terminal:
Received message 1: Test message 1 (sent at 2026-03-27T15:54:36.831856) Received message 2: Test message 2 (sent at 2026-03-27T15:54:38.833948) ...
Understanding the Example
This example demonstrates the core concepts of messaging with RabbitMQ:
Publishers send messages to an exchange with a specific routing key.
Exchanges route messages to queues based on the routing key and exchange type.
Queues hold messages until they are consumed.
Consumers consume messages from queues.
The publisher creates messages with a sequence number and timestamp, then publishes them to the “nost_example” exchange with the routing key “nost.example.data”.
The consumer creates a queue, binds it to the exchange with the binding pattern “nost.example.*”, and then consumes messages that match this pattern.
Troubleshooting
If you encounter issues:
Connection refused: Ensure your RabbitMQ broker is running. Check with
docker ps.Authentication failed: Verify the username and password in the code match your RabbitMQ configuration.
No messages received: Check that the exchange name and routing/binding keys match between publisher and consumer.
Broker not responding: Restart the RabbitMQ container using
docker restart rabbitmq.
You can also check the RabbitMQ management interface at http://localhost:15672/ to view exchanges, queues, and message flows.
Next Steps
Try modifying the routing keys to see how message routing changes.
Experiment with different exchange types (direct, fanout, headers).
Create multiple consumers with different binding patterns.
Add message persistence for reliability.