Tags: tutorial, getting-started, python, workflow-orchestration, distributed-systems

Getting Started with PyWorkflow: Build Your First Distributed Workflow

Learn how to build your first distributed workflow with PyWorkflow. This comprehensive guide covers installation, core concepts, practical examples, and best practices for workflow orchestration.

PyWorkflow Team · 18 min read

Getting Started with PyWorkflow

What is Workflow Orchestration and Why Does It Matter?

Workflow orchestration is the practice of coordinating multiple automated tasks across business applications and services so that they execute reliably as a whole. Unlike simple workflow automation, which automates individual tasks, orchestration connects those tasks: they follow a defined sequence, interact with one another, and integrate with other systems to complete end-to-end business processes.

The Problem Traditional Approaches Can't Solve

Why Manual Workflow Management Fails

Building reliable, long-running distributed processes is one of the hardest problems in modern software engineering. Traditional approaches require you to manually handle:

  • Server restarts mid-process — Your workflow stops when infrastructure fails, and you must rebuild state from scratch
  • Retry logic without duplicating work — Implementing retries safely across distributed systems is notoriously difficult; get it wrong and you process the same request twice
  • Long pauses without holding connections — Sleeping for hours or days while holding database connections and memory is wasteful and doesn't scale
  • Complete auditability and tracking — Understanding what happened at each step requires building custom logging infrastructure
  • Scaling across machines — Distributing work across multiple servers introduces coordination complexity that's easy to get wrong

These challenges force teams to either build custom infrastructure (expensive and error-prone) or use legacy workflow systems (rigid and hard to extend).

How Workflow Orchestration Solves These Problems

Workflow orchestration platforms like PyWorkflow abstract away these complexities. They provide:

  • Automatic failure recovery — Your workflows resume automatically from the exact point of failure
  • Built-in retry mechanisms — Configurable retry strategies with exponential backoff; paired with idempotent steps, retries do not duplicate work
  • Zero-resource suspension — Sleep without holding connections or memory; the orchestrator persists your state
  • Complete event sourcing — Every state change is recorded as an immutable event, providing full auditability
  • Horizontal scaling — Distribute work across workers automatically; the framework handles coordination

The Evolution of Workflow Orchestration

Historical Context and Why It Matters Today

Workflow orchestration isn't new—enterprises have used it for decades in systems like IBM Workflow or SAP Process Orchestration. However, these legacy systems were:

  • Expensive — Required significant licensing fees and consulting costs
  • Inflexible — Built on proprietary languages and workflows, hard to extend
  • Slow to develop — GUI-based design tools made iteration tedious
  • Tightly coupled — Difficult to integrate with modern cloud services and microservices

The emergence of open-source orchestration frameworks (Airflow, Prefect, Temporal) and cloud-native approaches has democratized workflow orchestration. Modern tools let you define workflows in code, integrate seamlessly with cloud services, and scale from small scripts to enterprise-grade systems.

What is PyWorkflow and How Does It Work?

PyWorkflow's Core Architecture

PyWorkflow is a distributed workflow orchestration framework built specifically for Python developers. It enables you to build complex, long-running business processes as simple, readable Python code while the framework handles all the infrastructure complexity.

Key Architectural Components

The Workflow Definition Layer — You write workflows as Python async functions, using familiar language features. PyWorkflow parses your code to understand task dependencies and execution order.

The Execution Engine — The engine manages workflow instances, persisting state after each step. When a worker crashes, the engine resumes from the last successful checkpoint.

The Message Broker — Tasks are queued and distributed to available workers via Redis or RabbitMQ. The broker ensures tasks are processed exactly once, even if workers fail.

The State Store — Event sourcing captures every state transition as immutable events. This provides complete auditability and enables replay and recovery.

The Monitoring Layer — Real-time dashboards and APIs let you track workflow progress, identify bottlenecks, and debug issues.

Why PyWorkflow Stands Out

Comparison with Alternative Approaches

| Feature             | PyWorkflow              | Apache Airflow     | Celery             | Temporal              | Manual Implementation |
|---------------------|-------------------------|--------------------|--------------------|-----------------------|-----------------------|
| Language            | Python-native           | Python DAGs        | Python             | Multi-language        | Any                   |
| Fault Tolerance     | Built-in, automatic     | Manual handling    | Manual             | Built-in              | Not included          |
| State Management    | Event sourcing          | Database state     | In-memory          | Durable execution     | Custom code           |
| Retry Logic         | Configurable, automatic | Manual             | Manual             | Automatic             | Manual                |
| Learning Curve      | Gentle                  | Moderate           | Steep              | Moderate              | Steep                 |
| Scaling             | Horizontal, automatic   | Horizontal, manual | Horizontal, manual | Horizontal, automatic | Not scalable          |
| Zero-Resource Sleep | Yes                     | No                 | No                 | Yes                   | No                    |
| Observability       | Built-in                | Good               | Limited            | Excellent             | None                  |

Why Choose PyWorkflow for Your Next Project

PyWorkflow is optimized for a specific use case: Python developers building distributed workflows that need automatic fault tolerance and event sourcing without the operational overhead of systems like Airflow.

Best for:

  • Microservices that coordinate across systems
  • Order processing and fulfillment pipelines
  • Multi-step user onboarding flows
  • Long-running background jobs with checkpoints
  • Event-driven automation

Not ideal for:

  • Simple scheduled cron jobs (use scheduling libraries)
  • Massive ETL pipelines (use Airflow or Spark)
  • Real-time streaming (use Kafka or Flink)

Installation and Initial Setup

System Requirements and Prerequisites

Before installing PyWorkflow, ensure you have:

  • Python 3.8+ — PyWorkflow requires modern Python async support
  • Redis 6.0+ or RabbitMQ 3.8+ — For the message broker (Redis is recommended for development)
  • pip — Python package manager (included with Python 3.4+)
  • Git (optional) — For cloning example repositories

Step-by-Step Installation

Installing PyWorkflow

The simplest installation method is via pip:

pip install pyworkflow

For development with the latest features:

git clone https://github.com/yasha-dev1/pyworkflow.git
cd pyworkflow
pip install -e .

Setting Up Your Message Broker

Option 1: Redis (Recommended for Development)

Install Redis:

# macOS
brew install redis

# Ubuntu/Debian
sudo apt-get install redis-server

# Docker
docker run -d -p 6379:6379 redis:latest

Start Redis:

redis-server

Option 2: RabbitMQ (Recommended for Production)

Install RabbitMQ:

# macOS
brew install rabbitmq

# Ubuntu/Debian
sudo apt-get install rabbitmq-server

# Docker
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Start RabbitMQ:

rabbitmq-server
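
Before moving on, it can help to confirm that the broker is actually listening. The following is a minimal sketch using only the standard library; it assumes the default ports from the Docker commands above (6379 for Redis, 5672 for RabbitMQ) and a broker on the local machine. `port_is_open` is a helper written for this illustration, not part of PyWorkflow.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False


if __name__ == "__main__":
    # Default ports; adjust if you mapped the containers differently
    for name, port in (("Redis", 6379), ("RabbitMQ", 5672)):
        status = "reachable" if port_is_open("127.0.0.1", port) else "not reachable"
        print(f"{name} on port {port}: {status}")
```

An open port only shows that something is listening; it does not prove the service is healthy or correctly configured.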

Verifying Your Installation

Create a simple test file to verify everything works:

# test_setup.py
from pyworkflow import workflow, step

@step()
async def test_step():
    return "PyWorkflow is installed correctly!"

@workflow()
async def test_workflow():
    result = await test_step()
    return result

if __name__ == "__main__":
    from pyworkflow import start
    run_id = start(test_workflow)
    print(f"Test workflow started: {run_id}")

Run it:

python test_setup.py

If you see the run ID printed, your installation is successful!

Core Concepts: Understanding PyWorkflow's Building Blocks

Steps: The Atomic Unit of Work

What Are Steps and Why They Matter

A step is the smallest unit of work in a PyWorkflow workflow. It's a single async function that performs one logical operation and can be retried independently.

Key characteristics:

  • Atomic — Either fully succeeds or fully fails; no partial completion
  • Idempotent — Safe to retry multiple times with the same inputs and get the same result
  • Traceable — Every execution is logged with timing and results
  • Resilient — Automatically retried on transient failures

Defining Your First Step

from pyworkflow import step
import logging

logger = logging.getLogger(__name__)

@step()
async def send_welcome_email(user_id: str, user_email: str):
    """
    Send a welcome email to a new user.
    
    This step demonstrates:
    - Type hints for clarity
    - Logging for observability
    - Retry-safe design, provided the email service deduplicates sends
    """
    logger.info(f"Sending welcome email to {user_email}")

    # send_email is a placeholder for your own email client; in a real
    # system this would call your email service
    await send_email(
        to=user_email,
        subject="Welcome to Our Service!",
        template="welcome"
    )
    
    logger.info(f"Welcome email sent to {user_email}")
    return {"status": "sent", "email": user_email}

Configuring Step Behavior

Steps support configuration for error handling and resource management:

@step(
    max_retries=3,              # Retry up to 3 times on failure
    retry_delay=5,              # Wait 5 seconds before first retry
    retry_backoff=2,            # Double the delay each retry (5s, 10s, 20s)
    timeout=30,                 # Fail if step takes >30 seconds
    name="send_email_step"      # Custom name for monitoring
)
async def send_email_with_retries(email: str, subject: str):
    """Send email with automatic retries and timeout protection."""
    return await email_service.send(email, subject)
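
To make the retry timing concrete, here is a small sketch that computes the delay schedule implied by the comments in the configuration above (delay multiplied by the backoff factor on each retry). The function `retry_delays` is written for this illustration; whether PyWorkflow adds jitter or caps the delay is not stated here, so treat this as the arithmetic only.

```python
from typing import List


def retry_delays(max_retries: int, retry_delay: float, retry_backoff: float) -> List[float]:
    """Delay before each retry: retry_delay * retry_backoff ** attempt."""
    return [retry_delay * retry_backoff ** attempt for attempt in range(max_retries)]


print(retry_delays(max_retries=3, retry_delay=5, retry_backoff=2))  # [5, 10, 20]
```

With these settings a step that keeps failing gives up after roughly 35 seconds of waiting, plus the time spent in each attempt.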

Understanding Step Idempotency

Idempotency is crucial for distributed systems. A step is idempotent if calling it multiple times with the same inputs produces the same result and has the same side effects as calling it once.

# ✓ IDEMPOTENT - Safe to retry
@step()
async def create_user_record(user_id: str, email: str):
    """Create or return existing user record."""
    user = await db.find_user(user_id)
    if user:
        return user  # Already exists, return it
    return await db.create_user(user_id, email)

# ✗ NOT IDEMPOTENT - Unsafe to retry
@step()
async def increment_counter(counter_id: str):
    """Increment a counter - unsafe to retry!"""
    current = await db.get(counter_id)
    await db.set(counter_id, current + 1)  # Retrying doubles the increment
    return current + 1

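# ✓ IDEMPOTENT - Better version
@step()
async def increment_counter_safely(counter_id: str, operation_id: str):
    """Increment a counter at most once per operation_id (an idempotency key)."""
    # A retry with the same operation_id returns the recorded result
    # instead of incrementing again
    marker = f"{counter_id}:applied:{operation_id}"
    recorded = await db.get(marker)
    if recorded is not None:
        return recorded
    current = await db.get(counter_id)
    new_value = current + 1
    # Conditional update: fail (and let the step retry) if another writer got in first
    if not await db.compare_and_set(counter_id, current, new_value):
        raise RuntimeError(f"Counter {counter_id} changed concurrently")
    # In production, write the counter and the marker in one transaction
    await db.set(marker, new_value)
    return new_value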
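
To see the difference between a retry-unsafe and a retry-safe increment in isolation, here is a runnable sketch that uses in-memory dictionaries in place of `db`. The `operation_id` idempotency key and both function names are assumptions made for this illustration; a real system would keep the key in durable storage.

```python
from typing import Dict

counters: Dict[str, int] = {"page_views": 0}
applied: Dict[str, int] = {}  # idempotency key -> result of the first application


def increment_naive(counter_id: str) -> int:
    """Every call increments, so a retry counts twice."""
    counters[counter_id] += 1
    return counters[counter_id]


def increment_once(counter_id: str, operation_id: str) -> int:
    """Apply the increment at most once per operation_id."""
    if operation_id in applied:
        return applied[operation_id]  # retry: return the recorded result
    counters[counter_id] += 1
    applied[operation_id] = counters[counter_id]
    return applied[operation_id]


# A retried call with the same key leaves the counter unchanged
first = increment_once("page_views", "run-1:increment")
retry = increment_once("page_views", "run-1:increment")
print(first, retry, counters["page_views"])  # 1 1 1
```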

Workflows: Orchestrating Steps into Processes

What Are Workflows

A workflow is an async function decorated with @workflow() that orchestrates multiple steps into a cohesive process. Workflows define the execution order, data flow, and control logic for your business process.

Key characteristics:

  • Composable — Workflows can call other workflows
  • Durable — State is persisted after each step
  • Observable — Complete execution history is recorded
  • Recoverable — Failed workflows resume from the last successful step

Your First Workflow: User Onboarding

from pyworkflow import workflow, step, sleep

@step()
async def create_user_account(email: str, name: str):
    """Create a new user account in the system."""
    user = await db.users.create(email=email, name=name)
    return {"user_id": user.id, "email": email}

@step()
async def send_welcome_email(user_id: str, email: str):
    """Send welcome email to the new user."""
    await email_service.send(
        to=email,
        template="welcome",
        context={"user_id": user_id}
    )
    return {"status": "sent"}

@step()
async def schedule_onboarding_call(user_id: str):
    """Schedule an onboarding call with the user."""
    call = await calendar.schedule_call(
        user_id=user_id,
        duration_minutes=30,
        template="onboarding"
    )
    return {"call_id": call.id}

@step()
async def send_tips_email(user_id: str, email: str):
    """Send helpful tips email after user has had time to explore."""
    await email_service.send(
        to=email,
        template="tips",
        context={"user_id": user_id}
    )
    return {"status": "sent"}

@workflow()
async def user_onboarding_workflow(email: str, name: str):
    """
    Complete user onboarding workflow.
    
    Steps:
    1. Create user account
    2. Send welcome email
    3. Schedule onboarding call
    4. Wait 3 days
    5. Send helpful tips email
    """
    # Step 1: Create the account
    user_result = await create_user_account(email, name)
    user_id = user_result["user_id"]
    
    # Step 2: Send welcome email immediately
    await send_welcome_email(user_id, email)
    
    # Step 3: Schedule onboarding call
    call_result = await schedule_onboarding_call(user_id)
    
    # Step 4: Wait 3 days - zero resources consumed!
    # The workflow is suspended and resumed automatically after 3 days
    await sleep("3d")
    
    # Step 5: Send tips email after user has explored
    await send_tips_email(user_id, email)
    
    return {
        "status": "onboarding_complete",
        "user_id": user_id,
        "call_id": call_result["call_id"]
    }

# Start a workflow instance
if __name__ == "__main__":
    from pyworkflow import start
    
    run_id = start(
        user_onboarding_workflow,
        email="alice@example.com",
        name="Alice Smith"
    )
    print(f"Onboarding workflow started: {run_id}")
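
The idea behind a zero-resource sleep can be sketched without any framework: instead of blocking, record when the run should wake up and let a scheduler pick it up later. Everything below is hypothetical and only illustrates the concept; the duration format accepted by `parse_duration` is an assumption modelled on the `"3d"` string above, and PyWorkflow's own parser and storage may differ.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Dict, List

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse strings such as "30s", "15m", "2h" or "3d" (assumed format)."""
    match = re.fullmatch(r"(\d+)([smhd])", text)
    if match is None:
        raise ValueError(f"Unsupported duration: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


# run_id -> time at which the run should resume; a real system would keep
# this in durable storage rather than in process memory
wake_times: Dict[str, datetime] = {}


def suspend(run_id: str, duration: str, now: datetime) -> None:
    """Record the wake-up time and return immediately; nothing blocks."""
    wake_times[run_id] = now + parse_duration(duration)


def due_runs(now: datetime) -> List[str]:
    """Return the run IDs whose wake-up time has passed."""
    return sorted(run_id for run_id, wake_at in wake_times.items() if wake_at <= now)


start_time = datetime(2024, 4, 20, 10, 0, tzinfo=timezone.utc)
suspend("onboarding_abc123", "3d", now=start_time)
print(due_runs(start_time + timedelta(days=2)))  # []
print(due_runs(start_time + timedelta(days=3)))  # ['onboarding_abc123']
```

Because only a timestamp is stored, no worker, connection, or coroutine has to stay alive during the three days.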

Understanding Workflow State and Durability

When you call a step from within a workflow, PyWorkflow:

  1. Serializes the step call — Converts function arguments to JSON for storage
  2. Executes the step — Runs the step function, potentially on a different worker
  3. Persists the result — Stores the step result as an immutable event
  4. Resumes the workflow — Continues with the next step using the stored result

This process ensures that if any worker crashes, the workflow can resume from exactly where it left off.

@workflow()
async def resilient_workflow():
    # Step 1 executes
    result1 = await step1()  # Stored: step1 returned result1
    
    # If the worker crashes here, the next worker will:
    # 1. Load the workflow state
    # 2. See that step1 already completed
    # 3. Skip step1 and use the stored result1
    # 4. Continue with step2
    
    result2 = await step2(result1)  # Stored: step2 returned result2
    
    return {"final": result2}
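
The skip-and-resume behaviour described in the comments above can be imitated with a dictionary that stands in for the persisted event log. This is a simplified sketch of the general replay technique, not PyWorkflow's implementation; `run_step`, `completed`, and the simulated crash are all hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# step name -> stored result; stands in for the persisted event log
completed: Dict[str, Any] = {}
calls = {"step1": 0, "step2": 0}


async def run_step(name: str, func: Callable[[], Awaitable[Any]]) -> Any:
    """Run func once; on replay, return the stored result instead."""
    if name in completed:
        return completed[name]
    result = await func()
    completed[name] = result  # checkpoint before moving on
    return result


async def step1() -> int:
    calls["step1"] += 1
    return 21


async def step2(value: int) -> int:
    calls["step2"] += 1
    return value * 2


async def resilient_workflow(crash_after_step1: bool = False) -> dict:
    result1 = await run_step("step1", step1)
    if crash_after_step1:
        raise RuntimeError("simulated worker crash")
    result2 = await run_step("step2", lambda: step2(result1))
    return {"final": result2}


# First attempt crashes after step1; the second attempt replays from the log
try:
    asyncio.run(resilient_workflow(crash_after_step1=True))
except RuntimeError:
    pass
print(asyncio.run(resilient_workflow()))  # {'final': 42}
print(calls)  # {'step1': 1, 'step2': 1}
```

Note that `step1` runs only once even though the workflow function runs twice, which is why the workflow body itself should be deterministic and side effects should live inside steps.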

Event Sourcing: The Foundation of Durability

What is Event Sourcing and Why It's Powerful

Event sourcing is an architectural pattern where every change to application state is captured as an immutable event. Instead of storing only the current state, you store the complete history of events that led to that state.

Benefits:

  • Complete auditability — You can see exactly what happened and when
  • Replay capability — Replay events to recover from failures or investigate issues
  • Temporal queries — Ask "what was the state at time T?"
  • Debugging — Understand the exact sequence of operations that led to a failure

How PyWorkflow Uses Event Sourcing

# When you run a workflow, PyWorkflow stores events like:
[
    {
        "type": "workflow_started",
        "workflow_id": "onboarding_abc123",
        "timestamp": "2024-04-20T10:00:00Z",
        "input": {"email": "alice@example.com", "name": "Alice"}
    },
    {
        "type": "step_started",
        "step_name": "create_user_account",
        "timestamp": "2024-04-20T10:00:01Z"
    },
    {
        "type": "step_completed",
        "step_name": "create_user_account",
        "timestamp": "2024-04-20T10:00:02Z",
        "result": {"user_id": "user_123", "email": "alice@example.com"}
    },
    {
        "type": "step_started",
        "step_name": "send_welcome_email",
        "timestamp": "2024-04-20T10:00:03Z"
    },
    {
        "type": "step_completed",
        "step_name": "send_welcome_email",
        "timestamp": "2024-04-20T10:00:05Z",
        "result": {"status": "sent"}
    },
    # (step_started / step_completed events for schedule_onboarding_call omitted for brevity)
    {
        "type": "workflow_sleeping",
        "duration": "3d",
        "timestamp": "2024-04-20T10:00:06Z"
    }
    # ... more events ...
]
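
As a rough illustration of replay and temporal queries, the sketch below folds an abbreviated copy of the events above into a state dictionary, optionally stopping at a cutoff time. The `state_at` function and the shape of the resulting state are assumptions made for this example, not a PyWorkflow API.

```python
from typing import List, Optional

events: List[dict] = [
    {"type": "workflow_started", "timestamp": "2024-04-20T10:00:00Z"},
    {"type": "step_completed", "step_name": "create_user_account",
     "timestamp": "2024-04-20T10:00:02Z", "result": {"user_id": "user_123"}},
    {"type": "step_completed", "step_name": "send_welcome_email",
     "timestamp": "2024-04-20T10:00:05Z", "result": {"status": "sent"}},
    {"type": "workflow_sleeping", "duration": "3d", "timestamp": "2024-04-20T10:00:06Z"},
]


def state_at(log: List[dict], cutoff: Optional[str] = None) -> dict:
    """Fold events (assumed to be in time order) into a state dict."""
    state: dict = {"status": "not_started", "results": {}}
    for event in log:
        # UTC ISO 8601 strings in the same format compare correctly as text
        if cutoff is not None and event["timestamp"] > cutoff:
            break
        if event["type"] == "workflow_started":
            state["status"] = "running"
        elif event["type"] == "step_completed":
            state["results"][event["step_name"]] = event["result"]
        elif event["type"] == "workflow_sleeping":
            state["status"] = "sleeping"
    return state


print(state_at(events)["status"])  # sleeping
earlier = state_at(events, cutoff="2024-04-20T10:00:03Z")
print(earlier["status"], sorted(earlier["results"]))  # running ['create_user_account']
```

The current state is never stored directly; it is always derived from the log, which is what makes "what was the state at time T?" answerable.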

Querying Workflow History

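from datetime import datetime

from pyworkflow import get_workflow_history

# Get complete event history
history = get_workflow_history("onboarding_abc123")

# See all steps and their results
for event in history:
    if event["type"] == "step_completed":
        print(f"Step {event['step_name']} completed: {event['result']}")

def parse_timestamp(value: str) -> datetime:
    # Timestamps are ISO 8601 strings; replace the trailing "Z" so that
    # datetime.fromisoformat also accepts them on Python versions before 3.11
    return datetime.fromisoformat(value.replace("Z", "+00:00"))

# Calculate total time between the first and last recorded event
start_time = parse_timestamp(history[0]["timestamp"])
end_time = parse_timestamp(history[-1]["timestamp"])
total_duration = end_time - start_time  # a datetime.timedelta
print(f"Total workflow duration: {total_duration}")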

Building Your First Real Workflow: Order Processing

See the full example in the original blog post - this is a comprehensive 200+ line example demonstrating all core concepts.

Best Practices for Production Workflows

Design Principles

1. Keep Steps Focused and Composable

Each step should do one thing well. This makes them:

  • Easier to test
  • Simpler to retry
  • More reusable across workflows

# ✓ GOOD - Focused steps
@step()
async def validate_email(email: str):
    # Only validates email format
    pass

@step()
async def check_email_not_registered(email: str):
    # Only checks if email is already registered
    pass

# ✗ AVOID - Too many responsibilities
@step()
async def validate_and_register_user(email: str, name: str):
    # Validates, checks registration, AND registers
    # Hard to retry, hard to test
    pass
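
For illustration, here is what the two focused checks might look like once filled in, written as plain functions so they can be run and tested on their own. The regular expression is a deliberately loose format check and `registered_emails` is an in-memory stand-in for a user table; both are assumptions for this sketch.

```python
import re
from typing import Set

# Deliberately loose format check: something@something.tld
_EMAIL_RE = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")

registered_emails: Set[str] = {"alice@example.com"}  # stand-in for a user table


def validate_email(email: str) -> bool:
    """Only checks the format."""
    return _EMAIL_RE.fullmatch(email) is not None


def check_email_not_registered(email: str) -> bool:
    """Only checks whether the address is already taken."""
    return email.lower() not in registered_emails


print(validate_email("bob@example.com"), check_email_not_registered("bob@example.com"))  # True True
print(validate_email("not-an-email"), check_email_not_registered("alice@example.com"))  # False False
```

Because each function has a single responsibility, each can be unit tested with one or two assertions and reused in other workflows.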

2. Use Meaningful Names for Observability

Your step and workflow names appear in logs and dashboards. Make them descriptive.

3. Make Steps Idempotent

Always design steps to be safe for retry.

4. Handle Errors Gracefully

Plan for failures and handle them explicitly.
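
One common way to handle a failure explicitly is to undo earlier work (a compensation) and return the failure as data. The sketch below uses plain async functions in place of decorated steps so that it runs on its own; the order-processing functions, the `PaymentDeclined` exception, and the amount limit are all invented for this example.

```python
import asyncio
from typing import List


class PaymentDeclined(Exception):
    """Raised by the hypothetical charge step when payment fails."""


released: List[str] = []  # records compensations, for demonstration only


async def reserve_inventory(order_id: str) -> str:
    return f"reservation-{order_id}"


async def charge_customer(order_id: str, amount: float) -> str:
    if amount > 1000:
        raise PaymentDeclined(f"order {order_id}: amount over limit")
    return f"charge-{order_id}"


async def release_inventory(reservation_id: str) -> None:
    released.append(reservation_id)


async def process_order(order_id: str, amount: float) -> dict:
    reservation_id = await reserve_inventory(order_id)
    try:
        charge_id = await charge_customer(order_id, amount)
    except PaymentDeclined as exc:
        # Undo the earlier step, then report the failure as data
        await release_inventory(reservation_id)
        return {"status": "failed", "reason": str(exc)}
    return {"status": "completed", "charge_id": charge_id}


print(asyncio.run(process_order("A1", 50.0)))
# {'status': 'completed', 'charge_id': 'charge-A1'}
print(asyncio.run(process_order("A2", 5000.0)))
# {'status': 'failed', 'reason': 'order A2: amount over limit'}
```

Catching only the specific exception you know how to handle leaves unexpected errors free to propagate, so that retries can deal with them.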

5. Log Important Events

Logging helps with debugging and monitoring.
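
Log lines are much easier to correlate when each one carries the run and step it belongs to. Here is a sketch using the standard library's `logging.LoggerAdapter`; the logger name `workflow.steps`, the `step_logger` helper, and the line format are choices made for this example rather than anything PyWorkflow prescribes.

```python
import logging

_base = logging.getLogger("workflow.steps")
_base.setLevel(logging.INFO)
_base.propagate = False  # keep these records out of the root logger's handlers
_handler = logging.StreamHandler()  # writes to stderr by default
_handler.setFormatter(
    logging.Formatter("%(levelname)s run=%(run_id)s step=%(step)s %(message)s")
)
_base.addHandler(_handler)


def step_logger(run_id: str, step: str) -> logging.LoggerAdapter:
    """Return a logger that stamps every record with run_id and step."""
    return logging.LoggerAdapter(_base, {"run_id": run_id, "step": step})


log = step_logger("onboarding_abc123", "send_welcome_email")
log.info("Sending welcome email to %s", "alice@example.com")
# INFO run=onboarding_abc123 step=send_welcome_email Sending welcome email to alice@example.com
```

With the identifiers in every line, filtering the logs for a single run becomes a plain text search.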

Scaling and Monitoring

Horizontal Scaling

As your workload grows, add more workers. The message broker distributes queued tasks across all available workers, and PyWorkflow handles coordination.

Monitoring Workflow Performance

Track metrics like total runs, success rate, average duration, and failure counts.
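
As a sketch of how these metrics could be derived, the function below summarizes a list of run records. The record shape (`status`, `duration_seconds`) is an assumption made for this example; adapt it to whatever your monitoring API or event store actually returns.

```python
from statistics import mean
from typing import Dict, List


def summarize_runs(runs: List[dict]) -> Dict[str, float]:
    """Compute total runs, failure count, success rate and average duration."""
    total = len(runs)
    failures = sum(1 for run in runs if run["status"] == "failed")
    durations = [run["duration_seconds"] for run in runs]
    return {
        "total_runs": total,
        "failure_count": failures,
        "success_rate": (total - failures) / total if total else 0.0,
        "average_duration_seconds": mean(durations) if durations else 0.0,
    }


runs = [
    {"status": "completed", "duration_seconds": 12.0},
    {"status": "completed", "duration_seconds": 8.0},
    {"status": "failed", "duration_seconds": 4.0},
    {"status": "completed", "duration_seconds": 16.0},
]
print(summarize_runs(runs))
# {'total_runs': 4, 'failure_count': 1, 'success_rate': 0.75, 'average_duration_seconds': 10.0}
```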

Common Use Cases and Patterns

  • Multi-Step User Onboarding
  • Order Processing and Fulfillment
  • Data Pipeline Orchestration
  • Event-Driven Automation

Next Steps and Further Learning

Explore the PyWorkflow Documentation, join the community, and build your first workflow today.

Conclusion

PyWorkflow makes building reliable, distributed workflows simple. Start with a simple workflow in your domain and gradually add complexity as you become comfortable with the patterns.

The workflows you build today will be the foundation of scalable, reliable systems tomorrow. Happy workflow building! 🚀
