Great Expectations Deep Dive
Great Expectations (GX) is the most widely adopted open-source framework for data validation. It lets you define "expectations" about your data — rules like "this column should never be null" or "values in this column should be between 0 and 100" — and automatically validates every batch of data against those rules. When expectations fail, GX generates rich HTML reports showing exactly what went wrong.
Core Concepts
| Concept | Description |
|---|---|
| Data Source | Connection to your data (file, database, cloud) |
| Data Asset | A table, file, or query within a data source |
| Batch | A specific slice of data to validate |
| Expectation | A single validation rule |
| Expectation Suite | A named collection of expectations |
| Checkpoint | Bundles data + suite + actions (run, report, alert) |
| Data Docs | Auto-generated HTML documentation of validation results |
Setup and Configuration
# gx_setup.py — Initialize Great Expectations project
import great_expectations as gx
from great_expectations.data_context import FileDataContext
from pathlib import Path
def initialize_gx_project(project_dir: str = ".") -> FileDataContext:
"""Initialize a Great Expectations project."""
context = gx.get_context(
project_root_dir=project_dir,
cloud_mode=False,
)
return context
def add_pandas_datasource(
context: FileDataContext,
name: str = "local_data",
base_dir: str = "./data",
):
"""Add a pandas filesystem datasource."""
datasource = context.sources.add_pandas_filesystem(
name=name,
base_directory=base_dir,
)
# Add CSV data asset
datasource.add_csv_asset(
name="csv_files",
batching_regex=r"(?P<year>\d{4})/(?P<month>\d{2})/(?P<filename>.+)\.csv",
)
# Add Parquet data asset
datasource.add_parquet_asset(
name="parquet_files",
batching_regex=r"(?P<year>\d{4})/(?P<month>\d{2})/(?P<filename>.+)\.parquet",
)
return datasource
def add_sql_datasource(
context: FileDataContext,
name: str = "warehouse",
connection_string: str = "postgresql://user:pass@host/db",
):
"""Add a SQL datasource."""
datasource = context.sources.add_sql(
name=name,
connection_string=connection_string,
)
# Add table assets
datasource.add_table_asset(name="orders", table_name="orders")
datasource.add_table_asset(name="products", table_name="products")
# Add query asset (for complex queries)
datasource.add_query_asset(
name="active_orders",
query="SELECT * FROM orders WHERE status = 'active'",
)
return datasource
# Initialize
context = initialize_gx_project("./my_pipeline")
add_pandas_datasource(context, base_dir="./data")Writing Expectations
# expectations.py — Define validation rules
import great_expectations as gx
def create_product_expectations(context):
"""Create an expectation suite for product data."""
suite = context.add_expectation_suite(
expectation_suite_name="products_quality"
)
# Get a validator (connects suite to data for interactive development)
validator = context.get_validator(
datasource_name="local_data",
data_asset_name="parquet_files",
expectation_suite_name="products_quality",
batch_request={
"options": {"year": "2024", "month": "01"}
},
)
# --- Column existence ---
validator.expect_table_columns_to_match_set(
column_set=["id", "name", "price", "category", "created_at"],
exact_match=False, # Allow extra columns
)
# --- Not null ---
validator.expect_column_values_to_not_be_null("id")
validator.expect_column_values_to_not_be_null("name")
validator.expect_column_values_to_not_be_null("price")
# --- Uniqueness ---
validator.expect_column_values_to_be_unique("id")
# --- Type checks ---
validator.expect_column_values_to_be_of_type("id", "int64")
validator.expect_column_values_to_be_of_type("price", "float64")
# --- Value ranges ---
validator.expect_column_values_to_be_between(
"price", min_value=0, max_value=100000
)
# --- String patterns ---
validator.expect_column_value_lengths_to_be_between(
"name", min_value=1, max_value=500
)
# --- Categorical values ---
validator.expect_column_values_to_be_in_set(
"category",
value_set=["electronics", "clothing", "food", "books", "home", "sports"],
mostly=0.95, # Allow 5% unknown categories
)
# --- Row count ---
validator.expect_table_row_count_to_be_between(
min_value=100,
max_value=10_000_000,
)
# --- Null rate ---
validator.expect_column_values_to_not_be_null(
"category",
mostly=0.9, # At least 90% non-null
)
# --- Distribution ---
validator.expect_column_mean_to_be_between(
"price", min_value=5.0, max_value=500.0
)
validator.expect_column_median_to_be_between(
"price", min_value=1.0, max_value=200.0
)
validator.expect_column_stdev_to_be_between(
"price", min_value=1.0, max_value=1000.0
)
# --- Date ranges ---
validator.expect_column_values_to_be_between(
"created_at",
min_value="2020-01-01",
max_value="2030-01-01",
parse_strings_as_datetimes=True,
)
# --- Regex patterns ---
validator.expect_column_values_to_match_regex(
"email",
regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
mostly=0.95,
)
# Save the suite
validator.save_expectation_suite(discard_failed_expectations=False)
return suiteCheckpoints
# checkpoints.py — Configure and run validation checkpoints
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
def create_product_checkpoint(context):
"""
Create a checkpoint that validates products and generates reports.
A checkpoint bundles:
- Which data to validate
- Which expectation suite to use
- What actions to take (report, alert, etc.)
"""
checkpoint = Checkpoint(
name="products_daily_check",
data_context=context,
config_version=1.0,
run_name_template="products_%Y%m%d_%H%M%S",
validations=[
{
"batch_request": {
"datasource_name": "local_data",
"data_asset_name": "parquet_files",
"options": {},
},
"expectation_suite_name": "products_quality",
}
],
action_list=[
# Store validation result
{
"name": "store_validation_result",
"action": {
"class_name": "StoreValidationResultAction",
},
},
# Update Data Docs (HTML reports)
{
"name": "update_data_docs",
"action": {
"class_name": "UpdateDataDocsAction",
},
},
# Send Slack notification on failure
{
"name": "send_slack_notification",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "${SLACK_WEBHOOK_URL}",
"notify_on": "failure",
"renderer": {
"module_name": "great_expectations.render.renderer.slack_renderer",
"class_name": "SlackRenderer",
},
},
},
],
)
context.add_or_update_checkpoint(checkpoint=checkpoint)
return checkpoint
def run_checkpoint(context, checkpoint_name: str = "products_daily_check"):
"""Run a checkpoint and return results."""
result = context.run_checkpoint(checkpoint_name=checkpoint_name)
if result.success:
print("All expectations passed!")
else:
print("Validation FAILED:")
for validation_result in result.run_results.values():
results = validation_result.get("validation_result", {})
for r in results.get("results", []):
if not r.get("success", True):
expectation = r.get("expectation_config", {})
print(f" FAILED: {expectation.get('expectation_type')}")
print(f" Column: {expectation.get('kwargs', {}).get('column', 'N/A')}")
observed = r.get("result", {}).get("observed_value")
print(f" Observed: {observed}")
return resultCustom Expectations
# custom_expectations.py — Build domain-specific validation rules
from great_expectations.expectations import ExpectationConfiguration
from great_expectations.core import ExpectationValidationResult
from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.expectations.expectation import ColumnMapExpectation
from great_expectations.expectations.metrics import ColumnMapMetricProvider, column_condition_partial
import re
class ExpectColumnValuesToBeValidPhoneNumber(ColumnMapExpectation):
"""Expect column values to be valid US phone numbers."""
expectation_type = "expect_column_values_to_be_valid_phone_number"
# Define the metric this expectation uses
map_metric = "column_values.valid_phone_number"
# Default parameters
success_keys = ("mostly",)
default_kwarg_values = {
"mostly": 1.0,
}
# Documentation
library_metadata = {
"tags": ["phone", "validation", "custom"],
"contributors": ["data-team"],
}
class ColumnValuesValidPhoneNumber(ColumnMapMetricProvider):
condition_metric_name = "column_values.valid_phone_number"
@column_condition_partial(engine=PandasExecutionEngine)
def _pandas(cls, column, **kwargs):
phone_pattern = re.compile(
r"^(\+1[-.]?)?\(?(\d{3})\)?[-.\s]?(\d{3})[-.\s]?(\d{4})$"
)
return column.apply(
lambda x: bool(phone_pattern.match(str(x))) if x else True
)
class ExpectColumnValuesToBeValidJSON(ColumnMapExpectation):
"""Expect column values to be valid JSON strings."""
expectation_type = "expect_column_values_to_be_valid_json"
map_metric = "column_values.valid_json"
success_keys = ("mostly",)
default_kwarg_values = {"mostly": 1.0}
class ColumnValuesValidJSON(ColumnMapMetricProvider):
condition_metric_name = "column_values.valid_json"
@column_condition_partial(engine=PandasExecutionEngine)
def _pandas(cls, column, **kwargs):
import json
def is_valid_json(val):
if not isinstance(val, str):
return False
try:
json.loads(val)
return True
except (json.JSONDecodeError, TypeError):
return False
return column.apply(is_valid_json)
# Usage in an expectation suite
# validator.expect_column_values_to_be_valid_phone_number("phone", mostly=0.95)
# validator.expect_column_values_to_be_valid_json("metadata")Automated Profiling
# profiling.py — Auto-generate expectations from data
import great_expectations as gx
import pandas as pd
def auto_profile(
context,
datasource_name: str,
data_asset_name: str,
suite_name: str = "auto_profiled",
) -> dict:
"""
Automatically generate expectations by profiling data.
Good starting point, but always review and refine.
"""
profiler = context.add_or_update_profiler(
name="auto_profiler",
config_version=1.0,
rules={
"column_ranges": {
"domain_builder": {
"class_name": "ColumnDomainBuilder",
"include_semantic_types": ["numeric"],
},
"parameter_builders": [
{
"class_name": "NumericMetricRangeMultiBatchParameterBuilder",
"metric_name": "column.min",
"false_positive_rate": 0.01,
},
{
"class_name": "NumericMetricRangeMultiBatchParameterBuilder",
"metric_name": "column.max",
"false_positive_rate": 0.01,
},
],
"expectation_configuration_builders": [
{
"class_name": "DefaultExpectationConfigurationBuilder",
"expectation_type": "expect_column_values_to_be_between",
"column": "$domain.domain_kwargs.column",
"min_value": "$parameter.column.min.value[-1]",
"max_value": "$parameter.column.max.value[-1]",
}
],
},
"not_null": {
"domain_builder": {
"class_name": "ColumnDomainBuilder",
},
"expectation_configuration_builders": [
{
"class_name": "DefaultExpectationConfigurationBuilder",
"expectation_type": "expect_column_values_to_not_be_null",
"column": "$domain.domain_kwargs.column",
"mostly": 0.95,
}
],
},
},
)
# Run profiler
suite = profiler.run(
expectation_suite_name=suite_name,
batch_request={
"datasource_name": datasource_name,
"data_asset_name": data_asset_name,
},
)
return suite
# Quick profiling with pandas
def quick_profile(df: pd.DataFrame, suite_name: str = "quick_profile") -> list[dict]:
"""Generate expectations from DataFrame statistics (no GX required)."""
expectations = []
# Row count
expectations.append({
"type": "expect_table_row_count_to_be_between",
"kwargs": {
"min_value": int(len(df) * 0.5),
"max_value": int(len(df) * 2.0),
},
})
for col in df.columns:
# Not null
null_rate = df[col].isnull().mean()
if null_rate < 0.01:
expectations.append({
"type": "expect_column_values_to_not_be_null",
"kwargs": {"column": col},
})
elif null_rate < 0.5:
expectations.append({
"type": "expect_column_values_to_not_be_null",
"kwargs": {"column": col, "mostly": round(1 - null_rate - 0.05, 2)},
})
# Numeric ranges
if pd.api.types.is_numeric_dtype(df[col]):
q01 = df[col].quantile(0.01)
q99 = df[col].quantile(0.99)
expectations.append({
"type": "expect_column_values_to_be_between",
"kwargs": {
"column": col,
"min_value": float(q01),
"max_value": float(q99),
"mostly": 0.98,
},
})
# Uniqueness
if df[col].nunique() == len(df):
expectations.append({
"type": "expect_column_values_to_be_unique",
"kwargs": {"column": col},
})
# Low cardinality -> value set
if df[col].dtype == "object" and df[col].nunique() < 20:
expectations.append({
"type": "expect_column_values_to_be_in_set",
"kwargs": {
"column": col,
"value_set": df[col].dropna().unique().tolist(),
"mostly": 0.95,
},
})
return expectationsCI/CD Integration
# ci_validation.py — Run GX validations in CI/CD pipelines
import sys
import json
import great_expectations as gx
from pathlib import Path
def validate_in_ci(
project_dir: str,
checkpoint_name: str,
fail_on_error: bool = True,
) -> bool:
"""
Run validation in CI/CD. Returns True if all checks pass.
Exits with code 1 on failure for CI pipeline integration.
"""
context = gx.get_context(project_root_dir=project_dir)
result = context.run_checkpoint(checkpoint_name=checkpoint_name)
# Extract summary
summary = {
"success": result.success,
"statistics": {},
}
for run_id, run_result in result.run_results.items():
vr = run_result.get("validation_result", {})
stats = vr.get("statistics", {})
summary["statistics"][str(run_id)] = {
"evaluated": stats.get("evaluated_expectations", 0),
"successful": stats.get("successful_expectations", 0),
"unsuccessful": stats.get("unsuccessful_expectations", 0),
"success_percent": stats.get("success_percent", 0),
}
# Print summary
print(json.dumps(summary, indent=2))
if not result.success and fail_on_error:
print("VALIDATION FAILED - blocking pipeline")
sys.exit(1)
return result.success
# GitHub Actions workflow step:
# - name: Validate data quality
# run: python ci_validation.py --project-dir . --checkpoint products_daily_check
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--project-dir", default=".")
parser.add_argument("--checkpoint", required=True)
args = parser.parse_args()
validate_in_ci(args.project_dir, args.checkpoint)Quick Reference
| Expectation Category | Example |
|---|---|
| Column existence | expect_table_columns_to_match_set() |
| Not null | expect_column_values_to_not_be_null(mostly=0.95) |
| Uniqueness | expect_column_values_to_be_unique() |
| Value range | expect_column_values_to_be_between(min, max) |
| Value set | expect_column_values_to_be_in_set(["a", "b"]) |
| Regex match | expect_column_values_to_match_regex(pattern) |
| Type check | expect_column_values_to_be_of_type("int64") |
| Row count | expect_table_row_count_to_be_between(100, 10000) |
| Mean range | expect_column_mean_to_be_between(10, 50) |
| Distribution | expect_column_kl_divergence_to_be_less_than() |
| Component | Purpose |
|---|---|
| Expectation Suite | Group of related validation rules |
| Checkpoint | Executable validation configuration |
| Data Docs | Auto-generated HTML reports |
| Profiler | Auto-generate expectations from data |
| Store | Persist validation results |
| Action | What to do after validation (alert, report) |
Key Takeaway
- Great Expectations validates data against declarative rules (expectations) and generates rich HTML reports showing exactly what passed and what failed.
- Checkpoints bundle data + expectations + actions into a single executable validation step that integrates into CI/CD and orchestration.
- Auto-profiling generates a starting set of expectations from existing data, which you then refine with domain knowledge.
Exercise
Create a Validation Suite for an Orders Table
Define a Great Expectations suite that validates:
- The table has between 100 and 1,000,000 rows.
order_idis never null and is unique.total_amountis between 0 and 100,000.statusis one of: "pending", "completed", "cancelled", "refunded".created_atis a valid datetime that is not in the future.- The overall null rate across all columns is below 5%.
Then run the suite against a sample DataFrame and check the validation result.
Solution Sketch
import great_expectations as gx
import pandas as pd
from datetime import datetime
context = gx.get_context()
ds = context.sources.add_pandas("orders_source")
asset = ds.add_dataframe_asset("orders")
batch = asset.build_batch_request(dataframe=df)
validator = context.get_validator(batch_request=batch, expectation_suite_name="orders_suite")
validator.expect_table_row_count_to_be_between(100, 1_000_000)
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between("total_amount", 0, 100_000)
validator.expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled", "refunded"])
validator.expect_column_values_to_not_be_null("created_at")
result = validator.validate()
print(f"Success: {result.success}, Results: {len(result.results)}")Debugging Scenario
Your Great Expectations checkpoint passes in CI but fails in production with "column not found" errors, even though the same data source is used.
Diagnose and fix it.
Answer
Common causes:
- Schema evolution: the production database had a column renamed or removed after the expectations were written. The CI test uses a static fixture that still has the old schema. Fix: run schema discovery before validation and update expectations when schema changes are detected.
- Case sensitivity: the expectation references
Order_IDbut the production database returnsorder_id. Fix: normalize column names to lowercase in the pipeline before validation. - Different data source versions: CI connects to a test database with a different schema version than production. Fix: use the same connection configuration and run validation against a sample of production data in CI.
- Column ordering: some backends return columns in different orders, and expectations reference columns by position rather than name. Fix: always reference columns by name, never by position.
Common Misconceptions
- "Great Expectations replaces unit tests." GX validates data, not code. You still need unit tests for your transformation functions. GX and pytest serve different purposes.
- "Auto-profiling gives production-ready expectations." Auto-profiling creates expectations based on the current data snapshot. It may encode data bugs as "expected behavior" and set overly tight bounds. Always review and adjust auto-generated expectations.
- "Validation should block the pipeline on any failure." Some expectations are warnings (null rate slightly above normal), not blockers. Use
mostlyparameter (e.g.,mostly=0.95for 95% compliance) and severity levels. - "One expectation suite per table is sufficient." Different consumers need different validation levels. A dashboard may tolerate 5% nulls while an ML model needs zero nulls. Create consumer-specific suites.
Quiz
1. What is the difference between an Expectation and a Checkpoint in Great Expectations?
An Expectation is a single validation rule (e.g., "column X should not be null"). A Checkpoint bundles a data source, an expectation suite, and actions (report, alert) into a single executable validation step.
2. What does the mostly parameter do?
It allows partial compliance.
expect_column_values_to_not_be_null("email", mostly=0.95)passes if at least 95% of values are non-null, rather than requiring 100%.
3. What are Data Docs?
Auto-generated HTML reports that show validation results, expectation definitions, and data statistics. They serve as living documentation of data quality.
4. How does auto-profiling work?
GX analyzes a sample of data and generates expectations based on observed patterns (value ranges, null rates, unique counts, distributions). These serve as a starting point for manual refinement.
5. How do you integrate GX into a CI/CD pipeline?
Run a Checkpoint as a step in CI/CD (e.g.,
context.run_checkpoint("my_checkpoint")). If validation fails, the CI step fails and blocks deployment. Store Data Docs as build artifacts.
One-Liner Summary: Great Expectations is a data validation platform that defines expectations as declarative rules, runs them via checkpoints, and generates HTML reports showing exactly what passed and failed.