Transaction API Reference
This reference documents the Transaction class and related functionality in LeanMQ.
Transaction Class
The Transaction class represents a Redis transaction for atomic operations. It allows you to group multiple message send operations into a single atomic unit that either all succeed or all fail together.
Constructor
Transaction(redis_connection: Redis)
Parameters:
redis_connection (Redis): Redis connection to use for the transaction
WARNING
You should not create Transaction objects directly. Instead, use the transaction() method of the LeanMQ class.
Methods
send_message
send_message(
    queue: Queue,
    data: Dict[str, Any],
    ttl_seconds: Optional[int] = None
) -> None
Add a message send operation to the transaction.
Parameters:
queue (Queue): Queue to send the message to
data (Dict[str, Any]): Message data
ttl_seconds (Optional[int]): Time-to-live for the message in seconds
Example:
with mq.transaction() as tx:
    tx.send_message(queue1, {"key": "value1"})
    tx.send_message(queue2, {"key": "value2"}, ttl_seconds=3600)
_execute
_execute() -> None
Execute the transaction.
WARNING
This method is called automatically when the context manager exits. You should not call it directly.
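To make the commit-on-exit behaviour concrete, here is a minimal in-memory sketch of a transaction-style context manager. It is not LeanMQ's implementation: SketchTransaction and the plain dict of lists standing in for queues are hypothetical, and no Redis is involved. It only illustrates the general pattern of buffering sends and applying them when the with block exits without an exception.

```python
from typing import Any, Dict, List, Tuple


class SketchTransaction:
    """Hypothetical stand-in: buffers sends, applies them on clean exit."""

    def __init__(self, queues: Dict[str, List[Dict[str, Any]]]) -> None:
        self._queues = queues
        self._pending: List[Tuple[str, Dict[str, Any]]] = []

    def send_message(self, queue_name: str, data: Dict[str, Any]) -> None:
        # Nothing is delivered yet; the operation is only recorded.
        self._pending.append((queue_name, data))

    def _execute(self) -> None:
        # Apply every buffered send in the order it was queued.
        for queue_name, data in self._pending:
            self._queues.setdefault(queue_name, []).append(data)
        self._pending.clear()

    def __enter__(self) -> "SketchTransaction":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            self._execute()        # clean exit: commit the buffered sends
        else:
            self._pending.clear()  # exception: discard everything
        return False               # never swallow the caller's exception


queues: Dict[str, List[Dict[str, Any]]] = {}

# Clean exit: both messages are applied together.
with SketchTransaction(queues) as tx:
    tx.send_message("orders", {"key": "value1"})
    tx.send_message("audit", {"key": "value2"})
committed = {name: list(msgs) for name, msgs in queues.items()}

# Exception inside the block: the buffered message is never applied.
try:
    with SketchTransaction(queues) as tx:
        tx.send_message("orders", {"key": "dropped"})
        raise ValueError("abort before commit")
except ValueError:
    pass
```

Because the commit happens in __exit__, calling code never needs to invoke _execute() itself, which is the reason for the warning above.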
Using Transactions
Starting a Transaction
Transactions are typically used with a context manager:
import time
from leanmq import LeanMQ
# Initialize LeanMQ
mq = LeanMQ(redis_host="localhost", redis_port=6379)
# Create queues
order_queue, _ = mq.create_queue_pair("orders")
notification_queue, _ = mq.create_queue_pair("notifications")
audit_queue, _ = mq.create_queue_pair("audit")
# Start a transaction
with mq.transaction() as tx:
    # Add operations to the transaction
    tx.send_message(order_queue, {
        "order_id": "ORD-12345",
        "status": "paid",
        "amount": 99.99
    })
    tx.send_message(notification_queue, {
        "type": "order_paid",
        "order_id": "ORD-12345",
        "message": "Payment received for order ORD-12345"
    })
    tx.send_message(audit_queue, {
        "action": "order_payment",
        "order_id": "ORD-12345",
        "amount": 99.99,
        "timestamp": time.time()
    })
# The transaction is automatically executed when exiting the context
Error Handling
If any operation in the transaction fails, or an exception is raised inside the with block, the transaction fails as a whole and none of its messages are sent:
from leanmq import LeanMQ, TransactionError
mq = LeanMQ(redis_host="localhost", redis_port=6379)
queue1, _ = mq.create_queue_pair("queue1")
queue2, _ = mq.create_queue_pair("queue2")
# Placeholder flag; replace with your own validation check
some_error_condition = False
try:
    with mq.transaction() as tx:
        tx.send_message(queue1, {"key": "value1"})
        # Simulate an error condition
        if some_error_condition:
            raise ValueError("Something went wrong")
        tx.send_message(queue2, {"key": "value2"})
except (TransactionError, ValueError) as e:
    print(f"Transaction failed: {e}")
    # Handle the failure (e.g., retry or notify)
Manual Transaction Handling
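If you need more control over when the transaction is executed, you can skip the context manager and call _execute() yourself. Because _execute() is a private method (see the warning in the Methods section), treat this as an escape hatch rather than the normal path: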
# Create a transaction
tx = mq.transaction()
# Add operations
tx.send_message(queue1, {"key": "value1"})
tx.send_message(queue2, {"key": "value2"})
try:
    # Execute the transaction manually
    tx._execute()
except TransactionError as e:
    print(f"Transaction failed: {e}")
However, using the context manager is recommended as it ensures proper cleanup even if exceptions occur.
Transaction Patterns
Multi-Queue Publishing
Use transactions to ensure messages are published to multiple queues atomically:
import time
def publish_event(event_type, event_data):
    """Publish an event to multiple queues atomically."""
    with mq.transaction() as tx:
        # Add event timestamp (note: this mutates the caller's dict)
        event_data["timestamp"] = time.time()
        # Publish to main event queue
        tx.send_message(event_queue, {
            "type": event_type,
            "data": event_data
        })
        # Publish to specific event type queue, if one exists
        type_specific_queue = mq.get_queue(f"events:{event_type}")
        if type_specific_queue:
            tx.send_message(type_specific_queue, event_data)
        # Add to analytics queue
        tx.send_message(analytics_queue, {
            "event_type": event_type,
            "event_data": event_data,
            "source": "event_service"
        })
Event Sourcing
Use transactions to implement event sourcing patterns:
import time
def update_user_profile(user_id, updates):
    """Update user profile and publish events atomically."""
    with mq.transaction() as tx:
        # Create the event
        event = {
            "user_id": user_id,
            "changes": updates,
            "timestamp": time.time()
        }
        # Publish to user events stream
        tx.send_message(user_events_queue, {
            "type": "user_profile_updated",
            "data": event
        })
        # Publish to each interested service's queue
        if "email" in updates:
            tx.send_message(email_service_queue, {
                "action": "update_email",
                "user_id": user_id,
                "new_email": updates["email"]
            })
        if "preferences" in updates:
            tx.send_message(preferences_service_queue, {
                "action": "update_preferences",
                "user_id": user_id,
                "preferences": updates["preferences"]
            })
Workflow Steps
Use transactions to implement workflow steps:
import time
def complete_order_step(order_id, step, result):
    """Complete a step in an order processing workflow."""
    with mq.transaction() as tx:
        # Log the step completion
        tx.send_message(workflow_log_queue, {
            "order_id": order_id,
            "step": step,
            "result": result,
            "timestamp": time.time()
        })
        # Determine the next step
        if step == "payment_processing":
            if result["status"] == "success":
                # Trigger fulfillment
                tx.send_message(fulfillment_queue, {
                    "order_id": order_id,
                    "action": "start_fulfillment",
                    "payment_info": result["payment_info"]
                })
            else:
                # Payment failed, trigger notification
                tx.send_message(notification_queue, {
                    "type": "payment_failed",
                    "order_id": order_id,
                    "reason": result["reason"]
                })
Transaction Limitations
It's important to understand the limitations of transactions in LeanMQ:
Redis-Only Operations: Transactions only encompass Redis operations. If your transaction needs to involve other systems (databases, APIs, etc.), you'll need to implement your own two-phase commit pattern.
No Read Operations: Redis queues the commands of a transaction and only returns their results when the transaction executes, so you cannot read a value partway through a transaction and branch on it.
No Rollback for Queue Creation: Creating queues is not part of the transaction and cannot be rolled back.
Network Failures: If a network failure occurs after operations are queued but before the transaction is executed, the transaction will fail without being executed.
Limited Error Reporting: If a transaction fails, you get a generic TransactionError without detailed information about which specific operation failed.
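One way to soften the limited error reporting is to keep your own record of what was queued, so a failure can at least be logged alongside the attempted batch. The sketch below is an assumption-laden illustration, not part of LeanMQ: RecordingTransaction, FakeQueue, and FakeTransaction are hypothetical names, and the name attribute on queues is assumed rather than documented. It cannot tell you which operation failed, only what the transaction contained.

```python
from typing import Any, Dict, List, Optional, Tuple


class RecordingTransaction:
    """Hypothetical wrapper: forwards sends and remembers what was queued."""

    def __init__(self, tx: Any) -> None:
        self._tx = tx
        self.operations: List[Tuple[str, Dict[str, Any]]] = []

    def send_message(
        self,
        queue: Any,
        data: Dict[str, Any],
        ttl_seconds: Optional[int] = None,
    ) -> None:
        # Assumption: a queue may expose a `name` attribute; otherwise use repr().
        label = getattr(queue, "name", repr(queue))
        self.operations.append((label, data))
        self._tx.send_message(queue, data, ttl_seconds=ttl_seconds)

    def describe(self) -> str:
        """Summarize recorded operations (queue label and message keys)."""
        return "; ".join(
            f"#{i} -> {label}: {sorted(data)}"
            for i, (label, data) in enumerate(self.operations, start=1)
        )


# Stand-ins so the sketch runs without Redis or LeanMQ.
class FakeQueue:
    def __init__(self, name: str) -> None:
        self.name = name


class FakeTransaction:
    def __init__(self) -> None:
        self.sent: List[Tuple[str, Dict[str, Any], Optional[int]]] = []

    def send_message(self, queue, data, ttl_seconds=None) -> None:
        self.sent.append((queue.name, data, ttl_seconds))


inner = FakeTransaction()
recorder = RecordingTransaction(inner)
recorder.send_message(FakeQueue("orders"), {"order_id": "ORD-12345", "status": "paid"})
recorder.send_message(FakeQueue("audit"), {"action": "order_payment"}, ttl_seconds=3600)
summary = recorder.describe()
```

With a real transaction you would wrap the tx object yielded by mq.transaction() and include recorder.describe() in the log line of your except TransactionError handler.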
Best Practices
Keep Transactions Focused
Each transaction should encompass a single logical operation. Avoid mixing unrelated operations in the same transaction.
Pre-Create Queues
Create all necessary queues before starting your transactions to avoid partial failures:
# Good practice - create queues first
queue1, _ = mq.create_queue_pair("queue1")
queue2, _ = mq.create_queue_pair("queue2")
with mq.transaction() as tx:
    tx.send_message(queue1, {"key": "value1"})
    tx.send_message(queue2, {"key": "value2"})
Implement Retry Logic
For critical operations, implement proper retry logic:
import time
from leanmq import TransactionError
def send_with_retry(max_retries=3, backoff_factor=2):
    """Send messages with retry logic and exponential backoff."""
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            with mq.transaction() as tx:
                tx.send_message(queue1, {"key": "value1"})
                tx.send_message(queue2, {"key": "value2"})
            # Success, stop retrying
            return True
        except TransactionError as e:
            last_error = e
            # Only wait if another attempt will follow
            if attempt < max_retries:
                wait_time = backoff_factor ** attempt
                print(f"Transaction failed (attempt {attempt}/{max_retries}), "
                      f"retrying in {wait_time} seconds: {e}")
                time.sleep(wait_time)
    # All retries failed
    print(f"All {max_retries} transaction attempts failed: {last_error}")
    return False
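The retry function above hard-codes its queues and messages. If several call sites need the same behaviour, the loop can be factored into a helper that takes the operation as a callable. The following is a sketch under stated assumptions: run_with_retry and its parameters are hypothetical names, not part of LeanMQ, and the demonstration uses RuntimeError and a fake operation so it runs without Redis.

```python
import time
from typing import Callable, List, Tuple, Type


def run_with_retry(
    operation: Callable[[], None],
    retry_on: Tuple[Type[BaseException], ...],
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call `operation` until it succeeds or `max_retries` attempts fail."""
    for attempt in range(1, max_retries + 1):
        try:
            operation()
            return True
        except retry_on:
            # Exponential backoff, skipped after the final attempt.
            if attempt < max_retries:
                sleep(backoff_factor ** attempt)
    return False


# Demonstration with a stand-in that fails twice, then succeeds.
calls = {"count": 0}
waits: List[float] = []


def flaky() -> None:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("simulated transaction failure")


# Passing waits.append as `sleep` records the delays instead of waiting.
succeeded = run_with_retry(flaky, retry_on=(RuntimeError,), sleep=waits.append)
```

With LeanMQ, operation would be a function that opens with mq.transaction() as tx: and queues its messages, and retry_on would be (TransactionError,). Making sleep injectable keeps the helper fast to unit test.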
Design for Idempotency
Make sure your message handlers are idempotent (can be safely repeated):
# db and process_payment are placeholders for your own persistence and payment code
def handle_order_payment(data):
    """Handle order payment message (idempotent implementation)."""
    order_id = data["order_id"]
    payment_id = data["payment_id"]
    # Check if this payment was already processed
    if db.payment_exists(payment_id):
        print(f"Payment {payment_id} already processed, ignoring duplicate")
        return
    # Process the payment
    result = process_payment(order_id, data["amount"])
    # Record the payment to prevent duplicates
    db.record_payment(payment_id, order_id, result)
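The effect of the duplicate check can be seen in a runnable form by swapping the database for in-memory stand-ins. Everything named here (handle_payment_once, processed_payment_ids, charges) is hypothetical and exists only for illustration; a real handler would use durable storage.

```python
from typing import Any, Dict, List, Set

processed_payment_ids: Set[str] = set()  # stand-in for a payments table
charges: List[Dict[str, Any]] = []       # stand-in for the real side effect


def handle_payment_once(data: Dict[str, Any]) -> bool:
    """Return True if the payment was processed, False if it was a duplicate."""
    payment_id = data["payment_id"]
    if payment_id in processed_payment_ids:
        return False  # already handled; repeating would double-charge
    charges.append({"order_id": data["order_id"], "amount": data["amount"]})
    processed_payment_ids.add(payment_id)
    return True


message = {"order_id": "ORD-12345", "payment_id": "PAY-1", "amount": 99.99}
first = handle_payment_once(message)
second = handle_payment_once(message)  # simulated redelivery of the same message
```

An in-memory set only protects a single process. With multiple consumers, the check and the record need to be one atomic step in shared storage, for example a unique constraint on the payment ID.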
Related
- Core API: Documentation for the LeanMQ class that creates transactions
- Queue API: Documentation for the Queue class used in transactions
- Transactions Guide: Detailed guide to using transactions effectively