
Kafka Connect

Kafka Connect is a framework for streaming data between Apache Kafka and external systems — databases, search indexes, file systems, cloud storage, and more. Instead of writing custom producer/consumer code for every integration, you configure connectors declaratively.

This matters because data integration often consumes a disproportionate share of engineering time: teams end up maintaining their own ETL scripts, cron jobs, and synchronization hacks. Kafka Connect replaces much of that with a standardized, fault-tolerant, scalable framework.

Related: Kafka Internals | Kafka Streams | Exactly-Once Semantics


Architecture

Key Concepts

| Concept | Description |
| --- | --- |
| Worker | JVM process that runs connectors and tasks |
| Connector | Plugin that defines how to connect to an external system |
| Task | Unit of parallelism within a connector; the connector splits its work (for example, by table or topic partition) across up to tasks.max tasks |
| Source Connector | Reads from external system, writes to Kafka |
| Sink Connector | Reads from Kafka, writes to external system |
| Converter | Serializes/deserializes records (JSON, Avro, Protobuf) |
| Transform (SMT) | Modifies records in-flight (single message transforms) |

Deployment Modes

| Mode | Description | Use Case |
| --- | --- | --- |
| Standalone | Single worker process | Development, simple pipelines |
| Distributed | Multi-worker cluster with automatic task balancing | Production |

Connector Management (REST API)

Kafka Connect exposes a REST API for managing connectors.

| Endpoint | Method | Description |
| --- | --- | --- |
| /connectors | GET | List all connectors |
| /connectors | POST | Create a new connector |
| /connectors/{name} | GET | Get connector details |
| /connectors/{name}/config | GET | Get connector config |
| /connectors/{name}/config | PUT | Update connector config |
| /connectors/{name}/status | GET | Get connector status |
| /connectors/{name}/tasks | GET | List tasks |
| /connectors/{name}/tasks/{id}/status | GET | Task status |
| /connectors/{name}/restart | POST | Restart connector |
| /connectors/{name}/tasks/{id}/restart | POST | Restart a specific task |
| /connectors/{name}/pause | PUT | Pause connector |
| /connectors/{name}/resume | PUT | Resume connector |
| /connectors/{name} | DELETE | Delete connector |
| /connector-plugins | GET | List installed plugins |

bash
# List connectors
curl localhost:8083/connectors | jq

# Create connector
curl -X POST localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector-config.json

# Check status
curl localhost:8083/connectors/my-connector/status | jq

# Restart failed task
curl -X POST localhost:8083/connectors/my-connector/tasks/0/restart
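
PUT /connectors/{name}/config creates the connector when it does not exist and updates it otherwise, which makes deployments repeatable. The following is a minimal sketch using only the Python standard library. The worker address follows the examples above, and the helper names `build_upsert_request` and `upsert_connector` are hypothetical, not part of any Kafka client.

```python
import json
import urllib.request
from urllib.parse import quote


def build_upsert_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build a PUT /connectors/{name}/config request.

    The body is the flat config map, not the {"name": ..., "config": ...}
    wrapper that POST /connectors expects.
    """
    url = f"{base_url.rstrip('/')}/connectors/{quote(name, safe='')}/config"
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def upsert_connector(base_url: str, name: str, config: dict) -> dict:
    """Send the request to a running worker and return its JSON response."""
    request = build_upsert_request(base_url, name, config)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)
```

Calling `upsert_connector("http://localhost:8083", "my-connector", config)` from a deploy script can then be run repeatedly without first checking whether the connector exists.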

Source Connectors

Debezium CDC (Change Data Capture)

Debezium reads the database's transaction log (WAL for PostgreSQL, binlog for MySQL) and produces a Kafka record for every row-level change. Log-based CDC is generally the preferred approach for database-to-Kafka integration.

PostgreSQL

json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/secrets/pg-password.txt:password}",
    "database.dbname": "myapp",
    "topic.prefix": "myapp",
    "schema.include.list": "public",
    "table.include.list": "public.users,public.orders",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_pub",
    "heartbeat.interval.ms": "10000",
    "snapshot.mode": "initial",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

TIP

Debezium reads from the transaction log rather than querying tables, so it adds little load to query processing (though on PostgreSQL a lagging or inactive replication slot causes WAL to accumulate on disk). It captures every committed change, including DELETE operations. Timestamp-based polling, by contrast, misses hard deletes and adds latency.
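
To make the shape of a change event concrete, here is a small sketch that interprets a decoded Debezium event value. The envelope fields (before, after, source, op, ts_ms) and the op codes are Debezium's; the sample event and the `describe_change` helper are hypothetical, and the event is trimmed to a few fields.

```python
import json

# Debezium operation codes carried in the "op" field of each change event.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(event: dict) -> str:
    """Summarize one Debezium change event value (envelope already decoded)."""
    op = OPERATIONS.get(event["op"], "unknown")
    # Deletes carry the row only in "before"; inserts and reads only in "after".
    row = event["after"] if event.get("after") is not None else event.get("before")
    table = event["source"]["table"]
    return f"{op} on {table}: {json.dumps(row, sort_keys=True)}"


# Hypothetical delete event for public.users, trimmed to the key fields.
delete_event = {
    "before": {"id": 42, "email": "a@example.com"},
    "after": None,
    "source": {"db": "myapp", "schema": "public", "table": "users"},
    "op": "d",
    "ts_ms": 1700000000000,
}

print(describe_change(delete_event))
```

On PostgreSQL, how much of the old row appears in "before" depends on the table's REPLICA IDENTITY setting; with the default, only the primary key columns are available for deletes.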

MySQL

json
{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "${file:/secrets/mysql-password.txt:password}",
    "database.server.id": "1",
    "topic.prefix": "myapp",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "include.schema.changes": "true"
  }
}

JDBC Source Connector

For databases where transaction log access is not possible. The connector polls the source tables and detects new and changed rows via a timestamp column, an incrementing column, or both.

json
{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/myapp",
    "connection.user": "reader",
    "connection.password": "${file:/secrets/jdbc-password.txt:password}",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "table.whitelist": "orders,products",
    "topic.prefix": "jdbc-",
    "poll.interval.ms": "5000",
    "batch.max.rows": "1000",
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id"
  }
}

WARNING

JDBC polling has higher latency than CDC (seconds vs milliseconds), misses hard deletes, and puts load on the source database. Use Debezium CDC whenever possible.
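
The reason polling misses hard deletes can be seen in a toy model of the timestamp+incrementing mode. This is a sketch of the idea only, not the connector's implementation: the `Row` type, the `poll` function, and the in-memory table are all hypothetical stand-ins for a query ordered by (updated_at, id).

```python
from typing import NamedTuple


class Row(NamedTuple):
    id: int
    updated_at: int  # epoch seconds, for brevity
    name: str


def poll(table: list[Row], offset: tuple[int, int]) -> tuple[list[Row], tuple[int, int]]:
    """Return rows newer than the stored (updated_at, id) offset, plus the new offset."""
    fresh = sorted(
        (r for r in table if (r.updated_at, r.id) > offset),
        key=lambda r: (r.updated_at, r.id),
    )
    if fresh:
        offset = (fresh[-1].updated_at, fresh[-1].id)
    return fresh, offset


table = [Row(1, 100, "widget"), Row(2, 105, "gadget")]
seen, offset = poll(table, (0, 0))   # first poll sees both rows

table = [Row(1, 110, "widget v2")]   # row 1 was updated, row 2 was hard-deleted
changes, offset = poll(table, offset)

# Only the update is visible; the deleted row simply never shows up again.
print([r.id for r in changes])
```

A common workaround is a soft-delete column that is updated instead of removing the row, so the change carries a fresh timestamp.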


Sink Connectors

Elasticsearch Sink

json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "myapp.public.products",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "false",
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "batch.size": "500",
    "max.buffered.records": "5000",
    "flush.timeout.ms": "10000",
    "max.retries": "5",
    "retry.backoff.ms": "1000"
  }
}

S3 Sink

json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "myapp.public.events",
    "s3.bucket.name": "my-data-lake",
    "s3.region": "us-east-1",
    "s3.part.size": "5242880",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "partition.duration.ms": "3600000",
    "rotate.interval.ms": "600000",
    "flush.size": "10000",
    "locale": "en-US",
    "timezone": "UTC"
  }
}
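
As a rough illustration of what the path.format above produces, the sketch below renders the partition directory for a record timestamp in UTC. It is an approximation written for this page: `encoded_partition` is a hypothetical helper, the flooring step mirrors partition.duration.ms as I understand it, and which timestamp is used (wall clock or record time) depends on the connector's timestamp extractor setting.

```python
from datetime import datetime, timezone


def encoded_partition(timestamp_ms: int, partition_duration_ms: int = 3_600_000) -> str:
    """Render "'year'=YYYY/'month'=MM/'day'=dd" for a UTC timestamp in milliseconds."""
    # Floor the timestamp to the start of its partition window first.
    floored_ms = timestamp_ms - (timestamp_ms % partition_duration_ms)
    ts = datetime.fromtimestamp(floored_ms / 1000, tz=timezone.utc)
    return f"year={ts.year:04d}/month={ts.month:02d}/day={ts.day:02d}"


# 2024-01-15T05:00:00.123Z
print(encoded_partition(1705276800000 + 5 * 3_600_000 + 123))
```

Because the path only goes down to the day, every hourly window of the same day lands in the same directory.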

JDBC Sink

json
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "myapp.public.users",
    "connection.url": "jdbc:postgresql://analytics-db:5432/warehouse",
    "connection.user": "writer",
    "connection.password": "${file:/secrets/sink-password.txt:password}",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "auto.create": "true",
    "auto.evolve": "true",
    "batch.size": "1000"
  }
}

Single Message Transforms (SMTs)

SMTs modify records in-flight without writing custom code.

Common Built-in SMTs

| Transform | Description |
| --- | --- |
| ValueToKey | Copy fields from value to key |
| ExtractField | Extract a single field from struct |
| ReplaceField | Include, exclude, or rename fields |
| MaskField | Replace field value with a type-appropriate empty value (0, empty string, and so on) or a configured replacement |
| InsertField | Add static or metadata fields |
| TimestampRouter | Route to topics based on timestamp |
| RegexRouter | Route to topics based on regex |
| Flatten | Flatten nested structs |
| Cast | Change field types |
| HeaderFrom | Copy or move fields to record headers |
| Filter | Drop records based on conditions (used with predicates) |

SMT Configuration Examples

json
{
  "transforms": "route,unwrap,rename",

  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "myapp\\.public\\.(.*)",
  "transforms.route.replacement": "$1",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,source.ts_ms",

  "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.rename.renames": "old_name:new_name,legacy_id:id"
}

TIP

The Debezium ExtractNewRecordState SMT is essential when using Debezium with sink connectors. Without it, the sink receives the full Debezium envelope (before/after/source/op), not the simple row data it expects.
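
The effect of the unwrap step configured above can be approximated in a few lines. This is a simplified model, not Debezium's code: it ignores tombstones and schemas, and the added field names (`__deleted`, `__op`, `__source_ts_ms`) reflect my understanding of the defaults, so verify them against your Debezium version. The `unwrap` function and sample events are hypothetical.

```python
def unwrap(envelope: dict) -> dict:
    """Approximate ExtractNewRecordState with delete.handling.mode=rewrite
    and add.fields=op,source.ts_ms."""
    is_delete = envelope["op"] == "d"
    # Deletes have no "after" state, so rewrite mode keeps the "before" state.
    row = dict(envelope["before"] if is_delete else envelope["after"])
    row["__deleted"] = "true" if is_delete else "false"
    row["__op"] = envelope["op"]
    row["__source_ts_ms"] = envelope["source"]["ts_ms"]
    return row


update_event = {
    "before": {"id": 1, "name": "old"},
    "after": {"id": 1, "name": "new"},
    "source": {"table": "users", "ts_ms": 1700000000000},
    "op": "u",
}

# The sink now sees a flat row plus a few metadata columns.
print(unwrap(update_event))
```

A sink such as the JDBC sink can then map the flat row to columns directly and use the deleted marker for soft deletes.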


Schema Registry Integration

Schema Registry manages Avro, Protobuf, and JSON schemas, ensuring producers and consumers agree on data format.

Converter Configuration

properties
# Avro (most common for Kafka Connect)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

# Protobuf
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://schema-registry:8081

# JSON with schema
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

# JSON without schema (not recommended for production)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
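
The difference between the two JsonConverter settings is easiest to see in the message body. With schemas.enable=true each message carries a schema and a payload; with schemas.enable=false only the bare payload is written. The sketch below shows both shapes with a hypothetical two-field record, plus a hypothetical `payload_of` helper that a downstream consumer might use to accept either.

```python
import json

# Shape written by JsonConverter with schemas.enable=true.
with_schema = {
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "id", "type": "int64", "optional": False},
            {"field": "email", "type": "string", "optional": True},
        ],
    },
    "payload": {"id": 42, "email": "a@example.com"},
}

# Shape written with schemas.enable=false: just the payload.
without_schema = with_schema["payload"]


def payload_of(message_bytes: bytes) -> dict:
    """Return the row data whether or not the schema envelope is present."""
    doc = json.loads(message_bytes)
    if isinstance(doc, dict) and set(doc) == {"schema", "payload"}:
        return doc["payload"]
    return doc


enveloped = json.dumps(with_schema).encode("utf-8")
bare = json.dumps(without_schema).encode("utf-8")
print(len(enveloped), len(bare))  # the schema is repeated in every message
```

The per-message schema overhead is one reason Avro or Protobuf with Schema Registry is usually preferred when schemas are needed.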

Compatibility Modes

| Mode | Allowed Changes |
| --- | --- |
| BACKWARD (default) | Delete fields, add optional fields |
| FORWARD | Add fields, delete optional fields |
| FULL | Add/delete optional fields only |
| NONE | All changes allowed (dangerous) |
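
The rules in the table follow from one question: can a reader using one schema decode data written with the other? The sketch below models that for field additions and removals only. It is a deliberate simplification and not Schema Registry's algorithm: type changes and promotions are ignored, and the function names are hypothetical.

```python
def is_backward_compatible(old_fields: dict, new_fields: dict) -> bool:
    """Can a reader on the new schema decode data written with the old one?

    Each argument maps field name -> True if the field has a default value.
    """
    added = set(new_fields) - set(old_fields)
    # Old records lack the added fields, so the reader needs a default for each.
    return all(new_fields[name] for name in added)


def is_forward_compatible(old_fields: dict, new_fields: dict) -> bool:
    """Forward is backward with the roles swapped: old reader, new data."""
    return is_backward_compatible(new_fields, old_fields)


def is_full_compatible(old_fields: dict, new_fields: dict) -> bool:
    return (is_backward_compatible(old_fields, new_fields)
            and is_forward_compatible(old_fields, new_fields))


v1 = {"id": False, "email": False}
v2_optional = {"id": False, "email": False, "nickname": True}   # adds field with default
v2_required = {"id": False, "email": False, "age": False}       # adds field without default

print(is_backward_compatible(v1, v2_optional))  # adding an optional field
print(is_backward_compatible(v1, v2_required))  # adding a required field
```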

Error Handling & Dead Letter Queues

json
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-my-connector",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.retry.delay.max.ms": "60000",
  "errors.retry.timeout": "300000",
  "errors.log.enable": true,
  "errors.log.include.messages": true
}

WARNING

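Setting errors.tolerance=all means records that fail during conversion or transformation are skipped instead of failing the task; with errors.deadletterqueue.topic.name set, they are routed to the DLQ. The DLQ is available for sink connectors only. This keeps one bad record from blocking millions of good ones, but the skipped records never reach the sink, so you must monitor the DLQ topic and investigate failures.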
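
With errors.deadletterqueue.context.headers.enable set, each DLQ record carries headers prefixed with `__connect.errors.` that describe where and why it failed. The sketch below collects them into a dict, assuming the headers arrive as (key, bytes) pairs the way most Python Kafka clients expose them. The `summarize_dlq_headers` helper and the sample values are hypothetical.

```python
PREFIX = "__connect.errors."


def summarize_dlq_headers(headers: list[tuple[str, bytes]]) -> dict:
    """Collect the error-context headers of one DLQ record into a plain dict."""
    context = {}
    for key, value in headers:
        if key.startswith(PREFIX) and value is not None:
            context[key[len(PREFIX):]] = value.decode("utf-8", errors="replace")
    return context


# Hypothetical headers from a record that failed value conversion.
sample_headers = [
    ("__connect.errors.topic", b"myapp.public.users"),
    ("__connect.errors.partition", b"3"),
    ("__connect.errors.offset", b"1042"),
    ("__connect.errors.connector.name", b"jdbc-sink"),
    ("__connect.errors.stage", b"VALUE_CONVERTER"),
    ("__connect.errors.exception.message", b"Unknown magic byte!"),
    ("traceparent", b"unrelated application header"),
]

context = summarize_dlq_headers(sample_headers)
print(f"{context['topic']}[{context['partition']}]@{context['offset']}: {context['stage']}")
```

Grouping DLQ records by stage and exception message is usually enough to tell a one-off bad record from a systematic schema problem.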


Operational Patterns

Health Monitoring

bash
# Check cluster status
curl localhost:8083/ | jq

# Check all connectors
curl localhost:8083/connectors | jq

# Detailed status check script
for connector in $(curl -s localhost:8083/connectors | jq -r '.[]'); do
  # Fetch the status once and reuse it for both fields
  status_json=$(curl -s "localhost:8083/connectors/$connector/status")
  status=$(echo "$status_json" | jq -r '.connector.state')
  # Join task states on one line, e.g. RUNNING,FAILED
  tasks=$(echo "$status_json" | jq -r '[.tasks[].state] | join(",")')
  echo "$connector: connector=$status tasks=$tasks"
done
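
A status check becomes more useful when it points at the action to take. The sketch below takes the JSON returned by /connectors/{name}/status and lists the restart endpoints for failed tasks. The `failed_task_restart_paths` helper and the sample status are hypothetical; the sample is trimmed to the fields used here.

```python
def failed_task_restart_paths(status: dict) -> list[str]:
    """Return the REST paths to POST to in order to restart each FAILED task."""
    name = status["name"]
    return [
        f"/connectors/{name}/tasks/{task['id']}/restart"
        for task in status["tasks"]
        if task["state"] == "FAILED"
    ]


# Hypothetical status document: the connector is RUNNING but one task has failed.
sample_status = {
    "name": "my-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect-worker-1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "connect-worker-1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "connect-worker-2:8083"},
    ],
}

print(failed_task_restart_paths(sample_status))
```

Note that the connector itself can report RUNNING while individual tasks are FAILED, so alerting on the connector state alone is not enough.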

Worker Configuration (Distributed Mode)

properties
# worker.properties
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=connect-cluster
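key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081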

# Internal topics (configure once, never change)
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

# REST API (the listeners setting replaces rest.port, which was removed in Kafka 3.0)
listeners=http://0.0.0.0:8083
rest.advertised.host.name=connect-worker-1

# Plugin path
plugin.path=/usr/share/confluent-hub-components,/usr/share/java

Common Issues & Fixes

| Issue | Cause | Fix |
| --- | --- | --- |
| Connector FAILED | Bad config, auth error | Check logs, fix config, restart |
| Task FAILED | Transient error, schema change | Restart task, check DLQ |
| Consumer lag growing | Slow sink, too few tasks | Increase tasks.max, optimize sink |
| Rebalancing storms | Workers joining/leaving | Increase scheduled.rebalance.max.delay.ms |
| OOM errors | Large records, too many tasks per worker | Increase heap, add workers, or lower tasks.max |

Connector Ecosystem

| Category | Popular Connectors |
| --- | --- |
| RDBMS | Debezium (PG, MySQL, SQL Server, Oracle), JDBC |
| NoSQL | MongoDB, Cassandra, DynamoDB |
| Search | Elasticsearch, OpenSearch, Solr |
| Cloud Storage | S3, GCS, Azure Blob |
| Data Warehouse | BigQuery, Snowflake, Redshift |
| Messaging | JMS, RabbitMQ, ActiveMQ |
| Files | FileStream, Spooldir, SFTP |
| HTTP | HTTP Source/Sink (REST APIs) |

Last updated: 2026-03-20

"What I cannot create, I do not understand." — Richard Feynman