Skip to content

Event-Driven Architecture (Continued)

Implementing Specific Microservices

Order Service

python
class OrderService(Microservice):
    """Order bookkeeping service.

    Orders are kept in a plain in-process dict (demonstration only; nothing
    is persisted). New orders are announced through ``publish_event`` with an
    ``OrderCreatedEvent``; stored orders are updated when a
    ``PaymentProcessedEvent`` or ``OrderShippedEvent`` is passed to
    ``handle_event``.
    """

    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        redis_db: int = 0
    ):
        """Configure the base microservice and start with no orders.

        Args:
            redis_host: Redis host, forwarded to the base class
            redis_port: Redis port, forwarded to the base class
            redis_db: Redis database index, forwarded to the base class
        """
        # Event type names handed to the base class as ``event_types``.
        subscribed_events = ["PaymentProcessedEvent", "OrderShippedEvent"]

        super().__init__(
            service_name="order-service",
            redis_host=redis_host,
            redis_port=redis_port,
            redis_db=redis_db,
            event_types=subscribed_events,
        )

        # order_id -> order dict; lives only as long as this instance.
        self.orders = {}

    def create_order(self, user_id: str, items: List[Dict[str, Any]]) -> str:
        """Register a new order and publish an ``OrderCreatedEvent`` for it.

        The total is the sum of ``price * quantity`` over ``items``; an item
        missing either key contributes 0.

        Args:
            user_id: User identifier
            items: List of order items (dicts read via "price" and "quantity")

        Returns:
            The generated order identifier ("order-" followed by a UUID4)
        """
        order_id = f"order-{uuid.uuid4()}"

        order_total = sum(
            line.get("price", 0) * line.get("quantity", 0) for line in items
        )

        # Every order starts in the "created" state and is stored under its id.
        self.orders[order_id] = {
            "order_id": order_id,
            "user_id": user_id,
            "items": items,
            "total": order_total,
            "status": "created",
            "created_at": time.time(),
        }

        self.publish_event(
            OrderCreatedEvent(
                order_id=order_id,
                user_id=user_id,
                total=order_total,
                items=items,
            )
        )
        self.logger.info(f"Created order {order_id} for user {user_id}")

        return order_id

    def handle_event(self, event: Event) -> bool:
        """Route an incoming event to the matching private handler.

        Args:
            event: Event to handle

        Returns:
            The handler's result, or False when the event is neither a
            ``PaymentProcessedEvent`` nor an ``OrderShippedEvent``
        """
        if isinstance(event, PaymentProcessedEvent):
            return self._handle_payment_processed(event)

        if isinstance(event, OrderShippedEvent):
            return self._handle_order_shipped(event)

        # Not an event type this service reacts to.
        return False

    def _handle_payment_processed(self, event: PaymentProcessedEvent) -> bool:
        """Record the outcome of a payment on the stored order.

        A status of "succeeded" marks the order "paid"; any other status
        marks it "payment_failed". The payment id and an update timestamp
        are stored in both cases.

        Args:
            event: Payment processed event

        Returns:
            True if the order was found and updated, False if it is unknown
        """
        order_id = event.order_id
        status = event.status

        # Unknown order: nothing to update, report the failure to the caller.
        if order_id not in self.orders:
            self.logger.error(f"Order {order_id} not found for payment {event.payment_id}")
            return False

        record = self.orders[order_id]

        if status == "succeeded":
            record["status"] = "paid"
            self.logger.info(f"Order {order_id} marked as paid")
        else:
            record["status"] = "payment_failed"
            self.logger.warning(f"Payment failed for order {order_id}")

        record["payment_id"] = event.payment_id
        record["updated_at"] = time.time()

        return True

    def _handle_order_shipped(self, event: OrderShippedEvent) -> bool:
        """Mark the stored order as shipped and keep its tracking details.

        Args:
            event: Order shipped event

        Returns:
            True if the order was found and updated, False if it is unknown
        """
        order_id = event.order_id

        # Unknown order: nothing to update, report the failure to the caller.
        if order_id not in self.orders:
            self.logger.error(f"Order {order_id} not found for shipping update")
            return False

        record = self.orders[order_id]
        record["status"] = "shipped"
        record["tracking_number"] = event.tracking_number
        record["carrier"] = event.carrier
        # Stamped with the local handling time.
        record["shipped_at"] = time.time()

        self.logger.info(f"Order {order_id} marked as shipped")

        return True

