Queue API Reference
This reference documents the Queue-related classes and methods in LeanMQ.
Queue Class
The Queue
class represents a message queue and provides operations for sending, receiving, and managing messages.
Constructor
Queue(
name: str,
redis_connection: Redis,
is_dlq: bool = False,
consumer_group: Optional[str] = None
)
Parameters:
name
(str): Name of the queueredis_connection
(Redis): Redis connection to useis_dlq
(bool): Whether this queue is a dead letter queueconsumer_group
(Optional[str]): Name of the consumer group to use
WARNING
You should not create Queue objects directly. Instead, use the create_queue_pair()
, get_queue()
, or get_dead_letter_queue()
methods of the LeanMQ class.
Methods
send_message
send_message(
data: Dict[str, Any],
ttl_seconds: Optional[int] = None
) -> str
Send a message to the queue.
Parameters:
data
(Dict[str, Any]): Message datattl_seconds
(Optional[int]): Time-to-live for the message in seconds
Returns: Message ID
Example:
message_id = queue.send_message({"key": "value"})
print(f"Sent message with ID: {message_id}")
# Send a message that expires after 1 hour
ttl_seconds = 60 * 60
message_id = queue.send_message({"key": "value"}, ttl_seconds=ttl_seconds)
get_messages
get_messages(
count: int = 1,
block_for_seconds: Optional[float] = None,
consumer_id: str = "consumer1"
) -> List[Message]
Get messages from the queue.
Parameters:
count
(int): Maximum number of messages to getblock_for_seconds
(Optional[float]): How long to block waiting for messagesconsumer_id
(str): ID of the consumer getting messages
Returns: List of Message objects
Example:
# Get up to 10 messages, waiting up to 5 seconds for new messages
messages = queue.get_messages(count=10, block_for_seconds=5)
# Get messages with a specific consumer ID
messages = queue.get_messages(count=5, consumer_id="worker-123")
# Process the messages
for message in messages:
print(f"Message ID: {message.id}")
print(f"Data: {message.data}")
# Process the message...
# Acknowledge successful processing
queue.acknowledge_messages([message.id])
acknowledge_messages
acknowledge_messages(message_ids: List[str]) -> int
Acknowledge messages as processed without removing them from the stream.
Parameters:
message_ids
(List[str]): List of message IDs to acknowledge
Returns: Number of messages acknowledged
Example:
# Get messages
messages = queue.get_messages(count=5)
# Process them
for message in messages:
# Process the message...
pass
# Acknowledge all messages at once
acknowledged = queue.acknowledge_messages([m.id for m in messages])
print(f"Acknowledged {acknowledged} messages")
delete_messages
delete_messages(message_ids: List[str]) -> int
Permanently delete messages from the queue.
Parameters:
message_ids
(List[str]): List of message IDs to delete
Returns: Number of messages deleted
Example:
# Delete specific messages
deleted = queue.delete_messages(["1615456789012-0", "1615456789013-0"])
print(f"Deleted {deleted} messages")
# Get messages and then delete them after processing
messages = queue.get_messages(count=5)
for message in messages:
# Process the message...
pass
# Delete all processed messages
queue.delete_messages([m.id for m in messages])
move_to_dlq
move_to_dlq(
message_ids: List[str],
reason: str,
dlq: Optional["Queue"] = None
) -> int
Move messages to the dead letter queue.
Parameters:
message_ids
(List[str]): List of message IDs to movereason
(str): Reason for moving to DLQdlq
(Optional[Queue]): Dead letter queue to move to
Returns: Number of messages moved
Example:
# Get the DLQ
dlq = mq.get_dead_letter_queue("myqueue")
# Move specific messages to the DLQ
moved = queue.move_to_dlq(
["1615456789012-0", "1615456789013-0"],
"Failed processing: Invalid format",
dlq
)
print(f"Moved {moved} messages to DLQ")
# Error handling during message processing
messages = queue.get_messages(count=5)
for message in messages:
try:
# Process the message...
process_message(message.data)
# Acknowledge success
queue.acknowledge_messages([message.id])
except Exception as e:
# Move to DLQ on failure
queue.move_to_dlq([message.id], f"Processing error: {e}", dlq)
requeue_messages
requeue_messages(
message_ids: List[str],
destination_queue: Optional["Queue"] = None
) -> int
Move messages from a DLQ back to their original queue for reprocessing.
Parameters:
message_ids
(List[str]): List of message IDs to requeuedestination_queue
(Optional[Queue]): Queue to move messages to
Returns: Number of messages requeued
Example:
# Get messages from DLQ
failed_messages = dlq.get_messages(count=10)
# Get the original queue
main_queue = mq.get_queue("myqueue")
# Requeue the messages
requeued = dlq.requeue_messages(
[message.id for message in failed_messages],
main_queue
)
print(f"Requeued {requeued} messages for reprocessing")
purge
purge() -> int
Purge all messages from the queue.
Returns: Number of messages purged
Example:
# Remove all messages from the queue
purged = queue.purge()
print(f"Purged {purged} messages from the queue")
get_info
get_info() -> QueueInfo
Get information about the queue.
Returns: QueueInfo object with queue information
Example:
# Get queue information
queue_info = queue.get_info()
print(f"Queue: {queue_info.name}")
print(f"Is DLQ: {queue_info.is_dlq}")
print(f"Message count: {queue_info.message_count}")
print(f"Consumer group: {queue_info.consumer_group}")
print(f"Pending messages: {queue_info.pending_messages}")
print(f"Created at: {queue_info.created_at}")
QueueInfo Class
The QueueInfo
class provides information about a queue.
Attributes
name
(str): Name of the queueis_dlq
(bool): Whether this queue is a dead letter queuemessage_count
(int): Number of messages in the queueconsumer_group
(Optional[str]): Name of the consumer grouppending_messages
(int): Number of pending messagescreated_at
(Optional[float]): Timestamp when queue was created
Methods
The QueueInfo
class is primarily a data class and does not have significant methods beyond basic object functionality.
Example Usage
from leanmq import LeanMQ
# Initialize LeanMQ
mq = LeanMQ(redis_host="localhost", redis_port=6379)
# Create a queue pair
main_queue, dlq = mq.create_queue_pair("notifications")
# Send messages
for i in range(5):
main_queue.send_message({
"id": i,
"message": f"Notification {i}",
"timestamp": time.time()
})
# Get and process messages
messages = main_queue.get_messages(count=10)
for message in messages:
print(f"Processing message: {message.data}")
try:
# Process the message
process_notification(message.data)
# Acknowledge success
main_queue.acknowledge_messages([message.id])
except Exception as e:
print(f"Error processing message: {e}")
# Move to DLQ
main_queue.move_to_dlq([message.id], f"Processing error: {e}", dlq)
# Get queue information
queue_info = main_queue.get_info()
print(f"Queue: {queue_info.name}")
print(f"Message count: {queue_info.message_count}")
# Reprocess failed messages from DLQ
failed_messages = dlq.get_messages(count=10)
if failed_messages:
# Fix the issue that caused the failure
fix_processing_issue()
# Requeue the messages
dlq.requeue_messages([m.id for m in failed_messages], main_queue)
Related
- Core API: Documentation for the LeanMQ class that creates and manages queues
- Message API: Documentation for the Message class returned by
get_messages()