Unverified: AI-generated content.

Data Contracts

A data contract is a formal agreement between a data producer and its consumers about what the data will look like, when it will arrive, and what guarantees come with it. Without contracts, a team that changes a column name breaks every downstream dashboard, model, and report — and nobody finds out until the CEO asks why the numbers are wrong. Data contracts make these dependencies explicit, testable, and enforceable.


What Is a Data Contract?

A data contract specifies:

| Component | What It Covers | Example |
|---|---|---|
| Schema | Column names, types, constraints | price FLOAT NOT NULL, price >= 0 |
| SLA | Freshness and availability | Data available by 6 AM UTC daily |
| Quality | Statistical properties | Null rate < 5%, row count > 1000 |
| Semantics | What values mean | status='active' means currently available for purchase |
| Ownership | Who maintains it | Platform team, contact: data-platform@company.com |
| Versioning | How changes are managed | Semantic versioning, 30-day deprecation notice |
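
The Versioning row pairs semantic versioning with a 30-day deprecation notice. A CI job can only check a notice period if it is expressed as a date. The sketch below shows one way to do that. It assumes each deprecation is recorded with the date it was announced. DeprecationNotice and removal_allowed are hypothetical names and are not part of any library.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date, timedelta


@dataclass(frozen=True)
class DeprecationNotice:
    """Hypothetical record of a deprecated column and its replacement."""
    column: str
    replacement: str | None
    announced_on: date
    notice_days: int = 30  # matches the 30-day notice in the Versioning row

    @property
    def removal_date(self) -> date:
        """Earliest date on which the column may be dropped."""
        return self.announced_on + timedelta(days=self.notice_days)

    def removal_allowed(self, today: date) -> bool:
        return today >= self.removal_date


notice = DeprecationNotice("user_age", "customer_age", announced_on=date(2024, 3, 1))
print(notice.removal_date)                        # 2024-03-31
print(notice.removal_allowed(date(2024, 3, 15)))  # False
```

A producer's CI could call removal_allowed before it accepts a contract version that drops the column.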

Contract Definition Format

python
# contract_definition.py — Define data contracts in code
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any

import yaml  # PyYAML


class ColumnType(Enum):
    STRING = "string"
    INTEGER = "integer"
    FLOAT = "float"
    BOOLEAN = "boolean"
    DATETIME = "datetime"
    DATE = "date"
    JSON = "json"


class ChangeType(Enum):
    BACKWARD_COMPATIBLE = "backward_compatible"  # Can add nullable columns
    BREAKING = "breaking"  # Removed/renamed columns, type changes


@dataclass
class ColumnContract:
    """Contract for a single column."""
    name: str
    type: ColumnType
    description: str
    nullable: bool = True
    unique: bool = False
    primary_key: bool = False
    min_value: Any = None
    max_value: Any = None
    allowed_values: list[str] | None = None
    pattern: str | None = None  # Regex
    deprecated: bool = False
    deprecated_by: str | None = None  # Replacement column


@dataclass
class QualityContract:
    """Quality guarantees for the dataset."""
    min_row_count: int = 1
    max_row_count: int | None = None
    max_null_rate: dict[str, float] = field(default_factory=dict)
    max_duplicate_rate: float = 0.0
    freshness_hours: float = 24.0
    custom_checks: list[dict] = field(default_factory=list)


@dataclass
class SLAContract:
    """Service Level Agreement for data delivery."""
    availability_target: float = 0.99  # 99% uptime
    delivery_deadline_utc: str = "06:00"
    max_latency_minutes: int = 60
    support_contact: str = ""
    escalation_policy: str = ""


@dataclass
class DataContract:
    """Complete data contract between producer and consumers."""
    name: str
    version: str
    description: str
    owner: str
    domain: str

    schema: list[ColumnContract]
    quality: QualityContract
    sla: SLAContract

    consumers: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)

    def to_yaml(self, path: str):
        """Export contract to YAML for version control."""
        data = {
            "contract": {
                "name": self.name,
                "version": self.version,
                "description": self.description,
                "owner": self.owner,
                "domain": self.domain,
                "consumers": self.consumers,
                "tags": self.tags,
            },
            "schema": [
                {
                    "name": col.name,
                    "type": col.type.value,
                    "description": col.description,
                    "nullable": col.nullable,
                    "unique": col.unique,
                    "primary_key": col.primary_key,
                    # Optional constraints are written only when set
                    **({"min_value": col.min_value} if col.min_value is not None else {}),
                    **({"max_value": col.max_value} if col.max_value is not None else {}),
                    **({"allowed_values": col.allowed_values} if col.allowed_values else {}),
                    **({"pattern": col.pattern} if col.pattern else {}),
                    # Deprecation flags must survive the round trip; otherwise
                    # breaking-change detection cannot tell a planned removal
                    # from an accidental one
                    **({"deprecated": True} if col.deprecated else {}),
                    **({"deprecated_by": col.deprecated_by} if col.deprecated_by else {}),
                }
                for col in self.schema
            ],
            "quality": {
                "min_row_count": self.quality.min_row_count,
                "max_row_count": self.quality.max_row_count,
                "max_null_rate": self.quality.max_null_rate,
                "max_duplicate_rate": self.quality.max_duplicate_rate,
                "freshness_hours": self.quality.freshness_hours,
            },
            "sla": {
                "availability_target": self.sla.availability_target,
                "delivery_deadline_utc": self.sla.delivery_deadline_utc,
                "max_latency_minutes": self.sla.max_latency_minutes,
                "support_contact": self.sla.support_contact,
                "escalation_policy": self.sla.escalation_policy,
            },
        }
        out = Path(path)
        out.parent.mkdir(parents=True, exist_ok=True)  # create contracts/ if needed
        out.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))

    @classmethod
    def from_yaml(cls, path: str) -> "DataContract":
        """Load contract from YAML."""
        data = yaml.safe_load(Path(path).read_text())
        contract_info = data["contract"]
        schema_data = data.get("schema", [])
        quality_data = data.get("quality", {})
        sla_data = data.get("sla", {})

        return cls(
            name=contract_info["name"],
            version=contract_info["version"],
            description=contract_info["description"],
            owner=contract_info["owner"],
            domain=contract_info["domain"],
            consumers=contract_info.get("consumers", []),
            tags=contract_info.get("tags", []),
            schema=[
                ColumnContract(
                    name=col["name"],
                    type=ColumnType(col["type"]),
                    description=col.get("description", ""),
                    nullable=col.get("nullable", True),
                    unique=col.get("unique", False),
                    primary_key=col.get("primary_key", False),
                    min_value=col.get("min_value"),
                    max_value=col.get("max_value"),
                    allowed_values=col.get("allowed_values"),
                    pattern=col.get("pattern"),
                    deprecated=col.get("deprecated", False),
                    deprecated_by=col.get("deprecated_by"),
                )
                for col in schema_data
            ],
            quality=QualityContract(
                min_row_count=quality_data.get("min_row_count", 1),
                max_row_count=quality_data.get("max_row_count"),
                max_null_rate=quality_data.get("max_null_rate", {}),
                max_duplicate_rate=quality_data.get("max_duplicate_rate", 0.0),
                freshness_hours=quality_data.get("freshness_hours", 24.0),
            ),
            sla=SLAContract(
                availability_target=sla_data.get("availability_target", 0.99),
                delivery_deadline_utc=sla_data.get("delivery_deadline_utc", "06:00"),
                max_latency_minutes=sla_data.get("max_latency_minutes", 60),
                support_contact=sla_data.get("support_contact", ""),
                escalation_policy=sla_data.get("escalation_policy", ""),
            ),
        )


# Define a contract
orders_contract = DataContract(
    name="orders",
    version="2.1.0",
    description="Customer order data from the checkout service",
    owner="checkout-team",
    domain="commerce",
    consumers=["analytics", "ml-recommendations", "finance-dashboard"],
    tags=["critical", "pii"],
    schema=[
        ColumnContract("order_id", ColumnType.INTEGER, "Unique order identifier", nullable=False, unique=True, primary_key=True),
        ColumnContract("customer_id", ColumnType.INTEGER, "Reference to customer", nullable=False),
        ColumnContract("total", ColumnType.FLOAT, "Order total in USD", nullable=False, min_value=0),
        ColumnContract("status", ColumnType.STRING, "Order status", nullable=False, allowed_values=["pending", "confirmed", "shipped", "delivered", "cancelled"]),
        ColumnContract("created_at", ColumnType.DATETIME, "When the order was placed", nullable=False),
        ColumnContract("updated_at", ColumnType.DATETIME, "Last modification timestamp", nullable=False),
    ],
    quality=QualityContract(
        min_row_count=100,
        max_null_rate={"customer_id": 0.0, "total": 0.0, "status": 0.0},
        freshness_hours=4,
    ),
    sla=SLAContract(
        availability_target=0.999,
        delivery_deadline_utc="06:00",
        max_latency_minutes=30,
        support_contact="checkout-team@company.com",
    ),
)

orders_contract.to_yaml("contracts/orders.yaml")
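
A contract is plain data, so it can contradict itself. A primary key might be marked nullable, a null-rate threshold might name a column that does not exist, or deprecated_by might point at a replacement that was never added. The sketch below is a minimal self-consistency check. To stay self-contained it uses a trimmed stand-in (Column) for ColumnContract with only the fields it reads. lint_contract is a hypothetical helper, not part of any library.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Column:
    """Trimmed stand-in for ColumnContract (only the fields the lint reads)."""
    name: str
    nullable: bool = True
    unique: bool = False
    primary_key: bool = False
    min_value: float | None = None
    max_value: float | None = None
    deprecated: bool = False
    deprecated_by: str | None = None


def lint_contract(columns: list[Column], max_null_rate: dict[str, float]) -> list[str]:
    """Return problems that make a contract internally inconsistent."""
    problems = []
    names = [c.name for c in columns]
    by_name = {c.name: c for c in columns}

    for dup in sorted({n for n in names if names.count(n) > 1}):
        problems.append(f"Column '{dup}' declared more than once")

    for col in columns:
        if col.primary_key and (col.nullable or not col.unique):
            problems.append(f"Primary key '{col.name}' must be NOT NULL and unique")
        if (col.min_value is not None and col.max_value is not None
                and col.min_value > col.max_value):
            problems.append(f"'{col.name}': min_value > max_value")
        if col.deprecated_by and col.deprecated_by not in by_name:
            problems.append(f"'{col.name}' points to unknown replacement '{col.deprecated_by}'")

    # Quality thresholds must refer to real columns and be valid rates
    for name, rate in max_null_rate.items():
        if name not in by_name:
            problems.append(f"max_null_rate references unknown column '{name}'")
        elif not 0.0 <= rate <= 1.0:
            problems.append(f"max_null_rate for '{name}' must be in [0, 1]")
    return problems


cols = [
    Column("order_id", nullable=False, unique=True, primary_key=True),
    Column("total", nullable=False, min_value=0),
    Column("user_age", deprecated=True, deprecated_by="customer_age"),
]
for problem in lint_contract(cols, {"total": 0.0, "status": 0.0}):
    print(problem)
# 'user_age' points to unknown replacement 'customer_age'
# max_null_rate references unknown column 'status'
```

This check needs only the contract file and no data. That makes it a natural neighbor of test_contract_has_owner in the CI suite.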

Contract Validation

python
# contract_validator.py — Validate data against a contract
import pandas as pd
import numpy as np
from datetime import datetime
import re
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class ContractViolation:
    contract_name: str
    check_type: str  # "schema", "quality", "sla"
    column: str | None
    message: str
    severity: str  # "error", "warning"


class ContractValidator:
    """Validate DataFrames against data contracts."""

    def __init__(self, contract):
        self.contract = contract

    def validate(self, df: pd.DataFrame) -> list[ContractViolation]:
        """Run all contract validations."""
        violations = []
        violations.extend(self._validate_schema(df))
        violations.extend(self._validate_quality(df))
        return violations

    def _validate_schema(self, df: pd.DataFrame) -> list[ContractViolation]:
        violations = []

        # Check required columns
        expected_cols = {col.name for col in self.contract.schema if not col.deprecated}
        actual_cols = set(df.columns)
        missing = expected_cols - actual_cols

        for col_name in missing:
            violations.append(ContractViolation(
                contract_name=self.contract.name,
                check_type="schema",
                column=col_name,
                message=f"Required column '{col_name}' is missing",
                severity="error",
            ))

        # Validate each column
        for col_spec in self.contract.schema:
            if col_spec.name not in df.columns:
                continue

            series = df[col_spec.name]

            # Nullable
            if not col_spec.nullable and series.isnull().any():
                violations.append(ContractViolation(
                    contract_name=self.contract.name,
                    check_type="schema",
                    column=col_spec.name,
                    message=f"Column has {series.isnull().sum()} nulls (not nullable)",
                    severity="error",
                ))

            # Unique (nulls are excluded, matching standard SQL UNIQUE semantics)
            if col_spec.unique:
                dup_count = series.dropna().duplicated().sum()
                if dup_count > 0:
                    violations.append(ContractViolation(
                        contract_name=self.contract.name,
                        check_type="schema",
                        column=col_spec.name,
                        message=f"Column has {dup_count} duplicate values (must be unique)",
                        severity="error",
                    ))

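            # Min/max value (nulls are skipped; nullability is checked above)
            if col_spec.min_value is not None:
                below = (series.dropna() < col_spec.min_value).sum()
                if below > 0:
                    violations.append(ContractViolation(
                        contract_name=self.contract.name,
                        check_type="schema",
                        column=col_spec.name,
                        message=f"{below} values below minimum {col_spec.min_value}",
                        severity="error",
                    ))

            if col_spec.max_value is not None:
                above = (series.dropna() > col_spec.max_value).sum()
                if above > 0:
                    violations.append(ContractViolation(
                        contract_name=self.contract.name,
                        check_type="schema",
                        column=col_spec.name,
                        message=f"{above} values above maximum {col_spec.max_value}",
                        severity="error",
                    ))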

            # Allowed values
            if col_spec.allowed_values:
                invalid = ~series.dropna().isin(col_spec.allowed_values)
                if invalid.any():
                    bad_values = series.dropna()[invalid].unique()[:5]
                    violations.append(ContractViolation(
                        contract_name=self.contract.name,
                        check_type="schema",
                        column=col_spec.name,
                        message=f"Invalid values: {list(bad_values)}",
                        severity="error",
                    ))

            # Pattern
            if col_spec.pattern:
                non_null = series.dropna().astype(str)
                # fullmatch anchors both ends; str.match only anchors the start
                non_match = ~non_null.str.fullmatch(col_spec.pattern)
                if non_match.any():
                    violations.append(ContractViolation(
                        contract_name=self.contract.name,
                        check_type="schema",
                        column=col_spec.name,
                        message=f"{non_match.sum()} values don't match pattern '{col_spec.pattern}'",
                        severity="error",
                    ))

        return violations

    def _validate_quality(self, df: pd.DataFrame) -> list[ContractViolation]:
        violations = []
        quality = self.contract.quality

        # Row count
        if len(df) < quality.min_row_count:
            violations.append(ContractViolation(
                contract_name=self.contract.name,
                check_type="quality",
                column=None,
                message=f"Row count {len(df)} below minimum {quality.min_row_count}",
                severity="error",
            ))

        if quality.max_row_count is not None and len(df) > quality.max_row_count:
            violations.append(ContractViolation(
                contract_name=self.contract.name,
                check_type="quality",
                column=None,
                message=f"Row count {len(df)} above maximum {quality.max_row_count}",
                severity="error",
            ))

        # Null rates
        for col, max_rate in quality.max_null_rate.items():
            if col in df.columns:
                actual_rate = df[col].isnull().mean()
                if actual_rate > max_rate:
                    violations.append(ContractViolation(
                        contract_name=self.contract.name,
                        check_type="quality",
                        column=col,
                        message=f"Null rate {actual_rate:.1%} exceeds max {max_rate:.1%}",
                        severity="error",
                    ))

        return violations

    def validate_and_report(self, df: pd.DataFrame) -> dict:
        """Validate and return a structured report."""
        violations = self.validate(df)
        errors = [v for v in violations if v.severity == "error"]
        warnings = [v for v in violations if v.severity == "warning"]

        passed = len(errors) == 0

        report = {
            "contract": self.contract.name,
            "version": self.contract.version,
            "passed": passed,
            "errors": len(errors),
            "warnings": len(warnings),
            "violations": [
                {
                    "type": v.check_type,
                    "column": v.column,
                    "message": v.message,
                    "severity": v.severity,
                }
                for v in violations
            ],
            # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
            "validated_at": pd.Timestamp.now(tz="UTC").isoformat(),
        }

        if not passed:
            logger.error(
                f"Contract '{self.contract.name}' VIOLATED: "
                f"{len(errors)} errors, {len(warnings)} warnings"
            )
        else:
            logger.info(f"Contract '{self.contract.name}' PASSED")

        return report
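
ContractValidator checks schema and quality rules. It never reads freshness_hours or delivery_deadline_utc, so the timing half of the contract goes unenforced. The sketch below covers both checks. It assumes timezone-aware datetimes. It also assumes the newest updated_at value (for example df["updated_at"].max()) is an acceptable freshness signal. check_freshness and check_delivery are hypothetical helpers.

```python
from __future__ import annotations

from datetime import datetime, time, timedelta, timezone


def check_freshness(latest_event: datetime, freshness_hours: float, now: datetime) -> str | None:
    """Return a violation message if the newest record is older than allowed."""
    age = now - latest_event
    if age > timedelta(hours=freshness_hours):
        return f"Data is {age.total_seconds() / 3600:.1f}h old (max {freshness_hours}h)"
    return None


def check_delivery(delivered_at: datetime, deadline_utc: str) -> str | None:
    """Return a violation message if a load landed after its 'HH:MM' UTC deadline."""
    hour, minute = map(int, deadline_utc.split(":"))
    delivered = delivered_at.astimezone(timezone.utc)
    # Simplification: the deadline is taken on the same UTC day as the delivery
    deadline = datetime.combine(delivered.date(), time(hour, minute), tzinfo=timezone.utc)
    if delivered > deadline:
        minutes_late = (delivered - deadline).total_seconds() / 60
        return f"Delivered {minutes_late:.0f} min after the {deadline_utc} UTC deadline"
    return None


now = datetime(2024, 1, 2, 7, 0, tzinfo=timezone.utc)
print(check_freshness(datetime(2024, 1, 2, 1, 0, tzinfo=timezone.utc), 4, now))
# Data is 6.0h old (max 4h)
print(check_delivery(datetime(2024, 1, 2, 6, 45, tzinfo=timezone.utc), "06:00"))
# Delivered 45 min after the 06:00 UTC deadline
```

check_delivery compares against the deadline on the same UTC calendar day as the delivery. A load that slips past midnight therefore looks like an early delivery for the next day. A production version would compare against the deadline of the partition being delivered.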

Breaking Change Detection

python
# breaking_changes.py — Detect breaking changes between contract versions
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class ContractChange:
    change_type: str  # "breaking" or "compatible"
    description: str
    affected_columns: list[str]


def detect_breaking_changes(
    old_contract,
    new_contract,
) -> list[ContractChange]:
    """Compare two contract versions and detect breaking changes."""
    changes = []

    old_cols = {col.name: col for col in old_contract.schema}
    new_cols = {col.name: col for col in new_contract.schema}

    # Removed columns (BREAKING)
    for col_name in set(old_cols) - set(new_cols):
        if not old_cols[col_name].deprecated:
            changes.append(ContractChange(
                change_type="breaking",
                description=f"Column '{col_name}' removed without deprecation",
                affected_columns=[col_name],
            ))

    # Type changes (BREAKING)
    for col_name in set(old_cols) & set(new_cols):
        old_col = old_cols[col_name]
        new_col = new_cols[col_name]

        if old_col.type != new_col.type:
            changes.append(ContractChange(
                change_type="breaking",
                description=f"Column '{col_name}' type changed: {old_col.type.value} -> {new_col.type.value}",
                affected_columns=[col_name],
            ))

        # Nullable to not-nullable (BREAKING)
        if old_col.nullable and not new_col.nullable:
            changes.append(ContractChange(
                change_type="breaking",
                description=f"Column '{col_name}' changed from nullable to not-nullable",
                affected_columns=[col_name],
            ))

        # Allowed values reduced (BREAKING)
        if old_col.allowed_values and new_col.allowed_values:
            removed_values = set(old_col.allowed_values) - set(new_col.allowed_values)
            if removed_values:
                changes.append(ContractChange(
                    change_type="breaking",
                    description=f"Column '{col_name}' removed allowed values: {removed_values}",
                    affected_columns=[col_name],
                ))

    # Added columns (COMPATIBLE if nullable)
    for col_name in set(new_cols) - set(old_cols):
        new_col = new_cols[col_name]
        if new_col.nullable:
            changes.append(ContractChange(
                change_type="compatible",
                description=f"New nullable column '{col_name}' added",
                affected_columns=[col_name],
            ))
        else:
            changes.append(ContractChange(
                change_type="breaking",
                description=f"New NOT NULL column '{col_name}' added",
                affected_columns=[col_name],
            ))

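    # SLA changes (BREAKING if degraded)
    old_sla, new_sla = old_contract.sla, new_contract.sla
    if new_sla.max_latency_minutes > old_sla.max_latency_minutes:
        changes.append(ContractChange(
            change_type="breaking",
            description=(
                f"SLA latency degraded: "
                f"{old_sla.max_latency_minutes}min -> {new_sla.max_latency_minutes}min"
            ),
            affected_columns=[],
        ))
    if new_sla.availability_target < old_sla.availability_target:
        changes.append(ContractChange(
            change_type="breaking",
            description=(
                f"SLA availability degraded: "
                f"{old_sla.availability_target:.2%} -> {new_sla.availability_target:.2%}"
            ),
            affected_columns=[],
        ))
    # "HH:MM" strings compare correctly as long as both are zero-padded
    if new_sla.delivery_deadline_utc > old_sla.delivery_deadline_utc:
        changes.append(ContractChange(
            change_type="breaking",
            description=(
                f"SLA delivery deadline moved later: "
                f"{old_sla.delivery_deadline_utc} -> {new_sla.delivery_deadline_utc} UTC"
            ),
            affected_columns=[],
        ))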

    # Report
    breaking = [c for c in changes if c.change_type == "breaking"]
    compatible = [c for c in changes if c.change_type == "compatible"]

    if breaking:
        logger.error(
            f"BREAKING CHANGES detected between "
            f"v{old_contract.version} and v{new_contract.version}:\n"
            + "\n".join(f"  - {c.description}" for c in breaking)
        )
    else:
        logger.info("No breaking changes detected")

    return changes
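
detect_breaking_changes reports a rename as one removed column plus one added column. The debugging scenario below, where user_age becomes customer_age, is an example. A small heuristic can point reviewers at the likely rename. This sketch assumes column types are available as plain strings keyed by column name. It suggests a pair only when exactly one added column has the same type. suggest_renames is hypothetical.

```python
def suggest_renames(old_types: dict[str, str], new_types: dict[str, str]) -> list[tuple[str, str]]:
    """Pair removed columns with added columns of the same type (possible renames)."""
    removed = [name for name in old_types if name not in new_types]
    added = [name for name in new_types if name not in old_types]
    pairs = []
    for old_name in removed:
        candidates = [name for name in added if new_types[name] == old_types[old_name]]
        if len(candidates) == 1:  # only suggest unambiguous matches
            pairs.append((old_name, candidates[0]))
            added.remove(candidates[0])  # each added column pairs at most once
    return pairs


old = {"order_id": "integer", "user_age": "integer", "status": "string"}
new = {"order_id": "integer", "customer_age": "integer", "status": "string", "channel": "string"}
print(suggest_renames(old, new))  # [('user_age', 'customer_age')]
```

Type alone is a weak signal. When several added columns share a type, comparing names (for example with difflib.SequenceMatcher from the standard library) could break the tie.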

Contract Testing in CI/CD

python
# test_contracts.py — Test data contracts in CI
import pytest
from contract_definition import DataContract
from contract_validator import ContractValidator
from breaking_changes import detect_breaking_changes
import pandas as pd


class TestOrdersContract:
    """CI tests for the orders data contract."""

    @pytest.fixture
    def contract(self):
        return DataContract.from_yaml("contracts/orders.yaml")

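    @pytest.fixture
    def sample_data(self):
        # The orders contract sets min_row_count=100, so a handful of rows
        # would fail the row-count check; generate 150 valid rows instead.
        n = 150
        statuses = ["pending", "confirmed", "shipped", "delivered", "cancelled"]
        return pd.DataFrame({
            "order_id": range(1, n + 1),
            "customer_id": [100 + i % 25 for i in range(n)],
            "total": [round(5.0 + 0.5 * i, 2) for i in range(n)],
            "status": [statuses[i % len(statuses)] for i in range(n)],
            "created_at": pd.to_datetime(["2024-01-01"] * n),
            "updated_at": pd.to_datetime(["2024-01-02"] * n),
        })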

    def test_sample_data_passes_contract(self, contract, sample_data):
        validator = ContractValidator(contract)
        report = validator.validate_and_report(sample_data)
        assert report["passed"], f"Violations: {report['violations']}"

    def test_no_breaking_changes_from_last_version(self, contract):
        """Ensure current contract is backward compatible."""
        try:
            previous = DataContract.from_yaml("contracts/orders.v2.0.yaml")
        except FileNotFoundError:
            pytest.skip("No previous version to compare")

        changes = detect_breaking_changes(previous, contract)
        breaking = [c for c in changes if c.change_type == "breaking"]
        assert len(breaking) == 0, (
            f"Breaking changes detected:\n"
            + "\n".join(f"  - {c.description}" for c in breaking)
        )

    def test_contract_has_owner(self, contract):
        assert contract.owner, "Contract must have an owner"

    def test_contract_has_sla(self, contract):
        assert contract.sla.support_contact, "SLA must have support contact"
        assert contract.sla.max_latency_minutes > 0, "SLA latency must be positive"
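
The breaking-change test above rejects every breaking change. The semantic versioning policy from the Versioning row suggests a complementary check: the version number should reflect the impact. Breaking changes need a major bump, and compatible ones need at least a minor bump. The sketch below assumes plain MAJOR.MINOR.PATCH strings (no pre-release or build suffixes) and the "breaking"/"compatible" labels produced by detect_breaking_changes. version_bump_ok is hypothetical.

```python
def parse_version(version: str) -> tuple[int, int, int]:
    """Split 'MAJOR.MINOR.PATCH' into integers (no pre-release tags)."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def version_bump_ok(old: str, new: str, change_types: list[str]) -> bool:
    """True if the new version is large enough for the detected changes."""
    o, n = parse_version(old), parse_version(new)
    if "breaking" in change_types:
        return n[0] > o[0]    # e.g. 2.1.0 -> 3.0.0
    if "compatible" in change_types:
        return n[:2] > o[:2]  # e.g. 2.1.0 -> 2.2.0
    return n >= o             # no schema change: version must not go backwards


print(version_bump_ok("2.1.0", "2.2.0", ["breaking"]))    # False
print(version_bump_ok("2.1.0", "3.0.0", ["breaking"]))    # True
print(version_bump_ok("2.1.0", "2.2.0", ["compatible"]))  # True
```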

Quick Reference

| Contract Component | What It Defines | Who Maintains |
|---|---|---|
| Schema | Column names, types, constraints | Producer |
| Quality Rules | Null rates, row counts, distributions | Producer + Consumer |
| SLA | Delivery time, availability, latency | Producer |
| Semantics | Business meaning of values | Producer |
| Versioning | Change management, deprecation | Producer |
| Consumer List | Who depends on this data | Both |

| Change Type | Example | Impact |
|---|---|---|
| Backward compatible | Add nullable column | No consumer changes needed |
| Breaking | Remove column | All consumers must update |
| Breaking | Change column type | All consumers must update |
| Breaking | Make nullable column NOT NULL | Writers and historical rows that contain nulls now violate the contract |
| Backward compatible | Add new allowed value | Consumers may ignore it |
| Breaking | Remove allowed value | Consumers that filter or branch on it break |

| Tool | Type | Strength |
|---|---|---|
| Custom YAML contracts | Code | Full flexibility, version control |
| Pandera | Library | Python-native, type annotations |
| Great Expectations | Platform | Rich reporting, auto-profiling |
| Soda | Platform | SQL-first, cloud-native |
| dbt contracts | Built-in | Integrated with dbt models |
| Protobuf/Avro | Schema | Cross-language, schema registry |

Key Takeaway

  • A data contract is a formal agreement between producer and consumer about schema, quality, SLA, and semantics -- it makes implicit dependencies explicit and testable.
  • Breaking changes (column removal, type change) require consumer coordination and deprecation periods; backward-compatible changes (adding nullable columns) do not.
  • Contract testing in CI/CD catches breaking changes before they reach production, preventing the 2 AM dashboard failure.
Exercise

Define and Test a Data Contract

Create a data contract for an orders table that specifies:

  1. Schema: 5 columns with types and nullability constraints.
  2. SLA: data must be available by 6 AM UTC daily.
  3. Quality: null rate below 5%, row count between 1K and 1M.
  4. Semantic: status values and their business meaning.
  5. Write a contract test that validates a DataFrame against this contract and reports violations.

Solution Sketch

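python
from dataclasses import dataclass, field
from datetime import datetime, time, timezone
import pandas as pd


@dataclass
class ColumnContract:
    name: str
    dtype: str  # expected pandas dtype name (pandas 2.x defaults), e.g. "int64"
    nullable: bool = True


@dataclass
class DataContract:
    name: str
    columns: list[ColumnContract]
    sla_time_utc: time  # data must land by this time each day (UTC)
    min_rows: int
    max_rows: int
    max_null_rate: float
    semantics: dict[str, dict[str, str]] = field(default_factory=dict)  # column -> {value: meaning}

    def validate(self, df: pd.DataFrame) -> list[str]:
        errors = []
        # Schema: presence, dtype, nullability
        for col in self.columns:
            if col.name not in df.columns:
                errors.append(f"Missing column: {col.name}")
                continue
            actual = str(df[col.name].dtype)
            if actual != col.dtype:
                errors.append(f"Column {col.name} has dtype {actual}, expected {col.dtype}")
            if not col.nullable and df[col.name].isnull().any():
                errors.append(f"Null values in non-nullable column: {col.name}")
        # Semantics: every value must have a documented meaning
        for col_name, meanings in self.semantics.items():
            if col_name in df.columns:
                unknown = set(df[col_name].dropna()) - set(meanings)
                if unknown:
                    errors.append(f"Undocumented {col_name} values: {sorted(unknown)}")
        # Volume
        if not (self.min_rows <= len(df) <= self.max_rows):
            errors.append(f"Row count {len(df)} outside [{self.min_rows}, {self.max_rows}]")
        # Quality: worst per-column null rate
        null_rate = df.isnull().mean().max()
        if null_rate > self.max_null_rate:
            errors.append(f"Null rate {null_rate:.1%} exceeds {self.max_null_rate:.1%}")
        return errors

    def check_sla(self, delivered_at: datetime) -> list[str]:
        # delivered_at must be timezone-aware
        day = delivered_at.astimezone(timezone.utc).date()
        deadline = datetime.combine(day, self.sla_time_utc, tzinfo=timezone.utc)
        if delivered_at > deadline:
            return [f"Delivered after the {self.sla_time_utc:%H:%M} UTC deadline"]
        return []


contract = DataContract(
    name="orders",
    columns=[
        ColumnContract("order_id", "int64", nullable=False),
        ColumnContract("customer_id", "int64", nullable=False),
        ColumnContract("total", "float64", nullable=False),
        ColumnContract("status", "string", nullable=False),
        ColumnContract("created_at", "datetime64[ns]", nullable=False),
    ],
    sla_time_utc=time(6, 0),
    min_rows=1000, max_rows=1_000_000,
    max_null_rate=0.05,
    semantics={"status": {
        "pending": "Order placed, payment not yet confirmed",
        "confirmed": "Payment captured, awaiting fulfillment",
        "shipped": "Handed to the carrier",
        "delivered": "Received by the customer",
        "cancelled": "Cancelled before delivery; excluded from revenue",
    }},
)

# Illustrative batch; in practice df is the producer's pipeline output
n = 1_000
df = pd.DataFrame({
    "order_id": range(1, n + 1),
    "customer_id": [100 + i % 50 for i in range(n)],
    "total": [10.0 + i % 90 for i in range(n)],
    "status": pd.Series(["pending", "shipped"] * (n // 2), dtype="string"),
    "created_at": pd.to_datetime(["2024-01-01"] * n),
})

violations = contract.validate(df)
assert not violations, f"Contract violations: {violations}"
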
Debugging Scenario

A downstream ML team's model accuracy drops by 15%. Investigation reveals that an upstream team renamed a column from user_age to customer_age two weeks ago. No one was notified.

Diagnose and fix it.

Answer

This is the exact problem data contracts solve. Without a contract, the upstream team had no way to know who depended on user_age and no obligation to notify anyone before changing it.

Fix:

  1. Define a data contract for the shared table that lists all columns, their types, and their semantic meaning.
  2. Add breaking change detection to the producer's CI/CD pipeline: compare the current schema against the contract and block deployment if columns are removed or renamed.
  3. Implement a consumer registry: every downstream consumer registers their dependency. Before any breaking change, the producer must notify all registered consumers and agree on a migration timeline.
  4. Add a deprecation period: new column customer_age is added alongside user_age. The old column is marked deprecated for 30 days, giving consumers time to migrate. After 30 days, the old column is removed.
  5. Contract tests: both producer and consumer CI pipelines validate against the contract, catching mismatches before deployment.
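
The consumer registry from step 3 can start as a mapping from each consumer to the columns it reads. That turns "who is affected?" into a set intersection. In this sketch the registry contents are invented for illustration; the consumer names are borrowed from the orders contract above. impacted_consumers is a hypothetical helper.

```python
# Hypothetical registry: consumer name -> columns of the shared table it reads
REGISTRY: dict[str, set[str]] = {
    "ml-recommendations": {"customer_id", "user_age", "total"},
    "finance-dashboard": {"order_id", "total", "status"},
    "analytics": {"order_id", "status", "created_at"},
}


def impacted_consumers(registry: dict[str, set[str]], changed: set[str]) -> dict[str, set[str]]:
    """Map each affected consumer to the changed columns it depends on."""
    impact = {}
    for consumer, columns in registry.items():
        hit = columns & changed
        if hit:
            impact[consumer] = hit
    return impact


# The rename from the scenario: user_age is removed (replaced by customer_age)
print(impacted_consumers(REGISTRY, {"user_age"}))
# {'ml-recommendations': {'user_age'}}
```

The producer's CI could run this against the columns a proposed change removes or retypes. It would then require sign-off from every consumer in the result before merging.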

Common Misconceptions

  • "Data contracts are just documentation." Documentation describes intent; contracts are enforceable through automated testing. A contract test that fails blocks deployment.
  • "Only external data needs contracts." Internal data between teams is the most common source of breaking changes, precisely because it feels "safe" and changes happen without coordination.
  • "Schema is enough." Schema defines structure, but a contract also covers quality (null rates, distributions), SLA (delivery time), and semantics (what values mean). Schema alone does not prevent silent data quality degradation.
  • "Adding a column is always backward compatible." Adding a non-nullable column is a breaking change. Existing writers do not populate it yet and historical rows have no value for it, so the NOT NULL guarantee is violated as soon as the column appears. Only nullable columns with defaults are truly backward compatible.
Quiz

1. What are the main components of a data contract?

Schema (columns, types, constraints), SLA (delivery time, availability), Quality (null rates, row counts, distributions), Semantics (business meaning of values), Ownership (who maintains the data and whom to contact), and Versioning (change management, deprecation policy).

2. What is the difference between a backward-compatible and a breaking change?

Backward-compatible changes (adding a nullable column, adding a new allowed value) do not require consumer updates. Breaking changes (removing a column, changing a type, making a nullable column required) force all consumers to update.

3. How do contract tests integrate into CI/CD?

Both producer and consumer CI pipelines validate against the shared contract. Producer CI blocks deployment if a change violates the contract. Consumer CI blocks if the code references columns that no longer exist in the contract.

4. What is a deprecation period in data contracts?

A time window (e.g., 30 days) during which a deprecated field is still available but marked for removal. It gives consumers time to migrate before the breaking change takes effect.

5. Why is semantic documentation important in data contracts?

Two teams may interpret the same column differently. Documenting that status='active' means "currently available for purchase" (not "recently logged in") prevents semantic misalignment that causes business logic errors.

One-Liner Summary: Data contracts turn implicit assumptions between teams into explicit, testable, enforceable agreements about schema, quality, SLA, and semantics -- preventing the 2 AM surprise when someone renames a column.

"What I cannot create, I do not understand." — Richard Feynman