Skip to content

Event-Driven Architecture (Continued)

Event Bus Implementation

python
class EventBus:
    """Central event bus for publishing and subscribing to events.

    Events are published to a single main queue (``"all"``). A background
    thread started by :meth:`start` reads that queue and forwards each event
    to the queue of every service subscribed to the event's type.
    """

    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        redis_db: int = 0,
        prefix: str = "events:"
    ):
        """Create the underlying LeanMQ client and the main events queue.

        Args:
            redis_host: Redis host name, passed through to LeanMQ.
            redis_port: Redis port, passed through to LeanMQ.
            redis_db: Redis database index, passed through to LeanMQ.
            prefix: Key prefix, passed through to LeanMQ.
        """
        # Initialize LeanMQ
        self.mq = LeanMQ(
            redis_host=redis_host,
            redis_port=redis_port,
            redis_db=redis_db,
            prefix=prefix
        )

        # Main events queue and its dead letter queue
        self.events_queue, self.events_dlq = self.mq.create_queue_pair("all")

        # Map of event type -> set of subscriber queue names
        self.subscribers = {}

        # Logger
        self.logger = logging.getLogger("EventBus")

        # Running state; the processor thread polls `running` to know when to exit
        self.running = False
        self.processor_thread = None

    def start(self):
        """Start the background event processor (no-op if already running)."""
        if self.running:
            return

        self.running = True
        self.processor_thread = threading.Thread(target=self._process_events)
        self.processor_thread.daemon = True
        self.processor_thread.start()

        self.logger.info("Event bus started")

    def stop(self):
        """Stop the background event processor (no-op if not running).

        Waits up to 5 seconds for the processor thread to finish its current
        iteration and logs a warning if it is still alive afterwards.
        """
        if not self.running:
            return

        # The processor loop checks this flag once per iteration.
        self.running = False

        thread = self.processor_thread
        if thread and thread.is_alive():
            thread.join(timeout=5)
            if thread.is_alive():
                # join() timed out; don't claim a clean stop without saying so.
                self.logger.warning("Event processor did not stop within 5 seconds")

        self.logger.info("Event bus stopped")

    def close(self):
        """Stop the processor and close the underlying LeanMQ client."""
        self.stop()
        self.mq.close()

    def publish(self, event: Event) -> str:
        """Publish an event to the event bus.

        Args:
            event: Event to publish

        Returns:
            Message ID of the published event
        """
        # Serialize the event and put it on the main queue; the processor
        # thread fans it out to subscribers later.
        event_data = serialize_event(event)
        message_id = self.events_queue.send_message(event_data)

        self.logger.info(f"Published event: {event.event_type}, ID: {event.event_id}")

        return message_id

    def subscribe(self, service_name: str, event_types: List[str] = None) -> str:
        """Subscribe a service to specific event types.

        Args:
            service_name: Name of the subscribing service
            event_types: List of event types to subscribe to. ``None`` (or an
                empty list) subscribes to every type in ``EVENT_TYPES``.

        Returns:
            Name of the subscription queue
        """
        # Create the subscription queue for this service; the queue objects
        # are looked up by name at delivery time, so they are not kept here.
        queue_name = f"sub:{service_name}"
        self.mq.create_queue_pair(queue_name)

        # Register subscription
        for event_type in (event_types or EVENT_TYPES.keys()):
            self.subscribers.setdefault(event_type, set()).add(queue_name)

        self.logger.info(f"Service '{service_name}' subscribed to events: {event_types or 'all'}")

        return queue_name

    def _process_events(self):
        """Process events from the main queue and distribute to subscribers.

        Runs on the processor thread until ``self.running`` becomes false.
        """
        self.logger.info("Event processor started")

        while self.running:
            try:
                # Block for at most 1 second so the `running` flag is
                # re-checked regularly and stop() can take effect.
                messages = self.events_queue.get_messages(
                    count=10,
                    block_for_seconds=1
                )

                for message in messages:
                    self._handle_message(message)

            except Exception as e:
                self.logger.error(f"Error in event processor: {e}")
                time.sleep(1)  # Avoid tight loop on persistent errors

        self.logger.info("Event processor stopped")

    def _handle_message(self, message):
        """Deserialize one message, fan it out, then acknowledge or dead-letter it."""
        try:
            event_data = message.data
            event = deserialize_event(event_data)

            if not event:
                unknown_type = event_data.get('event_type')
                self.logger.warning(f"Unknown event type: {unknown_type}")
                # Settle the message so it does not stay pending on the main
                # queue forever; the DLQ keeps it available for inspection.
                self._dead_letter(message.id, f"Unknown event type: {unknown_type}")
                return

            self._distribute(event.event_type, event_data)

            # Acknowledge the event
            self.events_queue.acknowledge_messages([message.id])

        except Exception as e:
            self.logger.error(f"Error processing event {message.id}: {e}")
            self._dead_letter(message.id, f"Processing error: {e}")

    def _distribute(self, event_type, event_data):
        """Send event_data to every queue subscribed to event_type (best effort)."""
        # Iterate over a snapshot: subscribe() may add to the live set from
        # another thread, which would otherwise raise
        # "Set changed size during iteration" in the middle of delivery.
        subscriber_queues = tuple(self.subscribers.get(event_type, ()))

        for queue_name in subscriber_queues:
            try:
                subscriber_queue = self.mq.get_queue(queue_name)
                if subscriber_queue:
                    subscriber_queue.send_message(event_data)
            except Exception as e:
                # One failing subscriber must not block delivery to the others.
                self.logger.error(f"Error distributing to {queue_name}: {e}")

    def _dead_letter(self, message_id, reason):
        """Move one message to the DLQ, logging (not raising) if that fails."""
        try:
            self.events_queue.move_to_dlq(
                [message_id],
                reason,
                self.events_dlq
            )
        except Exception as e:
            # Keep processing the rest of the batch even if the DLQ move fails.
            self.logger.error(f"Error moving event {message_id} to DLQ: {e}")

Continue to the Microservice Implementation section.