The Webhook Pattern in LeanMQ
One of LeanMQ's most powerful features is its webhook-like interface, which provides a familiar and intuitive API for service-to-service communication. This guide explores how to use this pattern effectively in your applications.
What is the Webhook Pattern?
The webhook pattern is a communication mechanism where one service sends an HTTP request to another service when a specific event occurs. It's a popular way to implement event-driven architectures and integrate different systems.
Traditional webhooks have several challenges:
- They rely on HTTP, which can fail due to network issues
- If the receiving service is down, the webhook is typically lost
- Implementing retry logic is complex and often error-prone
- Scaling webhook handling requires careful design
LeanMQ provides a webhook-like interface that addresses these challenges by using Redis Streams as the underlying transport mechanism, offering persistence, reliability, and scalability.
Setting Up LeanMQWebhook
First, initialize the LeanMQWebhook class:
from leanmq import LeanMQWebhook

webhook = LeanMQWebhook(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    prefix="webhook:",    # Optional prefix for queue names
    process_interval=1,   # Interval in seconds for processing messages
    auto_start=False      # Whether to start processing messages automatically
)
Registering Webhook Handlers
The LeanMQWebhook API uses a decorator pattern similar to web frameworks like Flask or FastAPI:
@webhook.get("/order/status/")
def process_order_status(data):
    """Process order status updates."""
    order_id = data.get("order_id")
    status = data.get("status")
    print(f"Order {order_id} status updated to: {status}")
    # Your business logic here...
    if status == "shipped":
        send_shipping_notification(order_id)

@webhook.get("/product/inventory/")
def process_inventory_update(data):
    """Process inventory updates."""
    product_id = data.get("product_id")
    quantity = data.get("quantity")
    print(f"Product {product_id} inventory updated to: {quantity}")
    # Your business logic here...
    if quantity < 10:
        trigger_restock_alert(product_id)
Behind the scenes, LeanMQWebhook:
- Creates a queue for each path pattern
- Maps the path to a specific queue name
- Registers your handler function to be called when messages arrive on that queue
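The three steps above can be pictured with a small in-memory sketch. This is not LeanMQ's implementation: MiniWebhook and queue_name_for are hypothetical names, the queues are plain Python deques instead of Redis Streams, and the naming rule is only inferred from the examples in this guide (/order/status/ → order_status).

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List

Handler = Callable[[Dict[str, Any]], None]


def queue_name_for(path: str) -> str:
    """Assumed naming rule: drop empty segments and join the rest with underscores."""
    return "_".join(segment for segment in path.split("/") if segment)


class MiniWebhook:
    """Hypothetical in-memory stand-in for the decorator-based registry."""

    def __init__(self) -> None:
        self.queues: Dict[str, Deque[Dict[str, Any]]] = {}
        self.handlers: Dict[str, Handler] = {}

    def get(self, path: str) -> Callable[[Handler], Handler]:
        name = queue_name_for(path)            # map the path to a queue name

        def register(func: Handler) -> Handler:
            self.queues.setdefault(name, deque())  # create the queue for this path
            self.handlers[name] = func             # register the handler for that queue
            return func

        return register

    def send(self, path: str, data: Dict[str, Any]) -> None:
        self.queues.setdefault(queue_name_for(path), deque()).append(data)

    def process_messages(self) -> int:
        processed = 0
        for name, handler in self.handlers.items():
            queue = self.queues[name]
            while queue:
                handler(queue.popleft())
                processed += 1
        return processed


demo = MiniWebhook()
seen: List[str] = []


@demo.get("/order/status/")
def on_order_status(data: Dict[str, Any]) -> None:
    seen.append(data["order_id"])


demo.send("/order/status/", {"order_id": "ORD-12345", "status": "shipped"})
processed = demo.process_messages()
```

Because the decorator returns the function unchanged, the handler stays an ordinary function that you can also call directly in unit tests.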
Sending Webhook Events
To send a webhook event, call the send() method with the path and data:
import time

# Send an order status update
webhook.send(
    "/order/status/",
    {
        "order_id": "ORD-12345",
        "status": "shipped",
        "updated_at": time.time()
    }
)

# Send an inventory update
webhook.send(
    "/product/inventory/",
    {
        "product_id": "PROD-789",
        "name": "Wireless Headphones",
        "quantity": 5,
        "updated_at": time.time()
    }
)
The path is used to determine which queue to send the message to, and which handler will process it.
Processing Webhook Messages
There are multiple ways to process webhook messages, depending on your application's needs:
1. Manual Processing
For simple use cases or testing, you can manually process messages:
# Process messages (optionally blocking until a message is received)
processed_count = webhook.process_messages(
    block=True,   # Whether to block waiting for messages
    timeout=5,    # How long to block (in seconds)
    count=10      # Maximum number of messages to process per queue
)
print(f"Processed {processed_count} webhook(s)")
This approach is useful for:
- CRON-based processing
- Single-run scripts
- Testing and debugging
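For a CRON-style run you usually want to drain whatever is waiting and then exit. The sketch below wraps process_messages() in a bounded loop. The drain helper and FakeWebhook test double are hypothetical, and the loop assumes that a non-blocking call returns 0 when no messages are waiting and that timeout is ignored when block=False.

```python
from typing import Protocol


class SupportsProcessMessages(Protocol):
    """Anything exposing the process_messages() call shown in this guide."""

    def process_messages(self, block: bool, timeout: int, count: int) -> int: ...


def drain(webhook: SupportsProcessMessages, batch_size: int = 10, max_batches: int = 100) -> int:
    """Process batches until one comes back empty or max_batches is reached."""
    total = 0
    for _ in range(max_batches):
        # Assumption: with block=False the call returns immediately.
        processed = webhook.process_messages(block=False, timeout=1, count=batch_size)
        if processed == 0:
            break
        total += processed
    return total


class FakeWebhook:
    """Test double that pretends a fixed number of messages are waiting."""

    def __init__(self, waiting: int) -> None:
        self.waiting = waiting

    def process_messages(self, block: bool, timeout: int, count: int) -> int:
        batch = min(count, self.waiting)
        self.waiting -= batch
        return batch


total = drain(FakeWebhook(waiting=25))
```

The max_batches cap keeps a single scheduled run from looping forever when producers send faster than the script can process.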
2. Running a Webhook Service
For continuous processing in production, use the run_service() method:
import logging
import time

# Start a webhook service that processes messages in a background thread
service = webhook.run_service(
    process_count=10,         # Maximum messages to process per iteration
    block_for_seconds=1,      # How long to block waiting for new messages
    handle_signals=True,      # Register signal handlers for graceful shutdown
    worker_thread_timeout=5,  # Timeout when waiting for worker thread to finish
    log_level=logging.INFO    # Logging level for the webhook service
)

try:
    # Your main application code here...
    while service.is_alive():
        time.sleep(1)
except KeyboardInterrupt:
    print("Shutting down...")
finally:
    # Stop the service when done
    service.stop()
    webhook.close()
This approach:
- Creates a background thread for processing messages
- Handles proper shutdown on SIGINT and SIGTERM signals
- Provides logging and error handling
- Makes it easy to integrate with your existing application
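To make the background-thread behavior concrete, here is a minimal sketch of the general pattern, assuming nothing about LeanMQ's internals. MiniService and process_once are hypothetical names, and signal handling and logging are left out to keep the example short.

```python
import threading
from collections import deque
from typing import Callable, List


class MiniService:
    """Hypothetical worker: calls process_once() in a loop until stopped."""

    def __init__(self, process_once: Callable[[], int], idle_wait: float = 0.05) -> None:
        self._process_once = process_once
        self._idle_wait = idle_wait
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def is_alive(self) -> bool:
        return self._thread.is_alive()

    def stop(self, timeout: float = 5.0) -> None:
        # Ask the loop to finish, then wait for the current iteration to end.
        self._stop_event.set()
        self._thread.join(timeout)

    def _run(self) -> None:
        while not self._stop_event.is_set():
            if self._process_once() == 0:
                # Nothing to do: wait briefly, but wake up early on stop().
                self._stop_event.wait(self._idle_wait)


pending = deque(["a", "b", "c"])
handled: List[str] = []
drained = threading.Event()


def process_once() -> int:
    if not pending:
        drained.set()
        return 0
    handled.append(pending.popleft())
    return 1


service_sketch = MiniService(process_once)
service_sketch.start()
drained.wait(timeout=2)
service_sketch.stop()
```

Waiting on an Event instead of calling time.sleep() lets stop() interrupt the idle wait, which is why shutdown does not have to wait for a full polling interval.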
3. Advanced: Custom WebhookService
For even more control, you can create a custom WebhookService directly:
from leanmq import LeanMQWebhook, WebhookService

# Initialize webhook
webhook = LeanMQWebhook(redis_host="localhost", redis_port=6379, redis_db=0)

# Register handlers
@webhook.get("/some/path/")
def handler(data):
    # ...
    pass

# Create a custom service with specific settings
service = WebhookService(
    webhook=webhook,
    process_count=20,
    block_for_seconds=2,
    handle_signals=True,
    worker_thread_timeout=10
)

# Start the service
service.start()

# Later, stop the service
service.stop()
This approach gives you full control over the service's behavior and lifecycle.
Path Patterns and Routing
LeanMQWebhook uses a simple path-based routing system:
- Paths should start with a /
- Use / to separate path segments
- Paths are converted to queue names internally
For example:
- /order/status/ → order_status queue
- /product/inventory/ → product_inventory queue
LeanMQ doesn't currently support wildcards or path parameters like /orders/{id}/status, but you can implement your own routing logic inside the handler functions.
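One way to route inside a handler is to keep the path fixed and dispatch on a field of the payload. The sketch below is an illustration under assumptions: STATUS_ROUTES, on_shipped, and on_cancelled are hypothetical names, and the handler returns a string only so the example can be checked without Redis.

```python
from typing import Any, Callable, Dict

StatusRoute = Callable[[Dict[str, Any]], str]


def on_shipped(data: Dict[str, Any]) -> str:
    return f"notify customer for {data['order_id']}"


def on_cancelled(data: Dict[str, Any]) -> str:
    return f"refund {data['order_id']}"


# Hypothetical routing table: the payload's "status" field plays the role
# that a path parameter would play in an HTTP framework.
STATUS_ROUTES: Dict[str, StatusRoute] = {
    "shipped": on_shipped,
    "cancelled": on_cancelled,
}


def process_order_status(data: Dict[str, Any]) -> str:
    """In a real application this would be decorated with @webhook.get("/order/status/")."""
    route = STATUS_ROUTES.get(data.get("status", ""))
    if route is None:
        return f"ignored status {data.get('status')!r}"
    return route(data)


shipped_result = process_order_status({"order_id": "ORD-12345", "status": "shipped"})
unknown_result = process_order_status({"order_id": "ORD-12345", "status": "lost"})
```

The order ID travels in the message body rather than in the path, so a single queue serves every order.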
Error Handling and Retries
LeanMQWebhook automatically handles errors during message processing:
- If a handler function raises an exception, the message is moved to a dead letter queue
- The exception is logged for debugging
- Processing continues with the next message
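The behavior listed above can be sketched with plain Python containers. This is an illustration of the dead-letter idea, not LeanMQ's code: process_queue is a hypothetical function, and a list stands in for the dead letter queue.

```python
import logging
from collections import deque
from typing import Any, Callable, Deque, Dict, List

logger = logging.getLogger("webhook_sketch")


def process_queue(
    queue: Deque[Dict[str, Any]],
    handler: Callable[[Dict[str, Any]], None],
    dead_letters: List[Dict[str, Any]],
) -> int:
    """Run handler on each message; failed messages go to dead_letters."""
    processed = 0
    while queue:
        message = queue.popleft()
        try:
            handler(message)
        except Exception:
            # Log the traceback, park the message, and keep going.
            logger.exception("Handler failed for message %r", message)
            dead_letters.append(message)
            continue
        processed += 1
    return processed


def require_quantity(data: Dict[str, Any]) -> None:
    if "quantity" not in data:
        raise ValueError("missing quantity")


inbox = deque([{"quantity": 5}, {}, {"quantity": 1}])
dead_letters: List[Dict[str, Any]] = []
processed_ok = process_queue(inbox, require_quantity, dead_letters)
```

One malformed message ends up in dead_letters while the messages before and after it are still processed, so a single bad payload does not block the queue.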
To control retries yourself, decide inside the handler whether to re-raise the exception:
# Placeholder names: logger, TemporaryError, PermanentError, and the helper
# functions below come from your own application.
@webhook.get("/order/processing/")
def process_order(data):
    try:
        # Your processing logic here...
        process_order_in_external_system(data)
    except TemporaryError as e:
        # For temporary errors, you might want to "fail" the processing
        # so the message stays in the queue and will be retried automatically
        logger.warning(f"Temporary error processing order {data['order_id']}: {e}")
        raise
    except PermanentError as e:
        # For permanent errors, you might want to handle the failure
        # but acknowledge the message so it's not retried
        logger.error(f"Permanent error processing order {data['order_id']}: {e}")
        record_failed_order(data['order_id'], str(e))
        # Don't re-raise the exception if you want to acknowledge the message
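If a temporary failure usually clears within a second or two, you may prefer a few in-handler attempts before re-raising. The following is a minimal sketch under assumptions: call_with_retries is a hypothetical helper, TemporaryError is defined here as a stand-in for your own exception type, and the sleep function is injectable so the example runs without real delays.

```python
import time
from typing import Any, Callable, List


class TemporaryError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, rate limits)."""


def call_with_retries(
    func: Callable[..., Any],
    *args: Any,
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call func(*args), retrying on TemporaryError with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return func(*args)
        except TemporaryError:
            if attempt == attempts:
                raise  # Out of attempts: let the exception leave the handler.
            sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


def flaky_charge(order_id: str) -> str:
    """Fails twice, then succeeds, to exercise the retry loop."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TemporaryError("payment gateway timeout")
    return f"charged {order_id}"


delays: List[float] = []
result = call_with_retries(flaky_charge, "ORD-12345", sleep=delays.append)
```

Keep attempts and base_delay small: the handler blocks while it sleeps, so long backoff schedules delay every message queued behind it.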
Monitoring and Debugging
To monitor your webhook processing:
Check queue information:
queue_info = webhook.mq.get_queue("order_status").get_info()
print(f"Queue: {queue_info.name}")
print(f"Message count: {queue_info.message_count}")
print(f"Pending messages: {queue_info.pending_messages}")
Inspect the dead letter queue:
dlq = webhook.mq.get_dead_letter_queue("order_status")
failed_messages = dlq.get_messages(count=10)
for message in failed_messages:
    print(f"Failed message ID: {message.id}")
    print(f"Data: {message.data}")
    print(f"Delivery count: {message.delivery_count}")
Reprocess failed messages:
# Move messages from DLQ back to the main queue
dlq.requeue_messages([message.id for message in failed_messages])
Best Practices
Here are some tips for using LeanMQWebhook effectively:
Use meaningful path patterns:
- Group related functionality (e.g., /order/status/, /order/payment/)
- Use consistent naming conventions
Keep handlers focused:
- A handler should do one thing well
- Split complex processing into multiple handlers if necessary
Use error handling:
- Catch and handle expected exceptions
- Let unexpected exceptions propagate to the DLQ
Monitor your DLQs:
- Set up alerts for messages in DLQs
- Regularly inspect and handle failed messages
Close connections properly:
- Use the with statement or explicitly call close() when done
- Stop services gracefully before shutdown
Consider message volume:
- Adjust process_count based on message volume and processing speed
- For high-volume applications, consider multiple webhook service instances
Next Steps
Now that you understand the webhook pattern in LeanMQ, you can:
- Learn about advanced queue management
- Explore error handling strategies
- See how to scale LeanMQ for high-volume applications
- Check out the WebhookService API reference for complete details
For real-world example applications, visit the Examples section.