Creating a Simple Publisher-Consumer Example

After setting up your RabbitMQ broker, you can test its functionality by creating a simple publisher-consumer system. This example demonstrates how messages flow through the broker from publishers to consumers.

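Prerequisites

  • A running RabbitMQ broker, as set up in the previous step. The scripts below assume it listens on localhost:5672 and accepts the credentials admin/admin; adjust these to match your configuration.

  • Python 3 with the pika client library installed (for example, pip install pika).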

Creating a Publisher

Create a file named publisher.py with the following content:

#!/usr/bin/env python
import json
import time
from datetime import datetime

import pika

# Connection parameters
credentials = pika.PlainCredentials("admin", "admin")
parameters = pika.ConnectionParameters("localhost", 5672, "/", credentials)

# Establish connection to RabbitMQ
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Declare an exchange
exchange_name = "example"
channel.exchange_declare(exchange=exchange_name, exchange_type="topic")

# Publish messages
try:
    message_count = 0
    print("Starting to publish messages. Press CTRL+C to stop.")

    while True:
        message_count += 1
        timestamp = datetime.now().isoformat()

        message = {
            "sequence": message_count,
            "timestamp": timestamp,
            "data": f"Test message {message_count}",
        }

        routing_key = "example.data"
        message_body = json.dumps(message)

        channel.basic_publish(
            exchange=exchange_name, routing_key=routing_key, body=message_body
        )

        print(f"Published message {message_count}: {message_body}")
        time.sleep(2)  # Publish a message every 2 seconds

except KeyboardInterrupt:
    print("Stopping publisher...")
finally:
    connection.close()
    print("Connection closed")

Creating a Consumer

Create a file named consumer.py with the following content:

#!/usr/bin/env python
import json

import pika

# Connection parameters
credentials = pika.PlainCredentials("admin", "admin")
parameters = pika.ConnectionParameters("localhost", 5672, "/", credentials)

# Establish connection to RabbitMQ
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Declare the same exchange as the publisher
exchange_name = "example"
channel.exchange_declare(exchange=exchange_name, exchange_type="topic")

# Create a queue with a random name
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue

# Bind the queue to the exchange with a routing key
binding_key = "example.*"
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=binding_key)

print(f"Subscribed to {exchange_name} with binding key {binding_key}")
print("Waiting for messages. To exit press CTRL+C")


# Define a callback function to be called when a message is received
def callback(ch, method, properties, body):
    try:
        message = json.loads(body)
        print(
            f"Received message {message['sequence']}: {message['data']} (sent at {message['timestamp']})"
        )
    except json.JSONDecodeError:
        print(f"Received message (non-JSON): {body}")


# Set up the consumer
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

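# Start consuming messages; CTRL+C stops the loop and closes the connection cleanly
try:
    channel.start_consuming()
except KeyboardInterrupt:
    print("Stopping consumer...")
    channel.stop_consuming()
finally:
    connection.close()
    print("Connection closed")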
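The callback above handles bodies that are not JSON, but it would still raise if a JSON body lacked one of the three fields the publisher sets. The following is a minimal sketch of a more defensive formatter, assuming the message layout used in publisher.py. The name describe_message is hypothetical and is not part of pika.

```python
import json


def describe_message(body: bytes) -> str:
    """Format a delivery body for printing without raising on malformed input.

    Hypothetical helper: expects the JSON layout produced by publisher.py
    (keys "sequence", "data", "timestamp") but tolerates anything else.
    """
    try:
        message = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return f"Received message (non-JSON): {body!r}"

    # Valid JSON is not necessarily an object, e.g. a list or a number.
    if not isinstance(message, dict):
        return f"Received message (unexpected JSON type): {message!r}"

    missing = [key for key in ("sequence", "data", "timestamp") if key not in message]
    if missing:
        return f"Received message (missing fields {missing}): {message!r}"

    return (
        f"Received message {message['sequence']}: {message['data']} "
        f"(sent at {message['timestamp']})"
    )
```

Inside the consumer, the callback body could then be reduced to print(describe_message(body)).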

Running the Example

  1. Open two terminal windows.

  2. In the first terminal, start the consumer:

    $ python3 consumer.py
    Subscribed to example with binding key example.*
    Waiting for messages. To exit press CTRL+C
    
  3. In the second terminal, start the publisher:

    $ python3 publisher.py
    Starting to publish messages. Press CTRL+C to stop.
    Published message 1: {"sequence": 1, "timestamp": "2026-03-27T15:54:36.831856", "data": "Test message 1"}
    Published message 2: {"sequence": 2, "timestamp": "2026-03-27T15:54:38.833948", "data": "Test message 2"}
    ...
    
  4. Observe the messages being received in the consumer terminal:

    Received message 1: Test message 1 (sent at 2026-03-27T15:54:36.831856)
    Received message 2: Test message 2 (sent at 2026-03-27T15:54:38.833948)
    ...
    

Understanding the Example

This example demonstrates the core concepts of messaging with RabbitMQ:

  1. Publishers send messages to an exchange with a specific routing key.

  2. Exchanges route messages to queues according to the exchange type and how the routing key compares with each queue's binding key.

  3. Queues hold messages until they are consumed.

  4. Consumers subscribe to queues and receive the messages delivered to them.

The publisher creates messages with a sequence number and timestamp, then publishes them to the “example” exchange with the routing key “example.data”.

The consumer creates an exclusive, server-named queue, binds it to the exchange with the binding pattern “example.*”, and then consumes messages whose routing keys match this pattern.
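To see why the routing key example.data reaches a queue bound with example.*, it can help to reproduce the topic-matching rules outside the broker. The sketch below is an illustration written for this guide, not RabbitMQ's actual implementation. It assumes the documented topic-exchange semantics: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The function name topic_matches is hypothetical.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches binding_key under topic-exchange rules.

    '*' matches exactly one dot-separated word; '#' matches zero or more words.
    """
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: succeed only if every word was consumed too.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero words (skip it) or one more word (stay on it).
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    # Compare a few routing keys against the consumer's binding key.
    for key in ("example.data", "example.data.raw", "example", "other.data"):
        print(f"example.* vs {key}: {topic_matches('example.*', key)}")
```

With these rules, example.* accepts example.data but rejects both example and example.data.raw, while a binding key of example.# would accept all three.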

Troubleshooting

If you encounter issues:

  1. Connection refused: Ensure your RabbitMQ broker is running. Check with docker ps.

  2. Authentication failed: Verify the username and password in the code match your RabbitMQ configuration.

  3. No messages received: Check that the exchange name and routing/binding keys match between publisher and consumer.

  4. Broker not responding: Restart the RabbitMQ container using docker restart rabbitmq.

You can also check the RabbitMQ management interface at http://localhost:15672/ to view exchanges, queues, and message flows.
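Before digging into credentials or routing keys, a quick way to separate network problems from application problems is to check whether the broker's ports accept TCP connections at all. The sketch below uses only the Python standard library. The helper name broker_reachable is hypothetical, and the ports are the ones used elsewhere in this guide (5672 for AMQP, 15672 for the management interface). A successful connection only shows that something is listening; it does not verify credentials or that the listener is RabbitMQ.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 5672, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name-resolution failures.
        return False


if __name__ == "__main__":
    for name, port in (("AMQP", 5672), ("management UI", 15672)):
        status = "reachable" if broker_reachable(port=port) else "NOT reachable"
        print(f"{name} port {port}: {status}")
```

If the AMQP port is not reachable, the "Connection refused" item above is the likely cause; if it is reachable but the scripts still fail, check the credentials and the exchange and key names next.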

Next Steps

  • Try modifying the routing keys to see how message routing changes.

  • Experiment with different exchange types (direct, fanout, headers).

  • Create multiple consumers with different binding patterns.

  • Add message persistence for reliability.