Skip to content

Microservice Communication Patterns

This example series demonstrates how to implement common microservice communication patterns using LeanMQ.

Event-Driven Architecture

This example shows how to implement an event-driven architecture where services communicate by publishing and subscribing to events.

Defining the Event System

python
import json
import logging
import threading
import time
import uuid
from typing import Any, Dict, List, Optional, Set, Type

from leanmq import LeanMQ


# Base Event class
class Event:
    """Base class for all events in the system."""
    
    def __init__(self, event_id: Optional[str] = None):
        self.event_id = event_id or str(uuid.uuid4())
        self.timestamp = time.time()
        self.event_type = self.__class__.__name__
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert event to dictionary for serialization."""
        return {
            "event_id": self.event_id,
            "timestamp": self.timestamp,
            "event_type": self.event_type
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Event":
        """Create event from dictionary."""
        event = cls(event_id=data.get("event_id"))
        event.timestamp = data.get("timestamp", time.time())
        return event


# Specific event types
class OrderCreatedEvent(Event):
    """Event fired when a new order is created."""
    
    def __init__(
        self,
        order_id: str,
        user_id: str,
        total: float,
        items: List[Dict[str, Any]],
        event_id: Optional[str] = None
    ):
        super().__init__(event_id)
        self.order_id = order_id
        self.user_id = user_id
        self.total = total
        self.items = items
    
    def to_dict(self) -> Dict[str, Any]:
        data = super().to_dict()
        data.update({
            "order_id": self.order_id,
            "user_id": self.user_id,
            "total": self.total,
            "items": self.items
        })
        return data
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "OrderCreatedEvent":
        return cls(
            order_id=data.get("order_id", ""),
            user_id=data.get("user_id", ""),
            total=data.get("total", 0.0),
            items=data.get("items", []),
            event_id=data.get("event_id")
        )


class PaymentProcessedEvent(Event):
    """Event fired when a payment is processed."""
    
    def __init__(
        self,
        payment_id: str,
        order_id: str,
        amount: float,
        status: str,
        event_id: Optional[str] = None
    ):
        super().__init__(event_id)
        self.payment_id = payment_id
        self.order_id = order_id
        self.amount = amount
        self.status = status
    
    def to_dict(self) -> Dict[str, Any]:
        data = super().to_dict()
        data.update({
            "payment_id": self.payment_id,
            "order_id": self.order_id,
            "amount": self.amount,
            "status": self.status
        })
        return data
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "PaymentProcessedEvent":
        return cls(
            payment_id=data.get("payment_id", ""),
            order_id=data.get("order_id", ""),
            amount=data.get("amount", 0.0),
            status=data.get("status", ""),
            event_id=data.get("event_id")
        )


class OrderShippedEvent(Event):
    """Event fired when an order is shipped."""
    
    def __init__(
        self,
        order_id: str,
        tracking_number: str,
        carrier: str,
        event_id: Optional[str] = None
    ):
        super().__init__(event_id)
        self.order_id = order_id
        self.tracking_number = tracking_number
        self.carrier = carrier
    
    def to_dict(self) -> Dict[str, Any]:
        data = super().to_dict()
        data.update({
            "order_id": self.order_id,
            "tracking_number": self.tracking_number,
            "carrier": self.carrier
        })
        return data
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "OrderShippedEvent":
        return cls(
            order_id=data.get("order_id", ""),
            tracking_number=data.get("tracking_number", ""),
            carrier=data.get("carrier", ""),
            event_id=data.get("event_id")
        )


# Registry of event types for deserialization
EVENT_TYPES = {
    "OrderCreatedEvent": OrderCreatedEvent,
    "PaymentProcessedEvent": PaymentProcessedEvent,
    "OrderShippedEvent": OrderShippedEvent
}


# Event serialization/deserialization
def serialize_event(event: Event) -> Dict[str, Any]:
    """Serialize an event to a dictionary."""
    return event.to_dict()


def deserialize_event(data: Dict[str, Any]) -> Optional[Event]:
    """Deserialize a dictionary to an event."""
    event_type = data.get("event_type")
    if not event_type or event_type not in EVENT_TYPES:
        return None
    
    event_class = EVENT_TYPES[event_type]
    return event_class.from_dict(data)

Continue to the Event Bus Implementation section.