Payment Service

python
class PaymentService(Microservice):
    """Service for processing (simulated) payments.

    For every ``OrderCreatedEvent`` passed to ``handle_event`` the service
    records a simulated payment in memory and hands a
    ``PaymentProcessedEvent`` with the outcome to ``publish_event``.

    The simulation parameters are class attributes so that a subclass or an
    individual instance (for example in a test) can override them without
    editing the handler. The defaults reproduce a 1.5 second delay and a
    one-in-ten failure rate.
    """

    # Seconds to block per payment while imitating a provider round-trip.
    # Values <= 0 skip the wait entirely.
    SIMULATED_PROCESSING_SECONDS = 1.5

    # One in this many simulated payments fails (10 -> 90% success rate).
    # Values <= 0 disable simulated failures.
    SIMULATED_FAILURE_ONE_IN = 10

    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        redis_db: int = 0
    ):
        """Configure the base microservice and start with no payments.

        Args:
            redis_host: Redis host, forwarded to the base class
            redis_port: Redis port, forwarded to the base class
            redis_db: Redis database index, forwarded to the base class
        """
        super().__init__(
            service_name="payment-service",
            redis_host=redis_host,
            redis_port=redis_port,
            redis_db=redis_db,
            event_types=["OrderCreatedEvent"]
        )

        # In-memory payment storage (for demonstration): payment_id -> dict
        self.payments = {}

    def handle_event(self, event: Event) -> bool:
        """Handle incoming events.

        Args:
            event: Event to handle

        Returns:
            The handler's result for an ``OrderCreatedEvent``; False for any
            other event type
        """
        if isinstance(event, OrderCreatedEvent):
            return self._handle_order_created(event)

        return False

    def _handle_order_created(self, event: OrderCreatedEvent) -> bool:
        """Handle order created event by processing a simulated payment.

        The payment is stored in ``self.payments`` and its outcome is
        published as a ``PaymentProcessedEvent`` whether it succeeded or
        failed.

        Args:
            event: Order created event (``order_id`` and ``total`` are read)

        Returns:
            True once the outcome has been recorded and published; a failed
            payment still counts as a handled event
        """
        order_id = event.order_id
        amount = event.total

        # Generate payment ID
        payment_id = f"payment-{uuid.uuid4()}"

        # In a real service, this would integrate with a payment provider.
        # For demonstration, we simulate a payment.
        self.logger.info(f"Processing payment for order {order_id}, amount: ${amount:.2f}")

        # Simulate provider latency; guarded so 0 or a negative override
        # simply skips the wait instead of raising from time.sleep().
        delay = self.SIMULATED_PROCESSING_SECONDS
        if delay > 0:
            time.sleep(delay)

        # Simulate success with occasional failures. A non-positive ratio
        # means "never fail" and also avoids a modulo-by-zero.
        failure_one_in = self.SIMULATED_FAILURE_ONE_IN
        success = failure_one_in <= 0 or uuid.uuid4().int % failure_one_in != 0

        status = "succeeded" if success else "failed"

        # Store payment information
        self.payments[payment_id] = {
            "payment_id": payment_id,
            "order_id": order_id,
            "amount": amount,
            "status": status,
            "created_at": time.time(),
        }

        # Publish the outcome under its own name so the incoming ``event``
        # parameter is not rebound to the outgoing event.
        processed_event = PaymentProcessedEvent(
            payment_id=payment_id,
            order_id=order_id,
            amount=amount,
            status=status
        )

        self.publish_event(processed_event)

        self.logger.info(f"Payment {payment_id} {status} for order {order_id}")

        return True

Continue to the Shipping and Notification Services section.