Message API Reference
This reference documents the Message class in LeanMQ.
Message Class
The Message class represents a message in a queue. Messages are returned when you call queue.get_messages() and contain both the message data and metadata about the message.
Attributes
id (str): Unique identifier for the message. This is the Redis stream ID in the format {timestamp}-{sequence} (e.g., '1615456789012-0').
data (Dict[str, Any]): Dictionary containing the message payload.
timestamp (float): Unix timestamp when the message was created.
delivery_count (int): Number of times this message has been delivered.
Constructor
Message(
    id: str,
    data: Dict[str, Any],
    timestamp: Optional[float] = None,
    delivery_count: int = 0
)
Parameters:
id (str): Unique identifier for the message.
data (Dict[str, Any]): Message payload.
timestamp (Optional[float]): Unix timestamp when the message was created.
delivery_count (int): Number of times this message has been delivered.
WARNING
You should not create Message objects directly. They are created by the get_messages() method of the Queue class.
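For unit-testing handler code without a Redis connection, a stand-in object with the same four attributes can be useful. The sketch below defines a hypothetical FakeMessage dataclass. It is not LeanMQ's implementation, and filling a missing timestamp with the current time is an assumption about how the default might behave.

```python
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class FakeMessage:
    """Hypothetical test double mirroring the documented Message attributes."""

    id: str
    data: Dict[str, Any]
    timestamp: Optional[float] = None
    delivery_count: int = 0

    def __post_init__(self) -> None:
        # Assumption: an omitted timestamp defaults to "now"; LeanMQ may differ.
        if self.timestamp is None:
            self.timestamp = time.time()


# Build a stand-in message to pass to code that only reads these attributes
msg = FakeMessage(id="1615456789012-0", data={"type": "welcome"})
print(msg.id, msg.data, msg.delivery_count)
```

In application code, keep relying on queue.get_messages() to produce real Message objects.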
Working with Messages
Getting Messages
Messages are retrieved from a queue using the get_messages() method:
from leanmq import LeanMQ
# Initialize LeanMQ
mq = LeanMQ(redis_host="localhost", redis_port=6379)
# Get a queue
queue = mq.get_queue("notifications")
# Get messages from the queue
messages = queue.get_messages(count=10, block_for_seconds=5)
# Process the messages
for message in messages:
    print(f"Message ID: {message.id}")
    print(f"Data: {message.data}")
    print(f"Timestamp: {message.timestamp}")
    print(f"Delivery count: {message.delivery_count}")
    # Process the message...
    # Acknowledge successful processing
    queue.acknowledge_messages([message.id])
Message Structure
The data attribute of a Message contains the actual payload that was sent to the queue. This is a Python dictionary that you can access directly:
# Access message data
message = messages[0]
user_id = message.data.get("user_id")
notification_type = message.data.get("type")
content = message.data.get("content")
print(f"Notification for user {user_id}: {notification_type}")
print(f"Content: {content}")
Message IDs
The id attribute is a unique identifier for the message in the Redis stream. It's in the format {timestamp}-{sequence} and is used when acknowledging, deleting, or moving messages:
# Get messages
messages = queue.get_messages(count=5)
# Collect message IDs
message_ids = [message.id for message in messages]
# Use the IDs with queue operations
queue.acknowledge_messages(message_ids)
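Because the ID has two numeric parts, comparing IDs as plain strings can order them incorrectly (for example, "9-0" sorts after "10-0"). The following sketch splits an ID into integers; split_stream_id is a hypothetical helper, not part of LeanMQ, and it assumes the ID was auto-generated by Redis, in which case the first part is Unix time in milliseconds.

```python
from typing import Tuple


def split_stream_id(message_id: str) -> Tuple[int, int]:
    """Split a '{timestamp}-{sequence}' stream ID into two integers."""
    millis, sequence = message_id.split("-", 1)
    return int(millis), int(sequence)


millis, sequence = split_stream_id("1615456789012-0")
print(f"Milliseconds: {millis}, sequence: {sequence}")

# Sort IDs numerically rather than lexicographically
ids = ["10-0", "9-1", "9-0"]
ordered_ids = sorted(ids, key=split_stream_id)
print(ordered_ids)
```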
Message Timestamps
The timestamp attribute indicates when the message was created, as a Unix timestamp (seconds since epoch):
import time
from datetime import datetime
# Get a message (assumes the queue is not empty)
message = queue.get_messages(count=1)[0]
# Convert timestamp to readable datetime
created_at = datetime.fromtimestamp(message.timestamp)
print(f"Message created at: {created_at.strftime('%Y-%m-%d %H:%M:%S')}")
# Check how old the message is
age_seconds = time.time() - message.timestamp
print(f"Message age: {age_seconds:.2f} seconds")
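Note that datetime.fromtimestamp() without a tz argument returns a naive datetime in the machine's local time zone, so the printed value differs between hosts. A minimal sketch of a timezone-aware conversion follows; created_at_utc is a hypothetical helper and example_timestamp stands in for message.timestamp.

```python
from datetime import datetime, timezone


def created_at_utc(timestamp: float) -> datetime:
    """Convert a Unix timestamp in seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


# Hypothetical value standing in for message.timestamp
example_timestamp = 1615456789.0
print(created_at_utc(example_timestamp).isoformat())
```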
Delivery Count
The delivery_count attribute tracks how many times a message has been delivered to consumers. This is useful for implementing retry logic, such as capping the number of attempts before moving a message to the dead letter queue (DLQ):
# Get messages
messages = queue.get_messages(count=10)
for message in messages:
    print(f"Message ID: {message.id}, Delivery count: {message.delivery_count}")
    try:
        # Process the message
        process_message(message.data)
        # Acknowledge success
        queue.acknowledge_messages([message.id])
    except Exception as e:
        # Give up after too many deliveries; otherwise leave the message for redelivery
        if message.delivery_count >= 3:
            # After 3 failures, move to DLQ
            dlq = mq.get_dead_letter_queue("myqueue")
            queue.move_to_dlq([message.id], f"Failed after {message.delivery_count} attempts: {e}", dlq)
        else:
            # Let it be redelivered (don't acknowledge)
            print(f"Processing failed, will retry. Attempt: {message.delivery_count}")
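One way to derive a backoff delay from delivery_count is capped exponential growth. The sketch below is an illustration only: backoff_delay, its base and cap defaults, and the exponent limit are assumptions rather than LeanMQ features, and how the delay is applied (for example, sleeping before the next get_messages() call) depends on your consumer.

```python
def backoff_delay(delivery_count: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Return a delay in seconds that doubles with each delivery, up to cap."""
    # Treat a count of 0 like a first delivery, and bound the exponent so
    # very large counts cannot overflow the float multiplication.
    exponent = min(max(delivery_count, 1) - 1, 32)
    return min(cap, base * 2 ** exponent)


for count in (1, 2, 3, 10):
    print(f"Delivery {count}: wait {backoff_delay(count):.1f}s before retrying")
```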
Example: Message Processing Patterns
Basic Processing
# Get messages
messages = queue.get_messages(count=10)
# Process each message
for message in messages:
    try:
        result = process_message(message.data)
        print(f"Processed message {message.id}: {result}")
        # Acknowledge successful processing
        queue.acknowledge_messages([message.id])
    except Exception as e:
        print(f"Error processing message {message.id}: {e}")
        # Move to DLQ
        dlq = mq.get_dead_letter_queue("myqueue")
        queue.move_to_dlq([message.id], f"Processing error: {e}", dlq)
Batch Processing
# Get messages
messages = queue.get_messages(count=100)
if not messages:
    print("No messages to process")
else:
    try:
        # Extract data for batch processing
        batch_data = [message.data for message in messages]
        # Process in batch
        results = batch_process_messages(batch_data)
        # Acknowledge all successful messages
        queue.acknowledge_messages([message.id for message in messages])
        print(f"Processed {len(messages)} messages in batch")
    except Exception as e:
        print(f"Batch processing failed: {e}")
        # In case of batch failure, move all to DLQ
        dlq = mq.get_dead_letter_queue("myqueue")
        queue.move_to_dlq([m.id for m in messages], f"Batch processing error: {e}", dlq)
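Moving the whole batch to the DLQ also discards messages that would have succeeded on their own. A possible alternative is to fall back to per-message processing when the batch fails. In the sketch below, process_individually and demo_handler are hypothetical names, and plain (id, data) tuples stand in for Message objects.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple


def process_individually(
    items: Iterable[Tuple[str, Dict[str, Any]]],
    handler: Callable[[Dict[str, Any]], Any],
) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Run handler on each payload; return succeeded IDs and (id, error) pairs."""
    ok_ids: List[str] = []
    failures: List[Tuple[str, str]] = []
    for message_id, data in items:
        try:
            handler(data)
        except Exception as exc:
            failures.append((message_id, str(exc)))
        else:
            ok_ids.append(message_id)
    return ok_ids, failures


def demo_handler(data: Dict[str, Any]) -> None:
    # Hypothetical handler that rejects payloads without a user_id
    if "user_id" not in data:
        raise ValueError("missing user_id")


sample = [("1-0", {"user_id": 7}), ("2-0", {}), ("3-0", {"user_id": 9})]
ok_ids, failures = process_individually(sample, demo_handler)
print(ok_ids, failures)
```

The succeeded IDs can then be passed to queue.acknowledge_messages(), and each failed ID moved with queue.move_to_dlq() together with its error text.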
Priority-Based Processing
# Process messages based on priority field
messages = queue.get_messages(count=20)
# Sort messages by priority (if available)
sorted_messages = sorted(
    messages,
    key=lambda m: m.data.get("priority", 5),  # Default to medium priority (5)
    reverse=True  # Higher numbers = higher priority
)
# Process in priority order
for message in sorted_messages:
    priority = message.data.get("priority", 5)
    print(f"Processing message with priority {priority}: {message.id}")
    # Process the message...
    # Acknowledge when done
    queue.acknowledge_messages([message.id])
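Python's sorted() is stable, even with reverse=True, so messages that share a priority keep their original relative order. The small demonstration below uses SimpleNamespace objects as hypothetical stand-ins for Message instances.

```python
from types import SimpleNamespace

# Stand-ins for Message objects, listed in arrival order
messages = [
    SimpleNamespace(id="1-0", data={"priority": 5}),
    SimpleNamespace(id="2-0", data={"priority": 9}),
    SimpleNamespace(id="3-0", data={}),  # no priority field, defaults to 5
    SimpleNamespace(id="4-0", data={"priority": 9}),
]

ordered = sorted(messages, key=lambda m: m.data.get("priority", 5), reverse=True)
ordered_ids = [m.id for m in ordered]
print(ordered_ids)
```

Both priority-9 messages come first in arrival order, followed by the two priority-5 messages in arrival order.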
Message Filtering
# Get messages but only process those matching certain criteria
messages = queue.get_messages(count=50)
# Filter messages
important_messages = [
    message for message in messages
    if message.data.get("user_type") == "premium" or message.data.get("urgent", False)
]
# Process important messages first
for message in important_messages:
    process_message(message.data)
    queue.acknowledge_messages([message.id])
# Then process the rest
other_messages = [m for m in messages if m not in important_messages]
for message in other_messages:
    process_message(message.data)
    queue.acknowledge_messages([message.id])
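The "not in" check above rescans the important list for every message. For larger batches, a single pass that splits messages into two lists avoids that. In this sketch, partition and is_important are hypothetical helpers, and SimpleNamespace objects stand in for Message instances.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterable, List, Tuple


def partition(
    messages: Iterable[Any], predicate: Callable[[Any], bool]
) -> Tuple[List[Any], List[Any]]:
    """Split messages into (matching, non-matching) in one pass, keeping order."""
    matched: List[Any] = []
    rest: List[Any] = []
    for message in messages:
        (matched if predicate(message) else rest).append(message)
    return matched, rest


def is_important(message: Any) -> bool:
    # Same criteria as the filter above: premium users or urgent messages
    return message.data.get("user_type") == "premium" or bool(message.data.get("urgent", False))


sample = [
    SimpleNamespace(id="1-0", data={"user_type": "free"}),
    SimpleNamespace(id="2-0", data={"user_type": "premium"}),
    SimpleNamespace(id="3-0", data={"urgent": True}),
]
important, others = partition(sample, is_important)
print([m.id for m in important], [m.id for m in others])
```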