Event-Driven Architecture (Continued)
Running the Event-Driven System
The following code demonstrates how to run our event-driven microservice system:
def event_driven_architecture_example():
"""Example of an event-driven architecture using LeanMQ."""
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Create services
order_service = OrderService()
payment_service = PaymentService()
shipping_service = ShippingService()
notification_service = NotificationService()
try:
# Start all services
for service in [order_service, payment_service, shipping_service, notification_service]:
service.start()
# Allow services to initialize
time.sleep(2)
# Create some orders
order_ids = []
for i in range(3):
user_id = f"user-{uuid.uuid4().hex[:8]}"
items = [
{
"product_id": f"prod-{uuid.uuid4().hex[:8]}",
"name": f"Product {i+1}",
"price": 19.99 + i * 10,
"quantity": i + 1
},
{
"product_id": f"prod-{uuid.uuid4().hex[:8]}",
"name": f"Accessory {i+1}",
"price": 5.99,
"quantity": 1
}
]
order_id = order_service.create_order(user_id, items)
order_ids.append(order_id)
# Wait a bit between orders
time.sleep(1)
# Wait for all events to be processed
print("\nWaiting for all events to be processed...")
time.sleep(10)
# Show final order states
print("\nFinal order states:")
for order_id in order_ids:
if order_id in order_service.orders:
order = order_service.orders[order_id]
print(f"Order {order_id}: {order['status']}")
if "tracking_number" in order:
print(f" Tracking: {order['tracking_number']} ({order['carrier']})")
if "payment_id" in order:
print(f" Payment: {order['payment_id']}")
print()
finally:
# Shut down all services
print("\nShutting down services...")
for service in [order_service, payment_service, shipping_service, notification_service]:
service.close()
print("Event-driven architecture example completed")
if __name__ == "__main__":
event_driven_architecture_example()
API Gateway Pattern
This is another common microservice communication pattern. In this pattern, an API Gateway serves as a single entry point for clients and routes requests to appropriate internal services.
Here's how you could implement a simple API Gateway using LeanMQ's webhook pattern:
import json
import threading
import time
import uuid
from typing import Any, Dict, List, Optional
from leanmq import LeanMQWebhook
class ApiGateway:
"""API Gateway that routes requests to internal services."""
def __init__(
self,
redis_host: str = "localhost",
redis_port: int = 6379,
redis_db: int = 0
):
self.webhook = LeanMQWebhook(
redis_host=redis_host,
redis_port=redis_port,
redis_db=redis_db,
prefix="api-gateway:",
auto_start=False
)
# Start the webhook service
self.service = self.webhook.run_service()
# Map of request IDs to responses (for request-response pattern)
self.pending_responses = {}
self.response_lock = threading.Lock()
# Register internal route handlers
self._register_routes()
print("API Gateway started")
def _register_routes(self):
"""Register internal route handlers for service responses."""
@self.webhook.get("/responses/")
def handle_response(data):
"""Handle responses from internal services."""
request_id = data.get("request_id")
if request_id:
with self.response_lock:
self.pending_responses[request_id] = data
def route_request(self, service: str, endpoint: str, data: Dict[str, Any], timeout: int = 10) -> Dict[str, Any]:
"""Route a request to an internal service and wait for response.
Args:
service: Service name
endpoint: Service endpoint
data: Request data
timeout: Timeout in seconds
Returns:
Response data
Raises:
TimeoutError: If no response received within timeout
"""
# Generate request ID
request_id = str(uuid.uuid4())
# Add request metadata
request_data = data.copy()
request_data["request_id"] = request_id
request_data["timestamp"] = time.time()
request_data["response_path"] = "/responses/"
# Send request to service
path = f"/{service}/{endpoint}/"
self.webhook.send(path, request_data)
print(f"Routed request to {path}, ID: {request_id}")
# Wait for response
start_time = time.time()
while time.time() - start_time < timeout:
with self.response_lock:
if request_id in self.pending_responses:
response = self.pending_responses.pop(request_id)
return response
# Brief delay to avoid tight loop
time.sleep(0.1)
# Timeout reached
raise TimeoutError(f"No response received from {service}/{endpoint} within {timeout} seconds")
def close(self):
"""Close the API Gateway."""
if self.service:
self.service.stop()
self.webhook.close()
print("API Gateway stopped")
class OrderServiceClient:
"""Client for the Order Service."""
def __init__(
self,
redis_host: str = "localhost",
redis_port: int = 6379,
redis_db: int = 0
):
self.webhook = LeanMQWebhook(
redis_host=redis_host,
redis_port=redis_port,
redis_db=redis_db,
prefix="api-gateway:",
auto_start=False
)
# Start the webhook service
self.service = self.webhook.run_service()
# Register route handlers
self._register_routes()
print("Order Service started")
def _register_routes(self):
"""Register route handlers."""
@self.webhook.get("/orders/create/")
def handle_create_order(data):
"""Handle order creation requests."""
request_id = data.get("request_id")
response_path = data.get("response_path")
# Process order creation
print(f"Processing order creation: {data}")
# Simulate processing time
time.sleep(1.5)
# Generate order ID
order_id = f"order-{uuid.uuid4().hex[:8]}"
# Send response
response = {
"request_id": request_id,
"order_id": order_id,
"status": "created",
"message": "Order created successfully",
"timestamp": time.time()
}
self.webhook.send(response_path, response)
print(f"Sent response for request {request_id}")
@self.webhook.get("/orders/status/")
def handle_order_status(data):
"""Handle order status requests."""
request_id = data.get("request_id")
response_path = data.get("response_path")
order_id = data.get("order_id")
# Process order status request
print(f"Processing order status request for {order_id}")
# Simulate processing time
time.sleep(0.8)
# Simulate random order status
statuses = ["processing", "shipped", "delivered", "cancelled"]
status = statuses[uuid.uuid4().int % len(statuses)]
# Send response
response = {
"request_id": request_id,
"order_id": order_id,
"status": status,
"updated_at": time.time(),
"timestamp": time.time()
}
self.webhook.send(response_path, response)
print(f"Sent response for request {request_id}")
def close(self):
"""Close the service."""
if self.service:
self.service.stop()
self.webhook.close()
print("Order Service stopped")
def api_gateway_example():
"""Example of API Gateway pattern using LeanMQ."""
# Create API Gateway and services
gateway = ApiGateway()
order_service = OrderServiceClient()
try:
# Allow services to initialize
time.sleep(2)
# Create an order through the gateway
try:
response = gateway.route_request(
service="orders",
endpoint="create",
data={
"user_id": "user-12345",
"items": [
{"product_id": "prod-789", "quantity": 2, "price": 29.99},
{"product_id": "prod-456", "quantity": 1, "price": 49.99}
],
"shipping_address": "123 Main St, Anytown, US 12345"
}
)
print(f"\nOrder creation response: {json.dumps(response, indent=2)}")
# Get the order status
order_id = response.get("order_id")
status_response = gateway.route_request(
service="orders",
endpoint="status",
data={"order_id": order_id}
)
print(f"\nOrder status response: {json.dumps(status_response, indent=2)}")
except TimeoutError as e:
print(f"Error: {e}")
finally:
# Clean up
gateway.close()
order_service.close()
print("API Gateway example completed")
if __name__ == "__main__":
api_gateway_example()
Summary
These examples demonstrate how to implement common microservice communication patterns using LeanMQ:
Event-Driven Architecture: Services communicate by publishing and subscribing to events. This provides loose coupling and allows for reactive processing.
API Gateway Pattern: A central gateway routes client requests to appropriate internal services, providing a single entry point and abstracting the internal architecture.
Both patterns leverage LeanMQ's reliable message delivery, dead letter queues, and webhook-like interface to create robust microservice communications.
For more information about LeanMQ's capabilities:
- See the LeanMQ Core API Reference for details on queue management
- Explore the Webhook API Reference for the webhook-like interface
- Check the Transactions Guide for atomic operations across services