Skip to content

Event-Driven Architecture (Continued)

Implementing Specific Microservices (Continued)

Shipping Service

python
class ShippingService(Microservice):
    """Service for handling shipping."""
    
    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        redis_db: int = 0
    ):
        super().__init__(
            service_name="shipping-service",
            redis_host=redis_host,
            redis_port=redis_port,
            redis_db=redis_db,
            event_types=["PaymentProcessedEvent"]
        )
        
        # In-memory shipment storage (for demonstration)
        self.shipments = {}
    
    def handle_event(self, event: Event) -> bool:
        """Handle incoming events.
        
        Args:
            event: Event to handle
            
        Returns:
            True if handled successfully
        """
        if isinstance(event, PaymentProcessedEvent):
            return self._handle_payment_processed(event)
        
        return False
    
    def _handle_payment_processed(self, event: PaymentProcessedEvent) -> bool:
        """Handle payment processed event.
        
        Args:
            event: Payment processed event
            
        Returns:
            True if handled successfully
        """
        # Only create shipments for successful payments
        if event.status != "succeeded":
            self.logger.info(f"Skipping shipment for failed payment on order {event.order_id}")
            return True
        
        order_id = event.order_id
        
        # Generate shipment details
        tracking_number = f"TRACK-{uuid.uuid4().hex[:8].upper()}"
        carriers = ["FedEx", "UPS", "USPS", "DHL"]
        carrier = carriers[uuid.uuid4().int % len(carriers)]
        
        # In a real service, this would integrate with shipping providers
        self.logger.info(f"Creating shipment for order {order_id}")
        
        # Simulate shipping processing
        time.sleep(2)
        
        # Store shipment information
        shipment = {
            "order_id": order_id,
            "tracking_number": tracking_number,
            "carrier": carrier,
            "status": "shipped",
            "created_at": time.time()
        }
        
        self.shipments[order_id] = shipment
        
        # Publish order shipped event
        event = OrderShippedEvent(
            order_id=order_id,
            tracking_number=tracking_number,
            carrier=carrier
        )
        
        self.publish_event(event)
        
        self.logger.info(f"Order {order_id} shipped with {carrier}, tracking: {tracking_number}")
        
        return True

Notification Service

python
class NotificationService(Microservice):
    """Service for sending notifications."""
    
    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        redis_db: int = 0
    ):
        super().__init__(
            service_name="notification-service",
            redis_host=redis_host,
            redis_port=redis_port,
            redis_db=redis_db,
            # Subscribe to all event types
            event_types=None
        )
    
    def handle_event(self, event: Event) -> bool:
        """Handle all events by sending appropriate notifications.
        
        Args:
            event: Event to handle
            
        Returns:
            True if handled successfully
        """
        if isinstance(event, OrderCreatedEvent):
            self._send_notification(
                user_id=event.user_id,
                subject="Order Confirmation",
                message=f"Your order #{event.order_id} has been received. "
                        f"Total: ${event.total:.2f}"
            )
        
        elif isinstance(event, PaymentProcessedEvent):
            if event.status == "succeeded":
                self._send_notification(
                    order_id=event.order_id,
                    subject="Payment Confirmation",
                    message=f"Your payment of ${event.amount:.2f} for order "
                            f"#{event.order_id} has been processed."
                )
            else:
                self._send_notification(
                    order_id=event.order_id,
                    subject="Payment Failed",
                    message=f"Your payment for order #{event.order_id} has failed. "
                            f"Please update your payment information."
                )
        
        elif isinstance(event, OrderShippedEvent):
            self._send_notification(
                order_id=event.order_id,
                subject="Order Shipped",
                message=f"Your order #{event.order_id} has been shipped with "
                        f"{event.carrier}. Tracking number: {event.tracking_number}"
            )
        
        return True
    
    def _send_notification(self, subject: str, message: str, user_id: str = None, order_id: str = None):
        """Send a notification (simulated).
        
        Args:
            subject: Notification subject
            message: Notification message
            user_id: User ID (optional)
            order_id: Order ID (optional)
        """
        # In a real service, this would send emails, SMS, push notifications, etc.
        # For demonstration, we'll just log the notification
        recipient = f"user {user_id}" if user_id else f"order {order_id}"
        self.logger.info(f"Sending notification to {recipient}:")
        self.logger.info(f"  Subject: {subject}")
        self.logger.info(f"  Message: {message}")

Continue to the Running the Event-Driven System section.