Testing Data Pipelines
Why Testing Data Pipelines Matters
Software engineers have been writing unit tests for decades. Data engineers often skip testing entirely, relying on eyeballing query results or checking row counts. This gap causes:
- Silent data corruption — Wrong values propagate undetected for weeks
- Regression on change — A "simple" SQL fix breaks three downstream tables
- Fear of refactoring — Nobody touches the pipeline because it might break
- Slow debugging — Without tests, investigating issues means manually re-running queries
Data pipeline testing is fundamentally different from application testing because:
- Input data is unpredictable (from external sources)
- Correctness depends on business logic that changes frequently
- "Expected output" is often hard to define precisely
- State accumulates over time (incremental pipelines)
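Accumulated state is what makes incremental pipelines hard to test: a batch replayed after a retry must leave the table unchanged. One common check is an idempotency test that applies the same batch twice and compares the results. A minimal sketch, assuming a hypothetical keyed upsert (`mergeBatch`) over an in-memory table in place of a warehouse `MERGE`:

```typescript
// Hypothetical row shape for an incrementally loaded orders table
interface OrderRow {
  orderId: number;
  amount: number;
  updatedAt: string; // ISO timestamp in a fixed format, so string order = time order
}

// Keyed upsert: insert new keys, replace existing keys only with newer versions.
// Returns a new Map so each state can be compared with the previous one.
function mergeBatch(state: Map<number, OrderRow>, batch: OrderRow[]): Map<number, OrderRow> {
  const next = new Map(state);
  for (const row of batch) {
    const existing = next.get(row.orderId);
    if (!existing || row.updatedAt > existing.updatedAt) {
      next.set(row.orderId, row);
    }
  }
  return next;
}

// Serialize state in key order so two states can be compared exactly
function snapshot(state: Map<number, OrderRow>): string {
  return JSON.stringify([...state.values()].sort((a, b) => a.orderId - b.orderId));
}

const batch: OrderRow[] = [
  { orderId: 1, amount: 100, updatedAt: '2026-03-18T10:00:00Z' },
  { orderId: 2, amount: 50, updatedAt: '2026-03-18T10:05:00Z' },
];

const once = mergeBatch(new Map(), batch);
const twice = mergeBatch(once, batch); // replay the same batch, as a retry would

// Idempotency: replaying a batch must not change the table
console.log(snapshot(once) === snapshot(twice)); // true
```

An append-only load would fail this kind of test, because replaying the batch would duplicate rows; that is exactly the class of bug it is meant to catch.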
Historical Context
- 2000s: Manual QA — run the pipeline, spot-check results
- 2010s: SQL-based assertions (custom scripts)
- 2018: dbt tests — declarative data quality testing integrated into transformation
- 2020: data-diff tools emerge (Datafold) — compare datasets across environments
- 2022: Contract testing for data (Soda, Great Expectations) becomes standard
- 2024-2025: CI/CD for data becomes mainstream with automated schema + data testing in PR pipelines
First Principles
The Data Testing Pyramid
| Level | Speed | Coverage | Confidence | Cost |
|---|---|---|---|---|
| Static analysis | Milliseconds | Syntax, style | Low | Lowest |
| Schema tests | Seconds | Structure | Medium-Low | Low |
| Unit tests | Seconds | Transform logic | Medium | Medium |
| Integration tests | Minutes | Multi-step flow | Medium-High | Medium-High |
| End-to-end tests | Minutes-Hours | Full pipeline | Highest | Highest |
What to Test
Unit Testing Transforms
Testing SQL Transforms
interface SQLTestCase {
name: string;
description: string;
inputTables: Record<string, Record<string, unknown>[]>;
expectedOutput: Record<string, unknown>[];
sqlTransform: string;
}
class SQLTransformTester {
constructor(private readonly db: TestDatabase) {}
async runTest(testCase: SQLTestCase): Promise<TestResult> {
try {
// 1. Create temporary tables with test input data
for (const [tableName, rows] of Object.entries(testCase.inputTables)) {
await this.db.createTempTable(tableName, rows);
}
// 2. Execute the SQL transform
const actualOutput = await this.db.query(testCase.sqlTransform);
// 3. Compare with expected output
const comparison = this.compareResults(
(actualOutput as { rows: Record<string, unknown>[] }).rows,
testCase.expectedOutput,
);
return {
testName: testCase.name,
passed: comparison.passed,
details: comparison.details,
};
} finally {
// 4. Clean up even if the transform throws, so later tests start clean
// (assumes dropTempTable tolerates tables that were never created)
for (const tableName of Object.keys(testCase.inputTables)) {
await this.db.dropTempTable(tableName);
}
}
}
private compareResults(
actual: Record<string, unknown>[],
expected: Record<string, unknown>[],
): { passed: boolean; details: string } {
if (actual.length !== expected.length) {
return {
passed: false,
details: `Row count mismatch: got ${actual.length}, expected ${expected.length}`,
};
}
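// Sort both sides by the expected columns, in a fixed column order, so the
// comparison ignores row order and any extra columns in the actual output.
// String() normalizes values such as 2 vs "2" returned by some drivers.
const columns = Object.keys(expected[0] ?? {});
const rowKey = (row: Record<string, unknown>) =>
JSON.stringify(columns.map((c) => String(row[c])));
const byKey = (a: Record<string, unknown>, b: Record<string, unknown>) => {
const ka = rowKey(a);
const kb = rowKey(b);
return ka < kb ? -1 : ka > kb ? 1 : 0;
};
const sortedActual = [...actual].sort(byKey);
const sortedExpected = [...expected].sort(byKey);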
for (let i = 0; i < sortedExpected.length; i++) {
for (const key of Object.keys(sortedExpected[i])) {
const actualVal = sortedActual[i][key];
const expectedVal = sortedExpected[i][key];
if (!this.valuesEqual(actualVal, expectedVal)) {
return {
passed: false,
details: `Row ${i}, column "${key}": got ${JSON.stringify(actualVal)}, expected ${JSON.stringify(expectedVal)}`,
};
}
}
}
return { passed: true, details: 'All rows match' };
}
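private valuesEqual(a: unknown, b: unknown): boolean {
// Some drivers (e.g. node-postgres) return BIGINT and NUMERIC results as
// strings, so compare numerically when either side is a number
const isNumeric = (v: unknown) =>
typeof v === 'number' ||
(typeof v === 'string' && v.trim() !== '' && !Number.isNaN(Number(v)));
if ((typeof a === 'number' || typeof b === 'number') && isNumeric(a) && isNumeric(b)) {
// Absolute tolerance for floating-point rounding
return Math.abs(Number(a) - Number(b)) < 0.0001;
}
return JSON.stringify(a) === JSON.stringify(b);
}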
}
interface TestDatabase {
createTempTable(name: string, rows: Record<string, unknown>[]): Promise<void>;
dropTempTable(name: string): Promise<void>;
// Result rows as plain objects keyed by column name
query(sql: string): Promise<{ rows: Record<string, unknown>[] }>;
}
interface TestResult {
testName: string;
passed: boolean;
details: string;
}
Example: Testing an Order Summary Transform
const orderSummaryTest: SQLTestCase = {
name: 'order_summary_aggregation',
description: 'Orders should be correctly aggregated by customer and date',
inputTables: {
stg_orders: [
{ order_id: 1, customer_id: 'C1', order_date: '2026-03-18', amount: 100 },
{ order_id: 2, customer_id: 'C1', order_date: '2026-03-18', amount: 50 },
{ order_id: 3, customer_id: 'C2', order_date: '2026-03-18', amount: 200 },
{ order_id: 4, customer_id: 'C1', order_date: '2026-03-17', amount: 75 },
],
},
expectedOutput: [
{ customer_id: 'C1', order_date: '2026-03-17', total_orders: 1, total_amount: 75 },
{ customer_id: 'C1', order_date: '2026-03-18', total_orders: 2, total_amount: 150 },
{ customer_id: 'C2', order_date: '2026-03-18', total_orders: 1, total_amount: 200 },
],
sqlTransform: `
SELECT
customer_id,
order_date,
COUNT(*) as total_orders,
SUM(amount) as total_amount
FROM stg_orders
GROUP BY customer_id, order_date
ORDER BY customer_id, order_date
`,
};
Testing Edge Cases
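Edge-case inputs can also be derived mechanically from a happy-path fixture instead of being written by hand. The sketch below uses a hypothetical `deriveEdgeCases` helper (not part of any library) to produce empty, all-NULL, duplicated, and single-row variants of a table; each variant still needs its own expected output, as in the hand-written cases that follow.

```typescript
type Row = Record<string, unknown>;

// Derive named edge-case variants of a table from a known-good input.
// Rows are copied, so the base fixture is never mutated.
function deriveEdgeCases(rows: Row[], nullableColumn: string): Record<string, Row[]> {
  return {
    // No rows at all: aggregations should return no groups
    empty: [],
    // Every value in one column is NULL: exercises SUM/COALESCE behavior
    [`all_null_${nullableColumn}`]: rows.map((r) => ({ ...r, [nullableColumn]: null })),
    // Every row delivered twice: exercises deduplication
    duplicated: rows.flatMap((r) => [{ ...r }, { ...r }]),
    // A single row: exercises groups of size one
    single_row: rows.slice(0, 1).map((r) => ({ ...r })),
  };
}

const baseOrders: Row[] = [
  { order_id: 1, customer_id: 'C1', order_date: '2026-03-18', amount: 100 },
  { order_id: 2, customer_id: 'C2', order_date: '2026-03-18', amount: 200 },
];

const variants = deriveEdgeCases(baseOrders, 'amount');
for (const [name, rows] of Object.entries(variants)) {
  console.log(name, rows.length);
}
// Prints: empty 0, all_null_amount 2, duplicated 4, single_row 1 (one per line)
```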
const edgeCaseTests: SQLTestCase[] = [
{
name: 'null_amounts_handled',
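description: 'SUM ignores NULL amounts; COALESCE turns an all-NULL group total into 0',
inputTables: {
stg_orders: [
{ order_id: 1, customer_id: 'C1', order_date: '2026-03-18', amount: 100 },
{ order_id: 2, customer_id: 'C1', order_date: '2026-03-18', amount: null },
// C2 has only NULL amounts: without COALESCE its total would be NULL, not 0
{ order_id: 3, customer_id: 'C2', order_date: '2026-03-18', amount: null },
],
},
expectedOutput: [
{ customer_id: 'C1', order_date: '2026-03-18', total_orders: 2, total_amount: 100 },
{ customer_id: 'C2', order_date: '2026-03-18', total_orders: 1, total_amount: 0 },
],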
sqlTransform: `
SELECT
customer_id,
order_date,
COUNT(*) as total_orders,
COALESCE(SUM(amount), 0) as total_amount
FROM stg_orders
GROUP BY customer_id, order_date
`,
},
{
name: 'empty_input',
description: 'Empty input should produce empty output',
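inputTables: {
// Zero rows carry no column information, so createTempTable must build
// stg_orders from an explicit schema for this query to run
stg_orders: [],
},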
expectedOutput: [],
sqlTransform: `
SELECT
customer_id,
order_date,
COUNT(*) as total_orders,
SUM(amount) as total_amount
FROM stg_orders
GROUP BY customer_id, order_date
`,
},
{
name: 'duplicate_orders_handled',
description: 'Duplicate order_ids should be deduplicated before aggregation',
inputTables: {
stg_orders: [
{ order_id: 1, customer_id: 'C1', order_date: '2026-03-18', amount: 100 },
{ order_id: 1, customer_id: 'C1', order_date: '2026-03-18', amount: 100 }, // Duplicate
],
},
expectedOutput: [
{ customer_id: 'C1', order_date: '2026-03-18', total_orders: 1, total_amount: 100 },
],
sqlTransform: `
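-- DISTINCT ON is PostgreSQL-specific; other engines can keep
-- ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY ...) = 1 instead
WITH deduplicated AS (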
SELECT DISTINCT ON (order_id) *
FROM stg_orders
ORDER BY order_id
)
SELECT
customer_id,
order_date,
COUNT(*) as total_orders,
SUM(amount) as total_amount
FROM deduplicated
GROUP BY customer_id, order_date
`,
},
];
Data Diff Testing
Concept
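Data diff compares two versions of a dataset, either across environments (dev vs. prod) or across time (before vs. after a change). Rows are matched on a primary key, and the diff reports rows present on only one side plus matched rows whose column values differ.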
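The core of a data diff can be sketched in memory: index both datasets by primary key, report keys present on only one side, and compare the remaining columns with an optional numeric tolerance. This is an illustration only, assuming a single-column unique key, primitive column values, and datasets small enough to fit in memory; `diffByKey` is a hypothetical name.

```typescript
type Row = Record<string, unknown>;

interface DiffSummary {
  onlyInA: unknown[]; // keys missing from B
  onlyInB: unknown[]; // keys missing from A
  changed: Array<{ key: unknown; column: string; a: unknown; b: unknown }>;
}

function diffByKey(a: Row[], b: Row[], key: string, columns: string[], tolerance = 0): DiffSummary {
  // Assumes keys are unique; a duplicate key would overwrite the earlier row
  const indexA = new Map(a.map((r) => [r[key], r] as const));
  const indexB = new Map(b.map((r) => [r[key], r] as const));
  const summary: DiffSummary = { onlyInA: [], onlyInB: [], changed: [] };

  for (const [k, rowA] of indexA) {
    const rowB = indexB.get(k);
    if (rowB === undefined) {
      summary.onlyInA.push(k);
      continue;
    }
    for (const column of columns) {
      const va = rowA[column];
      const vb = rowB[column];
      // Numbers compare within the tolerance; other primitives must be identical
      const equal =
        typeof va === 'number' && typeof vb === 'number' ? Math.abs(va - vb) <= tolerance : va === vb;
      if (!equal) summary.changed.push({ key: k, column, a: va, b: vb });
    }
  }
  for (const k of indexB.keys()) {
    if (!indexA.has(k)) summary.onlyInB.push(k);
  }
  return summary;
}

const prod = [
  { id: 1, revenue: 100 },
  { id: 2, revenue: 250 },
  { id: 3, revenue: 75 },
];
const dev = [
  { id: 1, revenue: 100.004 }, // within tolerance
  { id: 2, revenue: 240 }, // changed
  { id: 4, revenue: 30 }, // new key
];

const result = diffByKey(prod, dev, 'id', ['revenue'], 0.01);
console.log(result.onlyInA, result.onlyInB, result.changed.length); // [ 3 ] [ 4 ] 1
```

Production tools push the same logic into SQL (or compare checksums) rather than moving every row into the test process.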
Implementation
interface DataDiffConfig {
table1: { connection: string; table: string; label: string };
table2: { connection: string; table: string; label: string };
primaryKey: string[];
compareColumns: string[];
sampleSize?: number; // For large tables
tolerance?: Record<string, number>; // Numeric tolerance per column
}
interface DataDiffResult {
summary: {
totalRows1: number;
totalRows2: number;
matchedRows: number;
onlyIn1: number;
onlyIn2: number;
differentValues: number;
};
columnDiffs: Record<
string,
{
matchRate: number;
sampleDifferences: Array<{
primaryKey: Record<string, unknown>;
value1: unknown;
value2: unknown;
}>;
}
>;
}
class DataDiffEngine {
async diff(config: DataDiffConfig): Promise<DataDiffResult> {
// Step 1: Get row counts
const count1 = await this.getRowCount(config.table1);
const count2 = await this.getRowCount(config.table2);
// Step 2: Find rows only in one table
const onlyIn1 = await this.findExclusive(
config.table1, config.table2, config.primaryKey,
);
const onlyIn2 = await this.findExclusive(
config.table2, config.table1, config.primaryKey,
);
// Step 3: Compare values for matching rows
const columnDiffs: DataDiffResult['columnDiffs'] = {};
for (const column of config.compareColumns) {
const tolerance = config.tolerance?.[column] ?? 0;
const diff = await this.compareColumn(
config.table1,
config.table2,
config.primaryKey,
column,
tolerance,
);
columnDiffs[column] = diff;
}
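// Rows whose primary key appears in both tables (their values may still differ)
const matchedRows = count1 - onlyIn1;
return {
summary: {
totalRows1: count1,
totalRows2: count2,
matchedRows,
onlyIn1,
onlyIn2,
// Count each differing row once, even if several columns differ.
// Summing sampleDifferences.length would count only the capped samples
// and would double-count rows that differ in more than one column.
differentValues: await this.countRowsWithDifferences(config),
},
columnDiffs,
};
}
private async getRowCount(
_table: { connection: string; table: string },
): Promise<number> {
// Execute: SELECT COUNT(*) FROM <table>
return 0; // Placeholder
}
private async findExclusive(
_table1: { connection: string; table: string },
_table2: { connection: string; table: string },
_primaryKey: string[],
): Promise<number> {
// Execute (when both tables are reachable from one connection):
//   SELECT COUNT(*) FROM t1
//   WHERE NOT EXISTS (SELECT 1 FROM t2 WHERE t2.pk1 = t1.pk1 AND t2.pk2 = t1.pk2)
// NOT EXISTS handles composite keys and NULLs safely; NOT IN does not.
// Tools that diff across separate databases typically compare checksums
// of key ranges instead of joining rows directly.
return 0; // Placeholder
}
private async compareColumn(
_table1: { connection: string; table: string },
_table2: { connection: string; table: string },
_primaryKey: string[],
_column: string,
_tolerance: number,
): Promise<DataDiffResult['columnDiffs'][string]> {
// Placeholder: join on the primary key and compare the column,
// treating numeric differences within _tolerance as equal
return { matchRate: 1.0, sampleDifferences: [] };
}
private async countRowsWithDifferences(_config: DataDiffConfig): Promise<number> {
// Placeholder: join on the primary key and count rows where any compared
// column differs beyond its tolerance (IS DISTINCT FROM treats NULLs correctly)
return 0;
}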
}
Data Diff in CI/CD
interface CIDataDiffCheck {
// Run data diff as part of PR checks (interface members cannot be marked async)
runCheck(prNumber: number): Promise<CICheckResult>;
}
class PRDataDiffChecker implements CIDataDiffCheck {
constructor(
private readonly diffEngine: DataDiffEngine,
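private readonly config: {
devConnection: string;
prodConnection: string;
// DataDiffEngine compares columns one by one, so each model lists its key
// and columns explicitly; a '*' wildcard would be treated as a column name
models: Array<{ name: string; primaryKey: string[]; compareColumns: string[] }>;
maxAcceptableDiffPercent: number;
},
) {}
async runCheck(prNumber: number): Promise<CICheckResult> {
const results: ModelDiffResult[] = [];
for (const model of this.config.models) {
const diff = await this.diffEngine.diff({
table1: {
connection: this.config.devConnection,
table: `pr_${prNumber}_${model.name}`,
label: 'PR branch',
},
table2: {
connection: this.config.prodConnection,
table: model.name,
label: 'Production',
},
primaryKey: model.primaryKey,
compareColumns: model.compareColumns,
});
const { totalRows1, totalRows2, differentValues, onlyIn1, onlyIn2 } = diff.summary;
// Use the larger table as the denominator so that a PR which drops every
// row (totalRows1 = 0) still registers as a diff; cap the result at 100%
const denominator = Math.max(totalRows1, totalRows2);
const diffPercent =
denominator > 0
? Math.min(100, ((differentValues + onlyIn1 + onlyIn2) / denominator) * 100)
: 0;
results.push({
model: model.name,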
diffPercent,
passed: diffPercent <= this.config.maxAcceptableDiffPercent,
summary: diff.summary,
});
}
const allPassed = results.every((r) => r.passed);
return {
passed: allPassed,
message: allPassed
? `All ${results.length} models within acceptable diff range`
: `${results.filter((r) => !r.passed).length} models exceed diff threshold`,
details: results,
};
}
}
interface CICheckResult {
passed: boolean;
message: string;
details: ModelDiffResult[];
}
interface ModelDiffResult {
model: string;
diffPercent: number;
passed: boolean;
summary: DataDiffResult['summary'];
}
Contract Testing
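A contract is a set of rules that the producer's output must satisfy on every run. Before wiring the rules to a warehouse, they can be prototyped as pure functions over in-memory rows. A minimal sketch; the `Rule` type and `evaluate` function are hypothetical and only loosely mirror the completeness, uniqueness, and range checks in the tester that follows.

```typescript
type Row = Record<string, unknown>;

type Rule =
  | { kind: 'completeness'; column: string; threshold: number } // min share of non-null values
  | { kind: 'uniqueness'; column: string } // non-null values must be distinct
  | { kind: 'range'; column: string; min?: number; max?: number };

interface RuleResult {
  rule: string;
  passed: boolean;
  detail: string;
}

function evaluate(rows: Row[], rule: Rule): RuleResult {
  const name = `${rule.kind}.${rule.column}`;
  const nonNull = rows.map((r) => r[rule.column]).filter((v) => v !== null && v !== undefined);

  if (rule.kind === 'completeness') {
    // An empty table is treated as complete
    const share = rows.length === 0 ? 1 : nonNull.length / rows.length;
    return { rule: name, passed: share >= rule.threshold, detail: `${(share * 100).toFixed(1)}% non-null` };
  }
  if (rule.kind === 'uniqueness') {
    const duplicates = nonNull.length - new Set(nonNull).size;
    return { rule: name, passed: duplicates === 0, detail: `${duplicates} duplicates` };
  }
  // Range: count non-null values outside [min, max]; a missing bound is open
  const { min, max } = rule;
  const outside = nonNull.filter(
    (v) => (min !== undefined && (v as number) < min) || (max !== undefined && (v as number) > max),
  ).length;
  return { rule: name, passed: outside === 0, detail: `${outside} values out of range` };
}

const orders: Row[] = [
  { order_id: 1, amount: 100 },
  { order_id: 2, amount: null },
  { order_id: 2, amount: -5 },
];
const rules: Rule[] = [
  { kind: 'completeness', column: 'amount', threshold: 0.5 },
  { kind: 'uniqueness', column: 'order_id' },
  { kind: 'range', column: 'amount', min: 0 },
];
for (const rule of rules) {
  const r = evaluate(orders, rule);
  console.log(r.rule, r.passed ? 'PASS' : 'FAIL', r.detail);
}
// completeness.amount PASS 66.7% non-null
// uniqueness.order_id FAIL 1 duplicates
// range.amount FAIL 1 values out of range
```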
Producer-Consumer Contract Test
interface DataContractTest {
contractName: string;
producer: string;
consumer: string;
schemaChecks: SchemaCheck[];
qualityChecks: QualityCheck[];
semanticChecks: SemanticCheck[];
}
interface SchemaCheck {
type: 'column_exists' | 'column_type' | 'column_not_nullable';
column: string;
expectedType?: string;
}
interface QualityCheck {
type: 'uniqueness' | 'completeness' | 'range' | 'freshness';
column?: string;
threshold?: number;
min?: number;
max?: number;
}
interface SemanticCheck {
description: string;
sql: string;
expectation: 'empty' | 'not_empty' | 'count_equals';
expectedValue?: number;
}
class DataContractTester {
constructor(private readonly db: TestDatabase) {}
async testContract(
contract: DataContractTest,
tableName: string,
): Promise<ContractTestResult> {
const results: IndividualTestResult[] = [];
// Schema checks
for (const check of contract.schemaChecks) {
const result = await this.runSchemaCheck(tableName, check);
results.push(result);
}
// Quality checks
for (const check of contract.qualityChecks) {
const result = await this.runQualityCheck(tableName, check);
results.push(result);
}
// Semantic checks
for (const check of contract.semanticChecks) {
const result = await this.runSemanticCheck(check);
results.push(result);
}
return {
contractName: contract.contractName,
allPassed: results.every((r) => r.passed),
results,
timestamp: new Date(),
};
}
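private async runSchemaCheck(
tableName: string,
check: SchemaCheck,
): Promise<IndividualTestResult> {
const name = `schema.${check.type}.${check.column}`;
// Quote names as SQL string literals (doubling single quotes) instead of
// splicing them in raw. In multi-schema warehouses, also filter on table_schema.
const literal = (s: string) => `'${s.replace(/'/g, "''")}'`;
const sql = `
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = ${literal(tableName)} AND column_name = ${literal(check.column)}
`;
const result = await this.db.query(sql);
const rows = (result as { rows: Array<Record<string, string>> }).rows;
if (rows.length === 0) {
return { name, passed: false, message: `Column '${check.column}' does not exist` };
}
const dataType = rows[0].data_type;
const isNullable = rows[0].is_nullable; // 'YES' or 'NO'
switch (check.type) {
case 'column_type': {
// Case-insensitive match; type names are engine-specific (e.g. 'integer', 'bigint')
const expected = (check.expectedType ?? '').toLowerCase();
return {
name,
passed: dataType.toLowerCase() === expected,
message: `Column '${check.column}' has type ${dataType}, expected ${check.expectedType ?? '(unspecified)'}`,
};
}
case 'column_not_nullable':
return {
name,
passed: isNullable === 'NO',
message: `Column '${check.column}' is_nullable = ${isNullable}`,
};
default:
return { name, passed: true, message: `Column '${check.column}' exists with type ${dataType}` };
}
}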
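private async runQualityCheck(
tableName: string,
check: QualityCheck,
): Promise<IndividualTestResult> {
// tableName and check.column are interpolated as identifiers (they cannot be
// bound as parameters), so only pass trusted, validated names here.
// Drivers may return COUNT results as strings (BIGINT), hence Number().
const col = check.column;
if (check.type === 'completeness' && col) {
const result = await this.db.query(
`SELECT COUNT(*) AS total, COUNT(${col}) AS non_null FROM ${tableName}`,
);
const row = (result as any).rows[0];
const total = Number(row.total);
const completeness = total > 0 ? Number(row.non_null) / total : 1;
return {
name: `quality.completeness.${col}`,
passed: completeness >= (check.threshold ?? 1.0),
message: `Completeness: ${(completeness * 100).toFixed(2)}%`,
};
}
if (check.type === 'uniqueness' && col) {
const result = await this.db.query(
`SELECT COUNT(${col}) AS non_null, COUNT(DISTINCT ${col}) AS distinct_count FROM ${tableName}`,
);
const row = (result as any).rows[0];
const duplicates = Number(row.non_null) - Number(row.distinct_count);
return {
name: `quality.uniqueness.${col}`,
passed: duplicates === 0,
message: `${duplicates} duplicate non-null values`,
};
}
if (check.type === 'range' && col) {
const conditions: string[] = [];
if (check.min !== undefined) conditions.push(`${col} < ${Number(check.min)}`);
if (check.max !== undefined) conditions.push(`${col} > ${Number(check.max)}`);
const where = conditions.length > 0 ? conditions.join(' OR ') : 'FALSE';
const result = await this.db.query(
`SELECT COUNT(*) AS violations FROM ${tableName} WHERE ${where}`,
);
const violations = Number((result as any).rows[0].violations);
return {
name: `quality.range.${col}`,
passed: violations === 0,
message: `${violations} values outside [${check.min ?? '-inf'}, ${check.max ?? '+inf'}]`,
};
}
// Fail closed: an unimplemented check (e.g. freshness) must not silently pass
return {
name: `quality.${check.type}`,
passed: false,
message: `Check type '${check.type}' is not implemented or has no column`,
};
}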
private async runSemanticCheck(
check: SemanticCheck,
): Promise<IndividualTestResult> {
const result = await this.db.query(check.sql);
const rows = (result as any).rows;
let passed = false;
switch (check.expectation) {
case 'empty':
passed = rows.length === 0;
break;
case 'not_empty':
passed = rows.length > 0;
break;
case 'count_equals':
passed = rows.length === check.expectedValue;
break;
}
return {
name: `semantic.${check.description}`,
passed,
message: `${check.description}: ${passed ? 'PASS' : 'FAIL'} (${rows.length} rows)`,
};
}
}
interface ContractTestResult {
contractName: string;
allPassed: boolean;
results: IndividualTestResult[];
timestamp: Date;
}
interface IndividualTestResult {
name: string;
passed: boolean;
message: string;
}
CI/CD for Data Pipelines
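Data CI workflows usually order their stages by cost so that cheap checks fail fast, and some stages, such as a data diff, may be configured to warn rather than block. That control flow can be sketched independently of any CI system; the `Stage` shape and `runStages` function are hypothetical.

```typescript
interface Stage {
  name: string;
  blocking: boolean; // false = report failures as warnings only
  run: () => Promise<boolean>; // resolves true when the stage passes
}

interface StageReport {
  name: string;
  status: 'passed' | 'failed' | 'warned' | 'skipped';
}

// Run stages in order; after a blocking failure, skip the remaining stages
async function runStages(stages: Stage[]): Promise<{ ok: boolean; reports: StageReport[] }> {
  const reports: StageReport[] = [];
  let blocked = false;
  for (const stage of stages) {
    if (blocked) {
      reports.push({ name: stage.name, status: 'skipped' });
      continue;
    }
    let passed: boolean;
    try {
      passed = await stage.run();
    } catch {
      passed = false; // an exception counts as a failure
    }
    if (passed) {
      reports.push({ name: stage.name, status: 'passed' });
    } else if (stage.blocking) {
      reports.push({ name: stage.name, status: 'failed' });
      blocked = true;
    } else {
      reports.push({ name: stage.name, status: 'warned' });
    }
  }
  return { ok: !blocked, reports };
}

// Example: the data diff fails but only warns, so the run as a whole passes
const stages: Stage[] = [
  { name: 'lint', blocking: true, run: async () => true },
  { name: 'unit-tests', blocking: true, run: async () => true },
  { name: 'data-diff', blocking: false, run: async () => false },
  { name: 'contract-tests', blocking: true, run: async () => true },
];

runStages(stages).then(({ ok, reports }) => {
  console.log(ok, reports.map((r) => `${r.name}:${r.status}`).join(' '));
  // true lint:passed unit-tests:passed data-diff:warned contract-tests:passed
});
```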
Pipeline Testing Workflow
interface DataPipelineCIConfig {
// Stage 1: Static checks (seconds)
lint: {
tool: 'sqlfluff' | 'sqlfmt';
rules: string[];
failOnWarning: boolean;
};
// Stage 2: Unit tests (seconds)
unitTests: {
framework: 'pytest' | 'jest' | 'dbt-test';
testDirectory: string;
parallelism: number;
};
// Stage 3: Integration build (minutes)
integrationBuild: {
environment: 'dev-ephemeral' | 'staging';
models: string[]; // Which dbt models to build
sampleData: boolean; // Use sample data or full data
samplePercent: number;
};
// Stage 4: Data diff (minutes)
dataDiff: {
enabled: boolean;
models: string[];
maxDiffPercent: number;
blockOnDiff: boolean;
};
// Stage 5: Contract tests (seconds)
contractTests: {
contracts: string[]; // Paths to contract definitions
strictMode: boolean;
};
}
const ciConfig: DataPipelineCIConfig = {
lint: {
tool: 'sqlfluff',
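rules: ['L001', 'L002', 'L003', 'L010', 'L014'], // legacy sqlfluff codes; sqlfluff 2.x renamed rules (e.g. L010 is now CP01), so match your version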
failOnWarning: false,
},
unitTests: {
framework: 'dbt-test',
testDirectory: 'tests/',
parallelism: 4,
},
integrationBuild: {
environment: 'dev-ephemeral',
models: ['stg_orders', 'stg_customers', 'fact_order_summary'],
sampleData: true,
samplePercent: 10,
},
dataDiff: {
enabled: true,
models: ['fact_order_summary'],
maxDiffPercent: 5,
blockOnDiff: false, // Warn but don't block
},
contractTests: {
contracts: ['contracts/orders.yaml', 'contracts/customers.yaml'],
strictMode: true,
},
};
Performance Testing
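A single timed run is noisy (cold caches, concurrent load, connection setup), so performance budgets are often checked against a statistic over several runs. A minimal sketch that discards warm-up runs and compares the median to a budget; `benchmarkMedian` is a hypothetical helper, and the workload is passed in as an async function.

```typescript
// Median of a numeric sample (average of the two middle values for even sizes)
function median(values: number[]): number {
  const sorted = [...values].sort((a, b) => a - b);
  const mid = Math.floor(sorted.length / 2);
  return sorted.length % 2 === 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid];
}

// Time `work` several times, ignoring warm-up runs, and compare the median to a budget
async function benchmarkMedian(
  work: () => Promise<unknown>,
  opts: { warmupRuns: number; measuredRuns: number; budgetMs: number },
): Promise<{ medianMs: number; withinBudget: boolean; samplesMs: number[] }> {
  for (let i = 0; i < opts.warmupRuns; i++) {
    await work(); // warm caches and connection pools; not measured
  }
  const samplesMs: number[] = [];
  for (let i = 0; i < opts.measuredRuns; i++) {
    const start = performance.now();
    await work();
    samplesMs.push(performance.now() - start);
  }
  const medianMs = median(samplesMs);
  return { medianMs, withinBudget: medianMs <= opts.budgetMs, samplesMs };
}

// Usage with a stand-in workload; in practice `work` would run db.query(...)
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
benchmarkMedian(() => sleep(5), { warmupRuns: 1, measuredRuns: 5, budgetMs: 1000 }).then((r) =>
  console.log(r.samplesMs.length, r.withinBudget), // 5 true
);
```

The median is less sensitive than the mean to a single slow outlier run; teams that care about tail latency may track a high percentile as well.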
Transform Performance Benchmarks
interface PerformanceBenchmark {
name: string;
query: string;
dataSize: number; // rows
maxDurationMs: number; // performance budget
maxMemoryMB: number; // memory budget
}
class PerformanceTestRunner {
async runBenchmark(
benchmark: PerformanceBenchmark,
db: TestDatabase,
): Promise<BenchmarkResult> {
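// performance.now() is monotonic and sub-millisecond, unlike Date.now()
const startTime = performance.now();
// heapUsed measures this Node.js client (e.g. materialized result rows), not
// memory used by the database engine; use the warehouse's query profile
// (or EXPLAIN ANALYZE on PostgreSQL) for server-side memory. Garbage
// collection can also make this delta noisy or even negative.
const startMemory = process.memoryUsage().heapUsed;
await db.query(benchmark.query);
const durationMs = performance.now() - startTime;
const memoryUsedMB =
(process.memoryUsage().heapUsed - startMemory) / (1024 * 1024);
return {
name: benchmark.name,
durationMs,
memoryUsedMB,
passedDuration: durationMs <= benchmark.maxDurationMs,
passedMemory: memoryUsedMB <= benchmark.maxMemoryMB,
// rows/sec; the floor avoids division by zero for very fast queries
throughput: benchmark.dataSize / (Math.max(durationMs, 0.001) / 1000),
};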
}
}
interface BenchmarkResult {
name: string;
durationMs: number;
memoryUsedMB: number;
passedDuration: boolean;
passedMemory: boolean;
throughput: number;
}
Edge Cases & Failure Modes
Test Data Management
Maintaining test fixtures is the hardest part of data pipeline testing:
interface TestFixture {
name: string;
tables: Record<string, Record<string, unknown>[]>;
description: string;
tags: string[];
}
class TestFixtureManager {
private fixtures: Map<string, TestFixture> = new Map();
register(fixture: TestFixture): void {
this.fixtures.set(fixture.name, fixture);
}
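/**
* Generate test data programmatically instead of maintaining static fixtures.
* This scales to large volumes and varies NULLs and duplicates automatically.
* Note: Math.random() makes every run different; seed a PRNG if failures
* must be reproducible. Duplicates are appended, so the result can contain
* more than `rows` rows.
*/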
generateFixture(config: {
rows: number;
schema: Record<string, 'string' | 'number' | 'date' | 'boolean'>;
nullRate: number;
duplicateRate: number;
}): Record<string, unknown>[] {
const rows: Record<string, unknown>[] = [];
for (let i = 0; i < config.rows; i++) {
const row: Record<string, unknown> = {};
for (const [col, type] of Object.entries(config.schema)) {
// Introduce nulls at specified rate
if (Math.random() < config.nullRate) {
row[col] = null;
continue;
}
switch (type) {
case 'string':
row[col] = `value_${i}_${col}`;
break;
case 'number':
row[col] = Math.round(Math.random() * 10000) / 100;
break;
case 'date':
row[col] = new Date(
Date.now() - Math.random() * 365 * 24 * 60 * 60 * 1000,
).toISOString().split('T')[0];
break;
case 'boolean':
row[col] = Math.random() > 0.5;
break;
}
}
rows.push(row);
// Introduce duplicates at specified rate
if (Math.random() < config.duplicateRate) {
rows.push({ ...row });
}
}
return rows;
}
}
Flaky Data Tests
Data tests can be flaky due to:
- Timing issues (freshness checks on actively loading tables)
- Sampling (random sample may not represent the full dataset)
- External dependencies (source systems returning different data)
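The sampling cause is often self-inflicted: fixtures built with `Math.random()`, like `generateFixture` above, produce different data on every run, so a failure may not reproduce. Seeding a small pseudo-random generator makes generated fixtures deterministic. A sketch using mulberry32, a widely used 32-bit PRNG chosen here for brevity (it is not suitable for anything security-related); `generateOrders` is a hypothetical helper.

```typescript
// mulberry32: tiny seeded PRNG returning floats in [0, 1)
function mulberry32(seed: number): () => number {
  let state = seed >>> 0;
  return () => {
    state = (state + 0x6d2b79f5) >>> 0;
    let t = state;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}

type Row = Record<string, unknown>;

// Deterministic fixture: same seed -> same rows, so failures reproduce
function generateOrders(seed: number, rows: number, nullRate: number): Row[] {
  const random = mulberry32(seed);
  const out: Row[] = [];
  for (let i = 0; i < rows; i++) {
    out.push({
      order_id: i + 1,
      // amount in [0, 100) with two decimals, or NULL at the given rate
      amount: random() < nullRate ? null : Math.round(random() * 10000) / 100,
    });
  }
  return out;
}

const first = generateOrders(42, 5, 0.2);
const second = generateOrders(42, 5, 0.2);
console.log(JSON.stringify(first) === JSON.stringify(second)); // true
```

Logging the seed alongside each failure lets anyone regenerate the exact input that triggered it.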
Mitigation:
class FlakyTestHandler {
private testHistory: Map<string, boolean[]> = new Map();
recordResult(testName: string, passed: boolean): void {
const history = this.testHistory.get(testName) ?? [];
history.push(passed);
if (history.length > 10) history.shift();
this.testHistory.set(testName, history);
}
isFlakyTest(testName: string): boolean {
const history = this.testHistory.get(testName) ?? [];
if (history.length < 5) return false;
const passRate = history.filter(Boolean).length / history.length;
// Flaky: passes between 20% and 80% of the time
return passRate > 0.2 && passRate < 0.8;
}
getReliability(testName: string): number {
const history = this.testHistory.get(testName) ?? [];
if (history.length === 0) return 1;
return history.filter(Boolean).length / history.length;
}
}
Mathematical Foundations
Test Coverage for Data
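Unlike code coverage, data test coverage has two dimensions: structural coverage (the fraction of models and columns that have at least one test) and value coverage (whether the tests exercise the value classes that matter, such as NULLs, boundary values, duplicates, and rare categories).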
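Structural coverage can be computed once each test is mapped to the model and column it checks. A minimal sketch, assuming a hypothetical inventory of columns per model and a flat list of column-level tests (for example, collected from a dbt project's schema files); `structuralCoverage` is a hypothetical name.

```typescript
interface ColumnTest {
  model: string;
  column: string;
  test: string; // e.g. 'not_null', 'unique'
}

// Fraction of columns, per model and overall, that have at least one test
function structuralCoverage(
  columnsByModel: Record<string, string[]>,
  tests: ColumnTest[],
): { overall: number; byModel: Record<string, number>; untested: string[] } {
  const tested = new Set(tests.map((t) => `${t.model}.${t.column}`));
  const byModel: Record<string, number> = {};
  const untested: string[] = [];
  let total = 0;
  let covered = 0;
  for (const [model, columns] of Object.entries(columnsByModel)) {
    let modelCovered = 0;
    for (const column of columns) {
      const id = `${model}.${column}`;
      if (tested.has(id)) modelCovered++;
      else untested.push(id);
    }
    byModel[model] = columns.length === 0 ? 1 : modelCovered / columns.length;
    total += columns.length;
    covered += modelCovered;
  }
  return { overall: total === 0 ? 1 : covered / total, byModel, untested };
}

const coverage = structuralCoverage(
  {
    fct_orders: ['order_id', 'customer_id', 'amount_usd', 'order_date'],
    dim_customers: ['customer_id', 'country'],
  },
  [
    { model: 'fct_orders', column: 'order_id', test: 'unique' },
    { model: 'fct_orders', column: 'order_id', test: 'not_null' },
    { model: 'fct_orders', column: 'amount_usd', test: 'not_null' },
    { model: 'dim_customers', column: 'customer_id', test: 'unique' },
  ],
);
console.log(coverage.overall, coverage.untested.join(', '));
// 0.5 fct_orders.customer_id, fct_orders.order_date, dim_customers.country
```

Value coverage is harder to automate; the list of untested columns is a starting point for deciding which value classes still need fixtures.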
Statistical Power of Data Tests
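For anomaly detection tests, a metric value $x$ (for example, daily revenue) is flagged when its z-score exceeds a threshold $k$:

$$z = \frac{x - \mu}{\sigma}, \qquad \text{alert if } |z| > k$$

With a z-score threshold of $k$, and assuming the metric is approximately normally distributed, the false-positive rate $\alpha$ and the power $1 - \beta$ to detect a shift of $\delta$ standard deviations are:

$$\alpha = 2\,\bigl(1 - \Phi(k)\bigr), \qquad 1 - \beta = \Phi(\delta - k) + \Phi(-\delta - k)$$

Where $\mu$ and $\sigma$ are the mean and standard deviation of the metric over a trailing window (e.g. 30 days) and $\Phi$ is the standard normal CDF. For $k = 2$, $\alpha \approx 4.6\%$; for $k = 3$, $\alpha \approx 0.27\%$.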
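The quantities behind a z-score anomaly test can be computed directly. A sketch assuming the monitored metric is roughly normally distributed; the standard normal CDF uses the Abramowitz and Stegun 7.1.26 approximation to erf (absolute error around 1.5e-7), and the helper names are hypothetical.

```typescript
// Standard normal CDF via the Abramowitz & Stegun 7.1.26 erf approximation
function normalCdf(x: number): number {
  const z = Math.abs(x) / Math.SQRT2;
  const t = 1 / (1 + 0.3275911 * z);
  const poly =
    t * (0.254829592 + t * (-0.284496736 + t * (1.421413741 + t * (-1.453152027 + t * 1.061405429))));
  const erf = 1 - poly * Math.exp(-z * z);
  return x >= 0 ? 0.5 * (1 + erf) : 0.5 * (1 - erf);
}

// z-score of the latest value against a trailing window of history
function zScore(history: number[], latest: number): number {
  if (history.length < 2) throw new Error('need at least two historical values');
  const mean = history.reduce((s, v) => s + v, 0) / history.length;
  // Sample standard deviation (n - 1 in the denominator)
  const sd = Math.sqrt(history.reduce((s, v) => s + (v - mean) ** 2, 0) / (history.length - 1));
  if (sd === 0) return latest === mean ? 0 : latest > mean ? Infinity : -Infinity;
  return (latest - mean) / sd;
}

// Two-sided false-positive rate for threshold k: 2 * (1 - Phi(k))
const falsePositiveRate = (k: number) => 2 * (1 - normalCdf(k));

// Power to detect a shift of delta standard deviations: Phi(delta - k) + Phi(-delta - k)
const power = (k: number, delta: number) => normalCdf(delta - k) + normalCdf(-delta - k);

console.log(falsePositiveRate(2).toFixed(4)); // 0.0455
console.log(falsePositiveRate(3).toFixed(4)); // 0.0027
console.log(power(2, 3).toFixed(3)); // 0.841

// Hypothetical daily revenue history; a 10x day is far outside a k = 2 band
const history = [100, 102, 98, 101, 99, 103, 97];
console.log(Math.abs(zScore(history, 1000)) > 2); // true
```

The trade-off is visible in the numbers: a daily check at $k = 2$ raises a false alarm on roughly one day in 22, while $k = 3$ cuts false alarms sharply at the cost of missing smaller shifts.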
Real-World War Stories
War Story
The Test That Caught a $2M Error
A data team added a simple dbt test: assert total daily revenue is within 2 standard deviations of the 30-day average. One morning, the test failed — revenue was 10x the normal amount.
Investigation revealed: a pricing service bug had set all product prices to $999.99. Without the test, the incorrect revenue would have flowed to executive dashboards, triggering misguided business decisions (inventory ordering, marketing budget allocation).
Lesson: Simple statistical tests catch catastrophic errors. The test took 5 minutes to write and saved $2M in potential downstream consequences.
War Story
The Data Diff That Revealed a Silent Bug
A team was refactoring their revenue calculation from a complex CTE to a simpler window function. They ran data diff between the old and new versions and found: 99.7% match, but 0.3% of rows had different values.
The 0.3% were all orders with partial refunds. The old CTE had a subtle bug that double-counted partial refunds. The new version was correct.
Without data diff, the team would have shipped the change unaware that it altered historical numbers. Once someone noticed the mismatch with past reports, the new (correct) logic would likely have been blamed as a regression.
Lesson: Data diff doesn't just prevent regressions — it discovers existing bugs.
Decision Framework
Test Strategy by Pipeline Type
| Pipeline Type | Unit Tests | Integration | Data Diff | Contract | E2E |
|---|---|---|---|---|---|
| dbt models | Yes (dbt tests) | Yes | Yes | Yes | Weekly |
| Streaming | Yes (transform logic) | Yes | No | Yes | Continuous |
| Batch ETL | Yes | Yes | Yes | Yes | Daily |
| ML features | Yes | Yes | Yes | Critical | Before training |
| Reverse ETL | Yes | Yes | No | Yes | Per-sync |
Advanced Topics
Property-Based Testing for Data
Instead of hard-coding expected outputs for specific inputs, define properties (invariants) that must hold for any valid dataset:
interface DataProperty {
name: string;
description: string;
check: (data: Record<string, unknown>[]) => boolean;
}
const revenueProperties: DataProperty[] = [
{
name: 'revenue_non_negative',
description: 'Revenue should never be negative',
check: (data) => data.every((row) => (row.revenue as number) >= 0),
},
{
name: 'quantity_integer',
description: 'Quantity should always be a whole number',
check: (data) =>
data.every((row) => Number.isInteger(row.quantity as number)),
},
{
name: 'revenue_equals_quantity_times_price',
description: 'Revenue should equal quantity * unit_price',
check: (data) =>
data.every((row) => {
const expected =
(row.quantity as number) * (row.unit_price as number);
return Math.abs((row.revenue as number) - expected) < 0.01;
}),
},
{
name: 'dates_not_future',
description: 'No dates should be in the future',
check: (data) =>
data.every(
(row) => new Date(row.order_date as string) <= new Date(),
),
},
];
Mutation Testing for Data
Deliberately introduce data mutations to verify that tests catch them:
class DataMutationTester {
/**
* Introduce controlled mutations and verify tests catch them.
* If a mutation is NOT caught, the test suite has a gap.
*/
async testMutationDetection(
originalData: Record<string, unknown>[],
testSuite: DataProperty[],
): Promise<MutationReport> {
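// A baseline that already fails the suite would make every mutation look "caught"
if (originalData.length === 0 || testSuite.some((test) => !test.check(originalData))) {
throw new Error('Original data must be non-empty and pass every property before mutation testing');
}
const mutations = this.generateMutations(originalData);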
const report: MutationReport = {
totalMutations: mutations.length,
caughtMutations: 0,
missedMutations: [],
};
for (const mutation of mutations) {
const mutatedData = this.applyMutation(originalData, mutation);
const caught = testSuite.some((test) => !test.check(mutatedData));
if (caught) {
report.caughtMutations++;
} else {
report.missedMutations.push(mutation);
}
}
return report;
}
private generateMutations(
_data: Record<string, unknown>[],
): DataMutation[] {
return [
{ type: 'negate_value', column: 'revenue', description: 'Make revenue negative' },
{ type: 'null_value', column: 'customer_id', description: 'Null out customer ID' },
{ type: 'duplicate_row', rowIndex: 0, description: 'Duplicate first row' },
{ type: 'future_date', column: 'order_date', description: 'Set date to future' },
{ type: 'wrong_type', column: 'quantity', description: 'Set quantity to string' },
];
}
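private applyMutation(
data: Record<string, unknown>[],
mutation: DataMutation,
): Record<string, unknown>[] {
const mutated = data.map((row) => ({ ...row }));
if (mutated.length === 0) return mutated;
// Honor rowIndex when given, clamped to the available rows
const target = mutated[Math.min(mutation.rowIndex ?? 0, mutated.length - 1)];
switch (mutation.type) {
case 'negate_value':
if (mutation.column) target[mutation.column] = -(target[mutation.column] as number);
break;
case 'null_value':
if (mutation.column) target[mutation.column] = null;
break;
case 'duplicate_row':
mutated.push({ ...target });
break;
case 'future_date':
// One year from now, formatted as YYYY-MM-DD like the fixtures
if (mutation.column) {
target[mutation.column] = new Date(Date.now() + 365 * 24 * 60 * 60 * 1000)
.toISOString()
.split('T')[0];
}
break;
case 'wrong_type':
// e.g. 3 becomes "3"; without this case the mutation was never applied
// and would always be reported as missed
if (mutation.column) target[mutation.column] = String(target[mutation.column]);
break;
}
return mutated;
}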
}
interface DataMutation {
type: 'negate_value' | 'null_value' | 'duplicate_row' | 'future_date' | 'wrong_type';
column?: string;
rowIndex?: number;
description: string;
}
interface MutationReport {
totalMutations: number;
caughtMutations: number;
missedMutations: DataMutation[];
}
Cross-References
- Pipeline Patterns Overview — Testing within pipeline architecture
- Data Quality Checks — Quality validation frameworks
- Schema Evolution — Testing schema changes
- Orchestration — CI/CD integration with orchestrators
- Data Lineage — Impact analysis for test prioritization
Key Takeaway
- Test data pipelines at three levels: unit tests (transform functions in isolation), integration tests (pipeline stages with real/mock data), and data diff tests (compare output against a known-good baseline).
- dbt tests (unique, not_null, accepted_values, relationships) provide declarative data quality assertions that run as part of the pipeline.
- CI/CD for data pipelines should include schema validation, transform unit tests, and production data diff comparison before deploying changes.
Exercise
Build a Test Suite for a Revenue Pipeline
Your revenue pipeline has this flow:
raw_orders -> stg_orders (clean, dedup) -> fct_orders (business logic, currency conversion) -> rpt_daily_revenue (aggregation)
Design a comprehensive test suite with:
- Unit tests for the currency conversion function
- Integration test for the stg_orders dedup logic
- dbt tests for the fct_orders model
- Data diff test for rpt_daily_revenue
- A regression test that catches a real-world bug (e.g., negative revenue from incorrect refund handling)
Solution
- Unit test (currency conversion):
import pytest

def test_convert_to_usd():
    # Assumes the rate fixture pins EUR->USD at 1.085 for 2026-03-17
    assert convert_to_usd(100, 'EUR', '2026-03-17') == pytest.approx(108.50)
    assert convert_to_usd(100, 'USD', '2026-03-17') == pytest.approx(100.00)
    assert convert_to_usd(0, 'GBP', '2026-03-17') == pytest.approx(0.00)
    with pytest.raises(ValueError):
        convert_to_usd(100, 'INVALID', '2026-03-17')
- Integration test (dedup):
def test_stg_orders_dedup():
    input_df = create_test_df([
        {'order_id': 1, 'amount': 100, 'updated_at': '2026-03-17 10:00'},
        {'order_id': 1, 'amount': 150, 'updated_at': '2026-03-17 11:00'},  # dupe, newer
    ])
    result = run_stg_orders(input_df)
    assert result.count() == 1
    assert result.first()['amount'] == 150  # keeps newest
- dbt tests (fct_orders):
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: amount_usd
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"  # no negative sale amounts
              config:
                where: "type = 'sale'"  # refund rows carry negative amounts
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id
Data diff test: Compare current output against last production run. Flag if daily revenue changes by more than 10% for any date already computed.
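The diff check in this solution can be sketched as a comparison of two date-to-revenue maps. This assumes both runs have already been loaded into memory; `flagRevenueChanges` is a hypothetical helper, and the 10% threshold comes from the exercise.

```typescript
// Flag dates whose revenue moved by more than `maxRelativeChange` versus the
// last production run, plus dates that disappeared from the current output
function flagRevenueChanges(
  baseline: Map<string, number>,
  current: Map<string, number>,
  maxRelativeChange = 0.1,
): string[] {
  const flagged: string[] = [];
  for (const [date, before] of baseline) {
    const after = current.get(date);
    if (after === undefined) {
      flagged.push(`${date}: missing from current output`);
      continue;
    }
    // Relative change; a zero baseline is flagged on any change
    const change =
      before === 0 ? (after === 0 ? 0 : Infinity) : Math.abs(after - before) / Math.abs(before);
    if (change > maxRelativeChange) {
      flagged.push(`${date}: ${before} -> ${after}`);
    }
  }
  return flagged;
}

const lastProd = new Map([
  ['2026-03-16', 10000],
  ['2026-03-17', 12000],
]);
const thisRun = new Map([
  ['2026-03-16', 10500], // +5%: within threshold
  ['2026-03-17', 9000], // -25%: flagged
  ['2026-03-18', 11000], // new date: not in the baseline, so not compared
]);
console.log(flagRevenueChanges(lastProd, thisRun)); // [ '2026-03-17: 12000 -> 9000' ]
```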
Regression test (refund bug):
def test_refund_does_not_create_negative_revenue():
    orders = [{'order_id': 1, 'amount': 100, 'type': 'sale'},
              {'order_id': 2, 'amount': -50, 'type': 'refund'}]
    result = run_fct_orders(orders)
    # Refunds should be tracked separately, not subtracted from revenue
    assert result.filter("type = 'sale'").first()['amount_usd'] == 100
    assert result.filter("type = 'refund'").first()['amount_usd'] == -50
Common Misconceptions
- "Data pipelines don't need testing." Untested pipelines silently produce wrong data for weeks. By the time someone notices, the damage is done and backfilling is expensive.
- "Row count checks are sufficient testing." A pipeline can produce the right number of rows with completely wrong values. Test data content, not just shape.
- "Testing in production (monitoring) replaces testing in development." Production monitoring catches issues AFTER they affect users. Pre-deployment tests catch issues BEFORE they reach production. You need both.
- "dbt tests replace unit tests." dbt tests validate data in the warehouse (integration-level). You still need unit tests for complex Python/Spark transform logic that runs outside dbt.
- "Testing with production data is always better." Production data may contain PII, require VPN access, and be too large for local testing. Use synthetic test data with known edge cases for unit tests, production data samples for integration tests.
In Production
- Airbnb runs data diff tests comparing every dbt model's output against the previous production version before deploying changes, with automatic rollback if metrics deviate beyond thresholds.
- Spotify has a dedicated "data testing" team that maintains shared test fixtures and reusable validation frameworks used across 200+ data pipeline teams.
- Netflix uses property-based testing for their Spark transforms, generating random input data to verify that invariants (non-negative values, referential integrity) hold across edge cases.
- Uber requires every new data pipeline to include idempotency tests (run twice, assert same output) and schema validation tests as part of their CI/CD approval process.
Quiz
1. What are the three levels of data pipeline testing?
A) Alpha, beta, production
B) Unit tests (transform functions), integration tests (pipeline stages), data diff tests (output comparison)
C) Manual, automated, monitoring
D) Syntax, logic, performance
Answer
B) Unit tests validate individual transform functions in isolation. Integration tests validate pipeline stages with realistic data. Data diff tests compare pipeline output against a known-good baseline to detect regressions.
2. What is a "data diff" test?
A) A test that checks for differences between two database schemas
B) A test that compares current pipeline output against a previous known-good output, flagging unexpected changes
C) A test that measures performance differences
D) A test that checks for duplicate data
Answer
B) Data diff tests run the pipeline with the same input (or current production input) and compare the output against the last known-good result. Unexpected differences (row count changes, metric deviations) indicate regressions.
3. What is the purpose of a dbt relationships test?
A) To test database foreign key constraints
B) To verify that every value in a column exists in another model's column (referential integrity in the warehouse)
C) To test joins between tables
D) To validate one-to-many relationships
Answer
B) relationships tests check referential integrity at the data level: every customer_id in fct_orders should exist in dim_customers. This catches broken references from missing dimensions, failed loads, or join issues.
4. Why is testing with production data alone insufficient?
A) Production data is too large
B) Production data may not contain edge cases (nulls, boundary values, rare states) that your pipeline should handle
C) Production data changes too frequently
D) Production data is always clean
Answer
B) Production data reflects normal operation and may not include edge cases like null required fields, maximum-length strings, negative amounts, or timezone boundary conditions. Synthetic test data with deliberate edge cases ensures these paths are tested.
5. What should a CI/CD pipeline for data include?
A) Only code linting and formatting checks
B) Schema validation, transform unit tests, integration tests, data diff comparison, and lineage impact analysis
C) Only production deployment
D) Performance benchmarks
Answer
B) A comprehensive data CI/CD pipeline validates schema compatibility, runs unit tests for transform logic, executes integration tests with sample data, compares output against baselines (data diff), and checks lineage for downstream impact before deploying.
One-Liner Summary: Test data pipelines like software by unit testing transforms, integration testing stages, and diffing outputs against baselines, and never deploy a pipeline change without automated validation.