Advanced Workflow Patterns in PyWorkflow
Master advanced patterns like parallel execution, conditional branching, dynamic task generation, and circuit breakers in PyWorkflow for building sophisticated distributed systems.
Introduction: Moving Beyond Basic Workflows
Once you've mastered the basics of PyWorkflow, it's time to explore advanced patterns that unlock the full power of distributed workflow orchestration. These patterns solve real-world problems: coordinating work across multiple systems, handling conditional logic, generating tasks dynamically, and building resilient systems that gracefully degrade under failure.
This guide covers the patterns that separate simple workflows from sophisticated, production-grade systems. Each pattern is grounded in real-world use cases and includes detailed code examples you can adapt to your needs.
What Are Workflow Patterns, and Why Do They Matter?
Understanding Pattern-Based Architecture
A workflow pattern is a reusable solution to a common problem in distributed systems. Rather than writing custom code for each scenario, patterns provide proven architectures that:
- Reduce complexity — Patterns handle edge cases you might not anticipate
- Improve maintainability — Team members recognize familiar patterns
- Enable scaling — Patterns are designed to scale from small to large workloads
- Ensure reliability — Patterns encode lessons learned from production failures
Pattern Categories in PyWorkflow
| Pattern Category | Purpose | Complexity | Use When |
|---|---|---|---|
| Parallel Execution | Run independent tasks simultaneously | Medium | Multiple operations can happen at once |
| Conditional Branching | Choose different paths based on data | Low | Workflow behavior depends on input/state |
| Dynamic Task Generation | Create tasks based on runtime data | High | Number/type of tasks unknown upfront |
| Error Handling | Recover from failures gracefully | Medium | Failures are expected and recoverable |
| Circuit Breaking | Prevent cascading failures | High | Calling external services that might fail |
| Saga Pattern | Distributed transactions | High | Multi-step changes across systems |
| Workflow Composition | Combine multiple workflows | Medium | Workflows share common sub-processes |
Parallel Execution: Coordinating Independent Operations
Why Parallel Execution Matters
In distributed systems, parallelism is essential for performance. If tasks are independent, running them sequentially wastes resources and time. Parallel execution allows you to:
- Reduce total workflow time — Independent tasks run concurrently instead of one-by-one
- Utilize resources efficiently — Workers can process multiple tasks across the cluster
- Scale throughput — More workers automatically increase parallelism
Understanding Parallelism in PyWorkflow
PyWorkflow automatically parallelizes independent steps. When the steps you call don't depend on each other's results, PyWorkflow executes them in parallel:
```python
@workflow()
async def parallel_workflow():
    # These three steps run in parallel:
    # PyWorkflow distributes them to available workers
    result1 = await step1()  # Worker A
    result2 = await step2()  # Worker B
    result3 = await step3()  # Worker C

    # Execution waits here for all three to complete,
    # then continues with the results
    return await aggregate_results(result1, result2, result3)
```

Pattern 1: Fan-Out / Fan-In
The fan-out / fan-in pattern distributes work across multiple parallel tasks (fan-out), then combines the results (fan-in).
Use Cases
- Data processing — Process chunks of data in parallel
- Batch operations — Process multiple items simultaneously
- Aggregation — Combine results from multiple sources
- Report generation — Gather data from multiple systems
Implementation
```python
from typing import Dict, List

from pyworkflow import step, workflow


@step()
async def process_chunk(chunk_id: int, data: list) -> Dict:
    """
    Process a single chunk of data.

    In a real system, this might:
    - Apply ML models
    - Transform data
    - Compute statistics
    """
    print(f"Processing chunk {chunk_id} with {len(data)} items")
    # Simulate processing
    result = {
        "chunk_id": chunk_id,
        "count": len(data),
        "sum": sum(data),
        "avg": sum(data) / len(data) if data else 0,
    }
    return result


@step()
async def aggregate_results(results: List[Dict]) -> Dict:
    """
    Combine results from all chunks.

    This step receives the results from all parallel tasks
    and combines them into a final result.
    """
    total_count = sum(r["count"] for r in results)
    total_sum = sum(r["sum"] for r in results)
    return {
        "chunks_processed": len(results),
        "total_items": total_count,
        "total_sum": total_sum,
        "overall_average": total_sum / total_count if total_count > 0 else 0,
        "chunk_results": results,
    }


@workflow()
async def parallel_data_processing(data: list, chunk_size: int = 100):
    """
    Process a large dataset in parallel chunks.

    This workflow:
    1. Splits the data into chunks
    2. Processes each chunk in parallel
    3. Aggregates the results

    Performance: O(n/workers) instead of O(n)
    """
    # Fan-out: create a parallel task for each chunk
    chunks = [
        data[i:i + chunk_size]
        for i in range(0, len(data), chunk_size)
    ]

    # Execute all chunks in parallel
    chunk_results = []
    for chunk_id, chunk in enumerate(chunks):
        result = await process_chunk(chunk_id, chunk)
        chunk_results.append(result)

    # Fan-in: aggregate all results
    final_result = await aggregate_results(chunk_results)
    return final_result
```

Performance Considerations
| Scenario | Sequential Time | Parallel Time (10 workers) | Speedup |
|---|---|---|---|
| 10 chunks, 1 second each | 10s | 1s | 10x |
| 100 chunks, 100ms each | 10s | 1s | 10x |
| 1000 chunks, 10ms each | 10s | 1s | 10x |
The speedup is bounded by the number of available workers: with N workers and evenly sized chunks, you can achieve up to an N-fold speedup.
Pattern 2: Parallel Branches with Convergence
Sometimes you need to execute different branches in parallel, then converge on a common result. See the full example in the original blog for detailed implementation.
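In the meantime, here is a minimal sketch of the idea in plain asyncio rather than PyWorkflow's decorators, so it runs standalone; the branch names (fetch_inventory, fetch_pricing, fetch_fraud_score) and return shapes are illustrative assumptions, not part of any API:

```python
import asyncio


async def fetch_inventory(order_id: str) -> dict:
    # Branch 1: check stock levels (simulated)
    return {"order_id": order_id, "in_stock": True}


async def fetch_pricing(order_id: str) -> dict:
    # Branch 2: compute pricing (simulated)
    return {"order_id": order_id, "total": 42.0}


async def fetch_fraud_score(order_id: str) -> dict:
    # Branch 3: run a fraud check (simulated)
    return {"order_id": order_id, "risk": "low"}


async def quote_order(order_id: str) -> dict:
    # Run the three independent branches concurrently...
    inventory, pricing, fraud = await asyncio.gather(
        fetch_inventory(order_id),
        fetch_pricing(order_id),
        fetch_fraud_score(order_id),
    )
    # ...then converge: combine all branch results into one decision
    return {
        "order_id": order_id,
        "approved": inventory["in_stock"] and fraud["risk"] == "low",
        "total": pricing["total"],
    }
```

In a real workflow each branch would be its own @step so it gets independent retries and monitoring; the convergence point is simply the step that consumes all three results.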
Conditional Branching: Making Decisions in Workflows
Why Conditional Logic is Essential
Real-world workflows rarely follow a single path. They make decisions based on:
- Input data — Different processing for different inputs
- Intermediate results — Different paths based on step outcomes
- External state — Different actions based on system state
Pattern 3: Simple Conditional Branching
The simplest pattern uses Python's if statements to choose between paths:
```python
from typing import Dict, List

from pyworkflow import step, workflow


@step()
async def validate_order(order: Dict) -> Dict:
    """Validate an order and return the validation result."""
    errors = []
    if not order.get("customer_id"):
        errors.append("Missing customer_id")
    if not order.get("items"):
        errors.append("No items in order")
    if order.get("total", 0) <= 0:
        errors.append("Invalid total")
    return {
        "valid": len(errors) == 0,
        "errors": errors,
    }


@step()
async def process_valid_order(order: Dict) -> Dict:
    """Process an order that passed validation."""
    # Charge payment, reserve inventory, etc.
    return {"status": "processed", "order_id": order["id"]}


@step()
async def handle_invalid_order(order: Dict, errors: List[str]) -> Dict:
    """Handle an order that failed validation."""
    # Log the error, notify the customer, etc.
    return {
        "status": "rejected",
        "order_id": order["id"],
        "reason": "; ".join(errors),
    }


@workflow()
async def order_workflow_with_branching(order: Dict) -> Dict:
    """
    Process an order with conditional branching.

    Flow:
    1. Validate the order
    2. If valid: process the order
    3. If invalid: handle the error
    """
    validation = await validate_order(order)
    if validation["valid"]:
        result = await process_valid_order(order)
    else:
        result = await handle_invalid_order(order, validation["errors"])
    return result
```

Pattern 4: Multi-Way Branching
For more complex decisions, use multiple branches based on classification. See the full blog for detailed implementation.
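Until then, one common shape for multi-way branching is classify-then-dispatch: a classification step picks a category, and a dictionary maps each category to its handler. The ticket domain, category rules, and handler names below are illustrative assumptions:

```python
def classify_ticket(ticket: dict) -> str:
    # Classify into one of several categories (rules are illustrative)
    if ticket.get("severity", 0) >= 8:
        return "critical"
    if "billing" in ticket.get("tags", []):
        return "billing"
    return "general"


def handle_critical(ticket: dict) -> dict:
    return {"route": "on-call", "id": ticket["id"]}


def handle_billing(ticket: dict) -> dict:
    return {"route": "finance", "id": ticket["id"]}


def handle_general(ticket: dict) -> dict:
    return {"route": "queue", "id": ticket["id"]}


# Map each category to its handler; adding a branch is one dict entry
HANDLERS = {
    "critical": handle_critical,
    "billing": handle_billing,
    "general": handle_general,
}


def ticket_workflow(ticket: dict) -> dict:
    category = classify_ticket(ticket)
    return HANDLERS[category](ticket)
```

The dispatch table keeps the workflow body flat as branches accumulate, instead of growing a long if/elif chain.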
Dynamic Task Generation: Creating Tasks at Runtime
Why Dynamic Generation is Powerful
Sometimes you don't know how many tasks to create until runtime. Dynamic generation allows:
- Flexible workloads — Handle variable-sized jobs
- Scalability — Process any number of items
- Responsiveness — Generate tasks based on real-time data
Pattern 5: Map Pattern (Generate and Execute)
The map pattern generates a task for each item in a collection. See the full blog for detailed implementation.
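As a placeholder, the core of the map pattern can be sketched in plain asyncio: build one task per input item at runtime, then await them all. The resize_image task is an illustrative assumption:

```python
import asyncio


async def resize_image(name: str) -> dict:
    # One task per item (the work itself is simulated)
    return {"name": name, "status": "resized"}


async def resize_all(names: list) -> list:
    # The number of tasks is only known at runtime:
    # generate one coroutine per input item, then run them concurrently
    tasks = [resize_image(n) for n in names]
    return await asyncio.gather(*tasks)
```

The same shape works for any variable-sized collection; in PyWorkflow each generated task would be a @step invocation.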
Pattern 6: Filter-Map-Reduce
Combine filtering, mapping, and reduction for complex data processing. See the full blog for detailed implementation.
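A minimal standalone sketch of the three stages, again in plain asyncio with an invented user-scoring domain (is_active, score_user, and the scoring rule are all assumptions):

```python
import asyncio


async def is_active(user: dict) -> bool:
    # Filter predicate (simulated lookup)
    return user.get("active", False)


async def score_user(user: dict) -> int:
    # Map: compute a per-user score (simulated)
    return user["logins"] * 10


async def engagement_report(users: list) -> dict:
    # Filter: evaluate the predicate for all users concurrently
    flags = await asyncio.gather(*(is_active(u) for u in users))
    active = [u for u, keep in zip(users, flags) if keep]

    # Map: score each remaining user concurrently
    scores = await asyncio.gather(*(score_user(u) for u in active))

    # Reduce: collapse the scores into a single summary
    return {"users_scored": len(scores), "total_score": sum(scores)}
```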
Error Handling and Resilience Patterns
Pattern 7: Retry with Exponential Backoff
Automatic retries are essential for handling transient failures:
```python
from typing import Dict

from pyworkflow import step, workflow


@step(
    max_retries=5,       # Retry up to 5 times
    retry_delay=1,       # Start with a 1-second delay
    retry_backoff=2,     # Double the delay each time
    max_retry_delay=60,  # Cap at 60 seconds
)
async def call_external_api(endpoint: str, data: Dict) -> Dict:
    """
    Call an external API with automatic retries.

    Retry delays: 1s, 2s, 4s, 8s, 16s

    This handles transient failures:
    - Network timeouts
    - Temporary service unavailability
    - Rate limiting (with backoff)
    """
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.post(endpoint, json=data) as response:
            if response.status >= 500:
                # Transient error; the step will be retried
                raise Exception(f"Server error: {response.status}")
            return await response.json()


@workflow()
async def resilient_api_workflow(endpoint: str) -> Dict:
    """A workflow that calls an external API with resilience."""
    result = await call_external_api(endpoint, {"data": "test"})
    return result
```

Pattern 8: Circuit Breaker
Prevent cascading failures by stopping requests when a service is failing. See the full blog for detailed implementation.
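To make the idea concrete, here is a minimal circuit breaker in plain Python (the class, thresholds, and state transitions are a generic sketch, not PyWorkflow's built-in implementation): after N consecutive failures the circuit opens and calls fail fast until a cooldown has elapsed, at which point one trial call is let through.

```python
import time


class CircuitBreaker:
    """Minimal circuit breaker: open after N consecutive failures,
    then reject calls until a cooldown period has passed."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                # Open: fail fast instead of hammering a broken service
                raise RuntimeError("circuit open")
            # Cooldown elapsed (half-open): allow one trial call through
            self.opened_at = None
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # trip the breaker
            raise
        self.failures = 0  # a success closes the circuit again
        return result
```

Failing fast is the point: when a downstream service is already struggling, refusing calls locally prevents the failure from cascading through every workflow that depends on it.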
Workflow Composition: Building Complex Systems
Pattern 9: Workflow Composition
Compose multiple workflows together for complex business logic. See the full blog for detailed implementation.
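As a stand-in, the shape of composition is simply a parent workflow awaiting sub-workflows; the onboarding domain and function names below are illustrative assumptions, written in plain asyncio so the sketch runs standalone:

```python
import asyncio


async def provision_account(user: str) -> dict:
    # Sub-workflow 1: create the account (simulated)
    return {"user": user, "account": "created"}


async def send_welcome(user: str) -> dict:
    # Sub-workflow 2: send the onboarding email (simulated)
    return {"user": user, "email": "sent"}


async def onboarding_workflow(user: str) -> dict:
    # The parent workflow composes the two sub-workflows; each one
    # can be developed, retried, and monitored independently
    account = await provision_account(user)
    email = await send_welcome(user)
    return {"user": user, "steps": [account["account"], email["email"]]}
```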
Performance Optimization Patterns
Pattern 10: Caching for Expensive Operations
Cache expensive computations to avoid redundant work. See the full blog for detailed implementation.
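The essence is memoization keyed on the step's inputs. A minimal sketch, with an invented report-generation example (the `calls` list exists only to make cache misses observable):

```python
import asyncio

_cache: dict = {}


async def expensive_report(region: str, calls: list) -> dict:
    # Stand-in for a slow computation; `calls` records each cache miss
    calls.append(region)
    return {"region": region, "rows": 1000}


async def cached_report(region: str, calls: list) -> dict:
    # Serve from the cache when the same key was computed before
    if region in _cache:
        return _cache[region]
    result = await expensive_report(region, calls)
    _cache[region] = result
    return result
```

In production you would bound the cache and give entries a TTL (or use an external store like Redis) so stale results eventually expire.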
Monitoring and Debugging Advanced Workflows
Observability Best Practices
See the full blog for comprehensive observability patterns and examples.
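One simple technique that applies everywhere: wrap each step so it logs its start, duration, and outcome. The decorator below is a generic sketch using the standard library, not a PyWorkflow feature:

```python
import functools
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("workflow")


def observed(fn):
    """Log the start, duration, and outcome of each step call."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        logger.info("step %s started", fn.__name__)
        try:
            result = fn(*args, **kwargs)
        except Exception:
            logger.exception("step %s failed", fn.__name__)
            raise
        elapsed = time.perf_counter() - start
        logger.info("step %s finished in %.3fs", fn.__name__, elapsed)
        return result
    return wrapper


@observed
def charge_payment(order_id: str) -> str:
    # Example step (simulated)
    return f"charged:{order_id}"
```

The per-step timing this produces is exactly what the "Measure performance" practice in the summary table below relies on.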
Best Practices Summary
| Practice | Benefit | Example |
|---|---|---|
| Keep patterns simple | Easier to understand and debug | Use basic fan-out/fan-in before complex patterns |
| Measure performance | Identify bottlenecks | Log timing for each step |
| Test failure paths | Ensure resilience | Mock service failures in tests |
| Use meaningful names | Better observability | parallel_order_processing not workflow1 |
| Document assumptions | Prevent misuse | Comment on idempotency requirements |
| Monitor in production | Catch issues early | Track success rate and duration |
Conclusion
Advanced workflow patterns are the building blocks of sophisticated, production-grade distributed systems. By mastering these patterns, you can:
- Build scalable systems — Handle growing workloads
- Ensure reliability — Gracefully handle failures
- Maintain code quality — Use proven solutions
- Debug efficiently — Understand what's happening
Start with simple patterns and gradually adopt more complex ones as your needs grow. The PyWorkflow documentation and community examples provide additional patterns and real-world use cases.
Build with confidence. Your workflows will be reliable, observable, and ready for production. 🚀