Skip to content

Event-Driven Architecture (Continued)

Microservice Implementation

python
class Microservice:
    """Base class for microservices in the system."""
    
    def __init__(
        self,
        service_name: str,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        redis_db: int = 0,
        event_types: List[str] = None
    ):
        self.service_name = service_name
        
        # Initialize the event bus
        self.event_bus = EventBus(
            redis_host=redis_host,
            redis_port=redis_port,
            redis_db=redis_db
        )
        
        # Subscribe to events
        self.subscription_queue_name = self.event_bus.subscribe(
            service_name,
            event_types
        )
        
        # Get the subscription queue
        self.mq = self.event_bus.mq
        self.subscription_queue = self.mq.get_queue(self.subscription_queue_name)
        self.subscription_dlq = self.mq.get_dead_letter_queue(self.subscription_queue_name)
        
        # Running state
        self.running = False
        self.event_thread = None
        
        # Logger
        self.logger = logging.getLogger(f"Service:{service_name}")
    
    def start(self):
        """Start the microservice."""
        if self.running:
            return
        
        # Start the event bus
        self.event_bus.start()
        
        # Start the event handler thread
        self.running = True
        self.event_thread = threading.Thread(target=self._handle_events)
        self.event_thread.daemon = True
        self.event_thread.start()
        
        self.logger.info(f"Microservice '{self.service_name}' started")
    
    def stop(self):
        """Stop the microservice."""
        if not self.running:
            return
        
        self.running = False
        
        if self.event_thread and self.event_thread.is_alive():
            self.event_thread.join(timeout=5)
        
        # Stop the event bus
        self.event_bus.stop()
        
        self.logger.info(f"Microservice '{self.service_name}' stopped")
    
    def close(self):
        """Close the microservice and release resources."""
        self.stop()
        self.event_bus.close()
    
    def publish_event(self, event: Event):
        """Publish an event.
        
        Args:
            event: Event to publish
        """
        self.event_bus.publish(event)
    
    def handle_event(self, event: Event):
        """Handle an incoming event.
        
        Args:
            event: Event to handle
            
        Returns:
            True if event was handled successfully, False otherwise
        """
        # Default implementation - subclasses should override
        self.logger.info(f"Received event: {event.event_type}")
        return True
    
    def _handle_events(self):
        """Process incoming events."""
        self.logger.info(f"Event handler started for '{self.service_name}'")
        
        while self.running:
            try:
                # Get events from the subscription queue
                messages = self.subscription_queue.get_messages(
                    count=5,
                    block_for_seconds=1
                )
                
                for message in messages:
                    try:
                        # Deserialize the event
                        event = deserialize_event(message.data)
                        
                        if not event:
                            self.logger.warning(f"Unknown event type: {message.data.get('event_type')}")
                            self.subscription_queue.acknowledge_messages([message.id])
                            continue
                        
                        # Handle the event
                        success = self.handle_event(event)
                        
                        # Acknowledge successful processing
                        if success:
                            self.subscription_queue.acknowledge_messages([message.id])
                        else:
                            # Move to DLQ if handling failed
                            self.subscription_queue.move_to_dlq(
                                [message.id],
                                "Event handling returned false",
                                self.subscription_dlq
                            )
                    
                    except Exception as e:
                        self.logger.error(f"Error handling event: {e}")
                        
                        # Move to DLQ
                        self.subscription_queue.move_to_dlq(
                            [message.id],
                            f"Handling error: {e}",
                            self.subscription_dlq
                        )
            
            except Exception as e:
                self.logger.error(f"Error in event handler: {e}")
                time.sleep(1)  # Avoid tight loop on persistent errors
        
        self.logger.info(f"Event handler stopped for '{self.service_name}'")

Continue to the Implementing Specific Microservices section.