Prefect for Data Pipelines
Prefect is a modern Python-native workflow orchestration framework. Where Airflow was designed around DAGs defined in config-like Python files, Prefect treats flows as normal Python functions. Any Python function can become a task or flow with a decorator — no boilerplate, no operator classes, no XCom. If you know Python, you already know 90% of Prefect.
Prefect vs Airflow
| Feature | Airflow | Prefect |
|---|---|---|
| Task definition | Operators (classes) | Decorated functions |
| Data passing | XCom (limited size) | Return values (native Python) |
| Local testing | Requires Airflow setup | python my_flow.py |
| Dynamic tasks | Complex (expand/map) | Natural Python loops |
| Error handling | Callback functions | try/except + retries |
| Scheduling | Built-in scheduler | Prefect server / Cloud |
| Setup complexity | High (DB, webserver, scheduler) | pip install prefect |
| Production maturity | Very mature (10+ years) | Mature (5+ years) |
| Best for | Large teams, complex DAGs | Small-medium teams, rapid dev |
Flows and Tasks
# basic_flow.py: Prefect fundamentals
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import requests
import json
from pathlib import Path


@task(
    retries=3,
    retry_delay_seconds=30,
    log_prints=True,
)
def extract_data(api_url: str, page_size: int = 100) -> list[dict]:
    """Extract data from an API with automatic retries."""
    logger = get_run_logger()
    all_records = []
    page = 1
    while True:
        response = requests.get(
            api_url,
            params={"page": page, "per_page": page_size},
            timeout=30,
        )
        response.raise_for_status()
        records = response.json().get("data", [])
        if not records:
            break
        all_records.extend(records)
        page += 1
        logger.info(f"Page {page - 1}: {len(records)} records")
    logger.info(f"Total extracted: {len(all_records)} records")
    return all_records


@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def transform_data(raw_records: list[dict]) -> pd.DataFrame:
    """Transform raw data; cached for 1 hour on the same input."""
    logger = get_run_logger()
    df = pd.DataFrame(raw_records)
    # Clean
    df["name"] = df["name"].str.strip().str.title()
    df["price"] = pd.to_numeric(df["price"], errors="coerce")
    df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")
    # Filter
    df = df.dropna(subset=["name", "price"])
    df = df[df["price"] > 0]
    df = df.drop_duplicates(subset=["id"])
    logger.info(f"Transformed: {len(df)} clean records")
    return df


@task
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    """Validate data quality."""
    logger = get_run_logger()
    assert len(df) > 0, "No records after transformation"
    assert df["price"].min() >= 0, "Negative prices found"
    assert df["id"].is_unique, "Duplicate IDs found"
    null_pct = df.isnull().mean()
    high_null = null_pct[null_pct > 0.1]
    if len(high_null) > 0:
        logger.warning(f"High null columns: {high_null.to_dict()}")
    logger.info("Validation passed")
    return df


@task
def load_data(df: pd.DataFrame, output_path: str):
    """Load data to storage."""
    logger = get_run_logger()
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)
    logger.info(f"Saved {len(df)} records to {output_path}")


@flow(name="ETL Products Pipeline")
def etl_products(
    api_url: str = "https://api.example.com/v1/products",
    output_path: str = "./output/products.parquet",
):
    """End-to-end ETL pipeline."""
    raw_data = extract_data(api_url)
    clean_data = transform_data(raw_data)
    validated_data = validate_data(clean_data)
    load_data(validated_data, output_path)


# Run locally: just execute the Python file
if __name__ == "__main__":
    etl_products()

Advanced Task Features
Retries and Error Handling
# retries.py: Advanced retry configuration
from prefect import flow, task, get_run_logger
from prefect.tasks import exponential_backoff
import httpx


@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),
    retry_jitter_factor=0.5,
)
def fetch_with_backoff(url: str) -> dict:
    """Fetch URL with exponential backoff on failure."""
    response = httpx.get(url, timeout=30)
    response.raise_for_status()
    return response.json()


@task(retries=3, retry_delay_seconds=10)
def risky_transform(data: list[dict]) -> list[dict]:
    """Transform with retry and specific exception handling."""
    logger = get_run_logger()
    try:
        # Primary processing
        return [process_record(r) for r in data]
    except MemoryError:
        # Memory errors won't fix themselves. Note that re-raising alone
        # still counts as a failure, so the retry policy above applies here
        # too; skipping retries by exception type needs a retry condition
        # (retry_condition_fn, if your Prefect version supports it).
        raise
    except ValueError as e:
        logger.warning(f"Transform error (will retry): {e}")
        raise  # Prefect will retry


def process_record(record: dict) -> dict:
    """Process a single record."""
    record["name"] = record["name"].strip().title()
    record["price"] = float(record["price"])
    return record


@flow
def resilient_pipeline():
    """Pipeline with comprehensive error handling."""
    logger = get_run_logger()
    try:
        data = fetch_with_backoff("https://api.example.com/data")
        processed = risky_transform(data)
        logger.info(f"Processed {len(processed)} records")
    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        # Could trigger alerting here
        raise

Task Mapping (Dynamic Fan-Out)
# mapping.py: Process items in parallel with .map()
from prefect import flow, task, get_run_logger
import pandas as pd


@task
def get_table_list() -> list[str]:
    """Get list of tables to process."""
    return ["users", "orders", "products", "events", "sessions"]


@task
def extract_table(table_name: str) -> dict:
    """Extract a single table."""
    logger = get_run_logger()
    # Simulate extraction
    import time
    time.sleep(1)
    row_count = len(table_name) * 1000  # Fake
    logger.info(f"Extracted {table_name}: {row_count} rows")
    return {"table": table_name, "rows": row_count}


@task
def summarize(results: list[dict]):
    """Combine all results."""
    logger = get_run_logger()
    total = sum(r["rows"] for r in results)
    logger.info(f"Total rows extracted: {total}")
    for r in results:
        logger.info(f"  {r['table']}: {r['rows']}")


@flow
def parallel_extraction():
    """Extract multiple tables in parallel using .map()."""
    tables = get_table_list()
    # .map() runs extract_table for each table concurrently
    results = extract_table.map(tables)
    summarize(results)


if __name__ == "__main__":
    parallel_extraction()

Caching
# caching.py: Cache expensive computations
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import hashlib


@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=24),
)
def expensive_computation(data_path: str) -> pd.DataFrame:
    """
    This task is cached based on its input parameters.
    If called again with the same data_path within 24 hours,
    the cached result is returned immediately.
    """
    df = pd.read_parquet(data_path)
    # Expensive processing...
    result = df.groupby("category").agg({
        "price": ["mean", "std", "count"],
        "rating": ["mean", "median"],
    }).reset_index()
    return result


# Custom cache key function
def custom_cache_key(context, parameters):
    """Cache based on file content hash, not just path."""
    data_path = parameters["data_path"]
    with open(data_path, "rb") as f:
        content_hash = hashlib.md5(f.read()).hexdigest()
    return f"{context.task.task_key}-{content_hash}"


@task(
    cache_key_fn=custom_cache_key,
    cache_expiration=timedelta(days=7),
)
def content_aware_task(data_path: str) -> pd.DataFrame:
    """Cache invalidates when file content changes, not just path."""
    return pd.read_parquet(data_path)

Deployments
# deploy.py: Deploy flows for scheduled execution
# Note: Deployment.build_from_flow is the Prefect 2.x API; later releases
# favor flow.deploy() or prefect.yaml.
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from datetime import timedelta


@flow
def daily_etl(source: str = "production"):
    """Daily ETL flow that can be deployed."""
    from prefect import get_run_logger
    logger = get_run_logger()
    logger.info(f"Running ETL for source: {source}")
    # ... pipeline logic ...


# Create deployment programmatically
def create_deployment():
    deployment = Deployment.build_from_flow(
        flow=daily_etl,
        name="daily-etl-production",
        version="1.0",
        schedule=CronSchedule(cron="0 6 * * *", timezone="UTC"),
        parameters={"source": "production"},
        tags=["etl", "daily", "production"],
        description="Daily ETL pipeline for production data",
        work_pool_name="default-agent-pool",
    )
    deployment.apply()
    print(f"Deployment created: {deployment.name}")


# Or use prefect.yaml (preferred for version control)
PREFECT_YAML = """
deployments:
  - name: daily-etl-production
    entrypoint: flows/etl.py:daily_etl
    schedule:
      cron: "0 6 * * *"
      timezone: UTC
    parameters:
      source: production
    work_pool:
      name: default-pool
    tags:
      - etl
      - production
"""

Subflows and Composition
# composition.py: Compose complex pipelines from smaller flows
from prefect import flow, task, get_run_logger
import pandas as pd
from pathlib import Path


@flow(name="Data Collection")
def collect_data(sources: list[str]) -> list[str]:
    """Sub-flow: collect data from multiple sources."""
    logger = get_run_logger()
    collected_files = []
    for source in sources:
        path = f"/data/raw/{source}.parquet"
        # ... collection logic ...
        collected_files.append(path)
        logger.info(f"Collected: {source}")
    return collected_files


@flow(name="Data Preprocessing")
def preprocess_data(file_paths: list[str]) -> str:
    """Sub-flow: clean and transform data."""
    logger = get_run_logger()
    dfs = [pd.read_parquet(p) for p in file_paths if Path(p).exists()]
    combined = pd.concat(dfs, ignore_index=True)
    # Clean
    combined = combined.drop_duplicates()
    combined = combined.dropna(subset=["id"])
    output_path = "/data/clean/combined.parquet"
    combined.to_parquet(output_path, index=False)
    logger.info(f"Preprocessed: {len(combined)} rows")
    return output_path


@flow(name="Data Validation")
def validate(clean_path: str) -> bool:
    """Sub-flow: validate data quality."""
    df = pd.read_parquet(clean_path)
    checks = [
        len(df) > 0,
        df["id"].is_unique,
        df.isnull().mean().max() < 0.2,
    ]
    return all(checks)


@flow(name="Main Pipeline")
def main_pipeline(
    sources: list[str] = ["api", "database", "files"],
):
    """
    Main pipeline composed of sub-flows.
    Each sub-flow appears as a nested flow run in the UI.
    """
    logger = get_run_logger()
    # Sub-flows are called like regular functions
    raw_files = collect_data(sources)
    clean_path = preprocess_data(raw_files)
    is_valid = validate(clean_path)
    if is_valid:
        logger.info("Pipeline complete: data is valid")
    else:
        logger.error("Pipeline complete: VALIDATION FAILED")
        raise ValueError("Data validation failed")


if __name__ == "__main__":
    main_pipeline()

Notifications and Hooks
# notifications.py: Alert on flow completion/failure
from prefect import flow, task, get_run_logger
from prefect.blocks.notifications import SlackWebhook
import httpx


def send_slack_notification(message: str, webhook_url: str):
    """Send a notification to Slack."""
    httpx.post(webhook_url, json={"text": message}, timeout=10)


@flow(
    name="monitored-pipeline",
    on_failure=[lambda flow, flow_run, state: send_slack_notification(
        f"Pipeline FAILED: {flow.name}\nState: {state.name}",
        "https://hooks.slack.com/services/XXX/YYY/ZZZ",
    )],
    on_completion=[lambda flow, flow_run, state: send_slack_notification(
        f"Pipeline completed: {flow.name}\nState: {state.name}",
        "https://hooks.slack.com/services/XXX/YYY/ZZZ",
    )],
)
def monitored_pipeline():
    """Pipeline with automatic notifications."""
    logger = get_run_logger()
    logger.info("Processing...")
    # ... pipeline logic ...
    logger.info("Done!")


if __name__ == "__main__":
    monitored_pipeline()

Prefect Cloud Integration
# cloud_integration.py: Use Prefect Cloud features
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from prefect.artifacts import create_markdown_artifact
import pandas as pd


@task
def extract_with_secret() -> list[dict]:
    """Use Prefect Secret block for credentials."""
    api_key = Secret.load("external-api-key")
    import requests
    response = requests.get(
        "https://api.example.com/data",
        headers={"Authorization": f"Bearer {api_key.get()}"},
    )
    return response.json()


@task
def create_report(df: pd.DataFrame):
    """Create a Markdown artifact visible in Prefect Cloud UI."""
    report = f"""
## Pipeline Report
- **Total Records**: {len(df):,}
- **Columns**: {', '.join(df.columns)}
- **Date Range**: {df['date'].min()} to {df['date'].max()}
- **Null Summary**:
| Column | Null % |
|--------|--------|
"""
    for col in df.columns:
        null_pct = df[col].isnull().mean() * 100
        report += f"| {col} | {null_pct:.1f}% |\n"
    create_markdown_artifact(
        key="pipeline-report",
        markdown=report,
        description="Daily pipeline summary",
    )


@flow
def cloud_pipeline():
    data = extract_with_secret()
    df = pd.DataFrame(data)
    create_report(df)


if __name__ == "__main__":
    cloud_pipeline()

Quick Reference
| Concept | Code | Description |
|---|---|---|
| Task | @task | Unit of work, retryable, cacheable |
| Flow | @flow | Container for tasks, entry point |
| Retry | @task(retries=3) | Auto-retry on failure |
| Cache | @task(cache_key_fn=task_input_hash) | Skip if same input |
| Map | task.map(items) | Fan-out parallel execution |
| Subflow | Call @flow inside @flow | Nested flow composition |
| Deployment | prefect deploy | Schedule and trigger flows |
| Block | Secret.load("name") | Stored config/credentials |
| Artifact | create_markdown_artifact() | Rich output in UI |
| Command | Description |
|---|---|
| prefect server start | Start local Prefect server |
| prefect deploy | Deploy flows from prefect.yaml |
| prefect flow-run create | Trigger a flow run |
| prefect work-pool create | Create a work pool |
| prefect worker start | Start a worker for a pool |
Key Takeaway
- Prefect treats flows as normal Python functions: any Python code can become a pipeline with a @flow decorator and very little boilerplate.
- Task caching with task_input_hash skips re-computation when inputs have not changed, which avoids repeating expensive transformations.
- Subflows enable a composable pipeline architecture where each component is independently testable, deployable, and observable.
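To make the input-based caching idea concrete, here is a minimal plain-Python sketch. It is only an illustration of the principle, not Prefect's actual hashing code: the names input_hash_key and run_cached are hypothetical, and the in-memory dict stands in for Prefect's result storage.

```python
import hashlib
import json


def input_hash_key(task_name: str, parameters: dict) -> str:
    """Build a deterministic cache key from a task name and its inputs.

    Hypothetical stand-in for illustration; Prefect's own hashing differs
    in detail.
    """
    # sort_keys makes the serialized form independent of dict ordering
    payload = json.dumps({"task": task_name, "params": parameters}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# In-memory stand-in for a result store
_cache = {}


def run_cached(task_name, fn, **parameters):
    """Call fn(**parameters) unless a result for the same inputs is cached."""
    key = input_hash_key(task_name, parameters)
    if key not in _cache:
        _cache[key] = fn(**parameters)
    return _cache[key]
```

Calling run_cached twice with the same keyword arguments runs the function once; changing any argument produces a new key and a fresh computation. Expiration is left out of the sketch for brevity.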
Exercise
Build a Cached, Retryable ETL Flow
Create a Prefect flow that:
- Extracts data from a mock API endpoint with 3 retries and exponential backoff.
- Transforms the data (cleaning, type casting) with input-based caching (1 hour expiry).
- Validates the output (row count, null checks, value ranges).
- Saves to Parquet and creates a Markdown artifact summarizing the run.
- Test it locally by running python my_flow.py.
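For the first step, it can help to see what an exponential backoff schedule looks like as numbers. The sketch below assumes each delay is the backoff factor multiplied by a power of two, which is the usual meaning of the term; backoff_delays is a hypothetical helper written for this exercise, not a Prefect function, and jitter is left out.

```python
def backoff_delays(retries: int, backoff_factor: float) -> list[float]:
    """Delay in seconds before each retry, doubling every attempt."""
    return [backoff_factor * 2**attempt for attempt in range(retries)]


delays = backoff_delays(retries=3, backoff_factor=10)
print(delays)  # [10, 20, 40]
```

A list like this can be passed to a task's retry_delay_seconds, which is the approach the solution sketch takes with a hand-written list.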
Solution Sketch
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.artifacts import create_markdown_artifact
from datetime import timedelta
import pandas as pd


@task(retries=3, retry_delay_seconds=[10, 30, 60])
def extract(url: str) -> list[dict]:
    import requests
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    return resp.json()["data"]


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def transform(records: list[dict]) -> pd.DataFrame:
    df = pd.DataFrame(records)
    df["name"] = df["name"].str.strip().str.title()
    df["price"] = pd.to_numeric(df["price"], errors="coerce")
    return df.dropna(subset=["name", "price"])


@task
def validate(df: pd.DataFrame) -> pd.DataFrame:
    assert len(df) > 0
    assert df["price"].min() >= 0
    return df


@task
def save(df: pd.DataFrame, path: str):
    df.to_parquet(path, index=False)
    # Single backslash so the artifact gets real line breaks
    create_markdown_artifact(
        key="etl-summary",
        markdown=f"## ETL Summary\n- Rows: {len(df)}\n- Columns: {list(df.columns)}",
    )


@flow(name="cached-etl")
def etl_pipeline(url: str = "https://api.example.com/data"):
    raw = extract(url)
    clean = transform(raw)
    valid = validate(clean)
    save(valid, "./output.parquet")


if __name__ == "__main__":
    etl_pipeline()

Debugging Scenario
Your Prefect flow works locally but fails when deployed. The error says "No module named 'pandas'" even though pandas is installed on your machine.
Diagnose and fix it.
Answer
The deployment runs on a different worker environment (different machine, Docker container, or virtual environment) that does not have the same packages installed. Prefect separates the flow definition from the execution environment.
Fixes:
- Pin dependencies: create a requirements.txt or pyproject.toml and configure the deployment to install them.
- Docker-based work pool: build a Docker image with all dependencies and configure the work pool to use it.
- Prefect Cloud infrastructure blocks: define an infrastructure block that specifies the Python environment, packages, and runtime configuration.
- Use prefect.yaml pip requirements:

    deployments:
      - name: my-flow
        entrypoint: flow.py:etl_pipeline
        work_pool:
          name: default
        pull:
          - prefect.deployments.steps.pip_install_requirements:
              requirements_file: requirements.txt
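To confirm the diagnosis before changing the deployment, a small check run inside the worker environment can list which modules fail to resolve. This is a sketch using only the standard library; missing_modules is a hypothetical helper, the module list is an assumption about what the flow needs, and it expects top-level package names only.

```python
import importlib.util


def missing_modules(required: list[str]) -> list[str]:
    """Return the top-level module names from `required` that cannot be found."""
    return [name for name in required if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Hypothetical dependency list for the flow in question
    missing = missing_modules(["pandas", "requests"])
    if missing:
        print(f"Missing on this worker: {missing}")
    else:
        print("All required modules are importable")
```

Running the same script on your own machine and on the worker makes the difference between the two environments visible.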
Common Misconceptions
- "Prefect is just a simpler Airflow." Prefect has a fundamentally different architecture: it is code-first (not config-first), supports local execution without infrastructure, and passes data natively through Python return values.
- "Task caching means I never need to worry about reprocessing." Caching is based on input hash and has an expiration. If the underlying data changes but the function inputs do not, the cache serves stale results. Use content-aware cache keys.
- "Subflows are just function calls." Subflows create separate flow runs in the Prefect UI with their own logs, state tracking, and retry behavior. They are more than function calls -- they are observable pipeline components.
- "Prefect replaces Airflow in all cases." For large organizations with complex scheduling, extensive operator ecosystems, and mature Airflow infrastructure, migration may not be worth it. Prefect excels for new projects and smaller teams.
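The stale-cache pitfall from the second misconception can be reproduced without Prefect. The sketch below uses a plain dict as the cache and two hypothetical key functions: one keyed on the path argument, one keyed on the file's bytes. It is an illustration of the principle, not of Prefect's cache implementation.

```python
import hashlib
import tempfile
from pathlib import Path


def path_key(path: str) -> str:
    """Cache key that depends only on the argument value."""
    return path


def content_key(path: str) -> str:
    """Cache key that changes whenever the file's bytes change."""
    return hashlib.sha256(Path(path).read_bytes()).hexdigest()


def cached_read(path: str, key_fn, cache: dict) -> str:
    """Read a file through a dict cache keyed by key_fn(path)."""
    key = key_fn(path)
    if key not in cache:
        cache[key] = Path(path).read_text()
    return cache[key]


with tempfile.TemporaryDirectory() as tmp:
    data_file = str(Path(tmp) / "data.txt")
    by_path, by_content = {}, {}

    Path(data_file).write_text("v1")
    cached_read(data_file, path_key, by_path)
    cached_read(data_file, content_key, by_content)

    Path(data_file).write_text("v2")  # same path, new content
    stale = cached_read(data_file, path_key, by_path)
    fresh = cached_read(data_file, content_key, by_content)

print(stale, fresh)  # v1 v2
```

The path-keyed cache keeps serving the old content because its key never changed, while the content-keyed cache notices the new bytes.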
Quiz
1. How does Prefect's data passing differ from Airflow's XCom?
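Prefect tasks pass data through native Python return values and function arguments. Within a single process the values stay in memory, so there is no XCom-style size limit; serialization only comes into play when results are persisted or tasks run on a distributed task runner. Airflow's XCom, by contrast, stores values in the metadata database by default.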
2. What does task_input_hash do as a cache key function?
It hashes the task's input parameters to create a cache key. If the same task is called with the same inputs within the cache expiration period, the cached result is returned without re-executing the task.
3. How does .map() work in Prefect?
.map() takes an iterable and creates one task run per element, similar in spirit to Python's multiprocessing.Pool.map(). The runs are submitted to the flow's task runner, so with a concurrent task runner they execute concurrently rather than one after another.
4. What is a Prefect Block?
A Block is a reusable, named configuration object stored in Prefect (e.g., database credentials, cloud storage configuration, webhook URLs). Blocks provide a secure way to manage secrets and infrastructure configuration.
5. Can Prefect flows be tested without a Prefect server?
Yes. Prefect flows are normal Python functions that can be called directly: python my_flow.py runs the flow locally without any server, scheduler, or infrastructure setup.
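The fan-out behavior described in question 3 can be pictured with the standard library alone. The sketch below is an analogy, not Prefect code: a ThreadPoolExecutor stands in for the task runner, and extract_table is a plain function mirroring the fake extraction from the mapping example.

```python
from concurrent.futures import ThreadPoolExecutor


def extract_table(table_name: str) -> dict:
    """Stand-in for a task: one call per input item."""
    return {"table": table_name, "rows": len(table_name) * 1000}


tables = ["users", "orders", "products"]

with ThreadPoolExecutor(max_workers=3) as pool:
    # One submission per element, analogous to one task run per item
    futures = [pool.submit(extract_table, t) for t in tables]
    # Collect results in input order
    results = [f.result() for f in futures]

print([r["rows"] for r in results])  # [5000, 6000, 8000]
```

As with .map(), each element gets its own unit of work and the results come back as a list aligned with the inputs.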
One-Liner Summary: Prefect turns any Python function into an observable, retryable, cacheable pipeline with a @flow decorator -- no infrastructure required for development, production-ready when deployed.