Skip to content

Basic Overview

Core Concepts

Before diving into code, let's understand a few key concepts:

  • Queue: A named stream where messages are stored and processed
  • Message: Data sent between services, stored in a queue
  • Dead Letter Queue (DLQ): A special queue where failed messages are moved
  • Consumer Group: A group of consumers that process messages together
  • Transaction: A way to perform multiple operations atomically

Basic Usage: Core API

Let's start with the core functionality of creating queues and sending messages:

python
from leanmq import LeanMQ

# Initialize the LeanMQ client
mq = LeanMQ(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    prefix="myapp:"  # Optional prefix for queue names
)

# Create a queue (and its corresponding dead letter queue)
main_queue, dlq = mq.create_queue_pair("notifications")

# Send a message to the queue
message_id = main_queue.send_message({"user_id": 123, "message": "Hello, world!"})
print(f"Sent message with ID: {message_id}")

# Get messages from the queue
messages = main_queue.get_messages(count=10)
for message in messages:
    print(f"Received message: {message.data}")
    
    # Process the message (your business logic here)
    # ...
    
    # Acknowledge successful processing
    main_queue.acknowledge_messages([message.id])

Using the Webhook Pattern

LeanMQ provides a webhook-like interface that's more intuitive for service-to-service communication:

python
from leanmq import LeanMQWebhook
import time

# Initialize the webhook client
webhook = LeanMQWebhook(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0
)

# Register a webhook handler
@webhook.get("/users/created/")
def handle_user_created(data):
    print(f"New user created: {data['username']}")
    # Your processing logic here...

# Send a webhook event
webhook.send(
    "/users/created/",
    {
        "user_id": 456,
        "username": "johndoe",
        "email": "john@example.com",
        "created_at": time.time()
    }
)

# Process incoming webhooks (in a real app, you'd run this in a separate process or thread)
webhook.process_messages(block=True, timeout=5)

Running a Webhook Service

For continuous processing of webhook events, LeanMQ provides a convenient way to run a webhook service:

python
from leanmq import LeanMQWebhook
import time

# Initialize and set up your webhook handlers
webhook = LeanMQWebhook(redis_host="localhost", redis_port=6379, redis_db=0)

@webhook.get("/order/status/")
def handle_order_status(data):
    print(f"Order {data['order_id']} status updated to: {data['status']}")

# Run the webhook service (starts a background thread)
service = webhook.run_service(
    process_count=10,        # Process up to 10 messages per iteration
    block_for_seconds=1,     # Wait up to 1 second for new messages
    handle_signals=True,     # Register signal handlers for graceful shutdown
)

print("Webhook service is running...")

# In a real app, your main process would do other work here
# For this example, we'll just keep the service running for a while
try:
    while service.is_alive():
        time.sleep(1)
except KeyboardInterrupt:
    print("Shutting down...")
finally:
    # Stop the service when done
    service.stop()
    webhook.close()

Error Handling

LeanMQ automatically handles many error scenarios for you. Failed messages are moved to a dead letter queue for later inspection or reprocessing:

python
# Get messages from the dead letter queue
dlq = mq.get_dead_letter_queue("notifications")
failed_messages = dlq.get_messages(count=10)

for message in failed_messages:
    print(f"Failed message: {message.data}")
    print(f"Delivery attempts: {message.delivery_count}")
    
    # You can requeue messages from the DLQ back to the main queue
    dlq.requeue_messages([message.id], main_queue)

Using Transactions

For operations that need to be atomic (all succeed or all fail), use transactions:

python
# Create multiple queues
notifications_queue, _ = mq.create_queue_pair("notifications")
audit_queue, _ = mq.create_queue_pair("audit")

# Start a transaction
with mq.transaction() as tx:
    # Add multiple send operations to the transaction
    tx.send_message(notifications_queue, {"user_id": 123, "message": "Account updated"})
    tx.send_message(audit_queue, {"action": "account_update", "user_id": 123})
    
    # All messages will be sent atomically when the transaction block exits