
How to Transform Data and Manage Schemas in Kafka Connect Pipelines


Quick Definition: In Kafka Connect, data transformation and schema management are the practices of shaping, securing, and validating records as they move between Kafka and external systems, ensuring consistency, compliance, and long-term compatibility.

Effective data transformation and schema management in Apache Kafka® ensure data consistency, scalability, and maintainability. This post addresses some key queries related to using Kafka Connect effectively.

Kafka Connect is a robust tool for integrating Kafka with external systems. To fully harness its potential, it is essential to implement practices that ensure data consistency, security, enrichment, and seamless transformations. 

Ready to master data transformation and schema management? Take Schema Registry 101 on Confluent Developer and sign up for Confluent Cloud to get started for free.

How to Ensure Data Consistency When Integrating Apache Kafka® With External Systems

Data consistency ensures that data remains accurate, reliable, and synchronized across all systems. To ensure data consistency when integrating Kafka connectors with external systems, follow these best practices:

Use Kafka Connect’s Exactly-Once Semantics (EOS):

Use Kafka Connect with exactly-once semantics (EOS) when supported by the connectors to avoid duplicate or missed data. EOS ensures that each record is processed exactly once in the target system, even in the event of failures or retries. This is achieved by combining idempotent producers with transactional writes. Not all connectors support EOS. Make sure that the connector supports transactional writes and idempotency (e.g., JDBC Sink Connector, S3 Sink Connector).

Additionally, monitor the transactional state logs to ensure their health, as they are critical for maintaining exactly-once guarantees.
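As a minimal sketch for source connectors on Kafka 3.3+ (KIP-618), exactly-once can be requested in the connector configuration once the worker is started with exactly.once.source.support=enabled. The connector name is illustrative, and the class is shown only as an example of a source connector family that can support exactly-once; check your connector's documentation before requiring it:

"name": "orders-postgres-source",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"exactly.once.support": "required",
"transaction.boundary": "poll"

With exactly.once.support=required, the connector fails to start if it cannot provide exactly-once guarantees, and transaction.boundary controls how records are grouped into transactions. On the sink side, exactly-once depends on the individual connector's idempotent or transactional write support.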

Leverage External Systems’ Transactional APIs

Many external systems (e.g., relational databases like PostgreSQL, messaging systems like RabbitMQ, or cloud services like Amazon S3) provide transactional APIs that support atomic operations. These APIs allow multiple operations to be grouped into a single transaction. If any operation fails, the entire transaction is rolled back, ensuring atomicity. For example, when integrating with a relational database, leverage database transactions to group multiple writes into a single atomic operation. This ensures consistency even if the connector or database encounters an error during processing.

Kafka Connect can be configured to use these transactional APIs when interacting with external systems. This ensures that:

  • Data is written to the external system only if the Kafka offset can be committed. 

  • The Kafka offset is committed only if the external system operation succeeds.

Follow these five guidelines when leveraging transactional APIs:

  1. Ensure that the Kafka Connect connector you use supports transactional APIs for the external system you’re integrating with.

  2. Test your pipeline under various failure scenarios (e.g., network outages, database failures) to ensure that transaction consistency is maintained.

  3. Set up monitoring and alerting to detect and respond to transaction failures promptly.

  4. Be mindful of the performance implications of using transactions. For example, long-running transactions can lead to resource contention.

  5. Clearly document how transactions are handled in your pipeline to make it easier for your team to troubleshoot and maintain.

Implement Idempotent Operations

Design connectors to write data idempotently, ensuring repeated operations produce the same result without side effects.

The following are best practices for implementing idempotent and atomic operations with specific connector types:

  • For source connectors, ensure that only committed changes from the external system are published to Kafka. Tools like Debezium can monitor database logs and provide transactional consistency.

  • For sink connectors, ensure idempotent writes using unique keys (e.g., primary keys, unique IDs). Also, use upsert modes where applicable.
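For example, a minimal JDBC Sink Connector configuration that writes idempotently by upserting on a key column might look like the following sketch; the connection details, topic, and field names are placeholders:

"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://db-host:5432/sales",
"topics": "orders",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "order_id"

With insert.mode set to upsert, replaying the same record updates the existing row instead of creating a duplicate, which keeps retries and reprocessing safe.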

Error Handling and Retries

Configure retries and error policies in the connector configurations to handle transient failures. Use dead-letter queues (DLQs) to capture problematic records for further analysis.

Implement a Two-Phase Commit (2PC) Strategy

Implementing a two-phase commit (2PC) strategy in a distributed system with Kafka ensures data consistency between Kafka and external systems, such as databases or APIs. This strategy ensures atomicity, meaning that a transaction is either fully committed or entirely rolled back across all involved systems.

How to Mask or Encrypt Sensitive Data in Connectors

Kafka connectors often handle sensitive data such as personally identifiable information (PII), financial details, or health records. Failure to secure this data can result in data breaches, regulatory penalties, and reputational damage.

Data masking obscures sensitive information while retaining the data's structural format—this is especially useful for testing and analytics. Encryption transforms data into a secure format that can only be decrypted using a specific key, which ensures end-to-end security during data transfer and storage.

When moving sensitive data via Kafka connectors, make sure to:

  • Identify sensitive fields and decide whether masking or encryption is appropriate. Use masking for scenarios like development or analytics where real data is not required. Use encryption to secure sensitive data during transit and storage.

  • Create custom single message transforms (SMTs) to mask sensitive fields dynamically. For instance, you can mask credit card numbers or email addresses, as shown in the sketch after this list.

  • Encryption can be applied in two key areas:

    • In transit: Configure Kafka Connect to use SSL/TLS so data is encrypted as it moves between workers, brokers, and external systems.

    • At rest: Encrypt data before storing it in Kafka topics or external systems, or use server-side encryption for cloud storage systems like Amazon S3.
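For the masking case, the built-in MaskField SMT is often enough. A minimal sketch, assuming string fields named creditCardNumber and email (the replacement option requires a recent Kafka version; without it, masked fields are replaced with an empty or zero value):

"transforms": "MaskPII",
"transforms.MaskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskPII.fields": "creditCardNumber,email",
"transforms.MaskPII.replacement": "****"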

How to Use Schema Registry With Kafka Connect to Manage and Validate Schemas

Using Schema Registry with Kafka Connect is a common setup to manage and enforce schemas for messages in Kafka topics. It ensures data compatibility, enabling seamless integration and interoperability across producers, consumers, and connectors. 

[Diagram: How Schema Registry retrieves and updates schema metadata from Kafka client applications]

Following these five best practices ensures smooth operations and long-term maintainability:

  • Choose the right schema format. Schema Registry supports Avro, Protobuf, and JSON Schema. Select a format that suits your use case:

    • Avro: Compact binary format, ideal for high-throughput systems. Use Avro if compactness and performance are priorities.

    • Protobuf: Supports efficient serialization and broad adoption in microservices.

    • JSON Schema: Human-readable and widely understood but less compact. Use JSON Schema when human readability and simplicity matter.

  • Enforce schema compatibility. Define compatibility levels to manage schema evolution and prevent breaking changes:

    • BACKWARD: Allows new schemas to be used with older consumers.

    • FORWARD: Allows old schemas to be used with newer consumers.

    • FULL: Ensures both backward and forward compatibility.

    • Use FULL compatibility for critical systems where schema integrity is paramount.

  • Use Kafka Connect converters consistently. Configure Kafka Connect to use the same converters (Avro, Protobuf, or JSON Schema) as the producers and consumers; misaligned converters can lead to serialization/deserialization errors. A minimal converter sketch follows this list.

  • Integrate schema validation into your CI/CD pipeline to catch compatibility issues early.

  • Set up dead-letter topics for Kafka Connect tasks to capture messages that fail schema validation or conversion.
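As a sketch of the converter point above, a connector that reads and writes Avro against Schema Registry typically carries converter settings like the following (the URL is illustrative):

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"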


How to Enrich Data in Kafka Connect Pipelines

Data enrichment and format conversion are common use cases in Kafka Connect pipelines. Effective implementation ensures smooth data transformations and maintains data quality across the ecosystem. Here are the best practices:

Plan Data Enrichment Logic

Before implementing enrichment in Kafka Connect, clearly define the requirements:

  • Identify Enrichment Sources: Determine external systems or databases needed for enrichment (e.g., REST APIs, SQL databases, or in-memory caches).

  • Map Enrichment Fields: Define which fields need to be added, updated, or transformed.

  • Consider Latency: Ensure that enrichment logic does not introduce significant delays.

Use SMTs in Kafka Connect for lightweight enrichment. For complex enrichment scenarios that require real-time processing, use Kafka Streams or ksqlDB.

Use SMTs for Data Enrichment:

Kafka Connect's single message transforms (SMTs) allow lightweight data enrichment and format adjustments directly within the pipeline.

  • Use SMTs like InsertField to add metadata fields such as timestamps or source identifiers. For example, you can set the following configuration:

"transforms": "InsertSource",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "source",
"transforms.InsertSource.static.value": "sales-system"

  • Use custom connectors or preprocessors to perform lookups against external systems to enrich data before sending it to Kafka.

Optimize Data Format Conversion

Kafka Connect supports format conversions between JSON, Avro, Protobuf, and others via converters. 

Use Schema Registry to manage schemas. Prefer Avro, Protobuf, or JSON Schema to ensure schema validation and compatibility.

When converting data formats (e.g., JSON to Avro):

  • Use the same schema for source and target formats to avoid mismatches.

  • Enable schema validation in Schema Registry to enforce consistency.

  • Automate schema validation in CI/CD pipelines to catch issues early.

  • Use default values in schemas for optional fields to handle missing data gracefully.
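To illustrate the last point, an Avro schema can declare an optional field as a nullable union with a default value; the record and field names here are made up:

{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "order_id", "type": "string" },
    { "name": "coupon_code", "type": ["null", "string"], "default": null }
  ]
}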

How to Handle Data Format Conversions (JSON ↔ Avro ↔ Protobuf)

Kafka Connect provides built-in support for data transformation and schema management, making it easier to integrate various data sources and sinks into your Kafka-based architecture. Here's how you can handle it effectively:

Schema Management

Kafka Connect integrates with Schema Registry (from Confluent) to manage Avro, Protobuf, and JSON schemas, ensuring schema evolution and compatibility.

  • Enable Schema Registry in the connector configuration.

  • Ensure your schema is backward-compatible by following best practices like adding new fields with default values, avoiding breaking changes (e.g., removing required fields), and using Schema Registry’s compatibility modes (Backward, Forward, Full).

  • Kafka Connect automatically registers schemas when using Avro or other schema-based formats.
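If you prefer to register schemas ahead of time (for example, from a CI/CD pipeline) rather than relying on automatic registration, Confluent's schema-aware converters can be told not to register new schemas. A minimal sketch, assuming the Avro converter:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.auto.register.schemas": "false",
"value.converter.use.latest.version": "true"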

Data Transformation Using Single Message Transforms (SMTs)

SMTs allow you to modify messages before they reach Kafka (source) or before they are written to a sink. 

How to Use Single Message Transforms in Kafka Connect

Some of the common SMTs include:

Filtering Messages: Drop messages based on field values. (The Apache Kafka Filter SMT only drops records in combination with predicates; conditional filtering on field values, as in this example, uses Confluent’s Filter SMT with a JsonPath condition.)

"transforms": "filterNulls",
"transforms.filterNulls.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterNulls.filter.condition": "$[?(@.field != null)]",
"transforms.filterNulls.filter.type": "include"

Masking PII Data: Redact sensitive fields.

"transforms": "MaskField",
"transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskField.fields": "creditCardNumber"

Renaming Fields: Change field names before writing to a sink.

"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "oldName:newName"

Converting Data Types: Convert a field from string to integer.

"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "field:int"

If built-in SMTs don’t meet your needs, you can write custom SMTs in Java by implementing the Transformation<R> interface.
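Once compiled and placed on the worker's plugin.path, a custom SMT is referenced in the connector configuration like any built-in transform; the class name below is hypothetical:

"transforms": "customEnrich",
"transforms.customEnrich.type": "com.example.connect.smt.EnrichRecord$Value"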

How to Manage Schema Evolution Without Breaking Pipelines

Handling schema evolution in Kafka connectors is crucial for maintaining data consistency and compatibility, and for keeping data pipelines running smoothly as schemas evolve over time. Below are some best practices for managing schema evolution in Kafka connectors:

Enable Compatibility Modes:

Set Schema Registry compatibility modes (e.g., backward, forward, full) to manage schema evolution. An example of setting a subject’s compatibility level follows the list below.

  • Backward Compatibility: New schemas can read data written with older schemas. This is useful when consumers are updated before producers.

  • Forward Compatibility: Old schemas can read data written with newer schemas. This is useful when producers are updated before consumers.

  • Full Compatibility: Combines backward and forward compatibility, ensuring no breaking changes in either direction.

  • None: No compatibility checks are enforced. Use this only if you have strict control over schema updates.
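Compatibility can be set globally or per subject through Schema Registry’s REST API. For example, sending a PUT request to /config/<subject> (subject name left generic) with the following JSON body applies FULL compatibility to that subject:

{ "compatibility": "FULL" }

Valid values include BACKWARD, FORWARD, FULL, NONE, and their _TRANSITIVE variants.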

Graceful Schema Evolution:

Update schemas incrementally by adding optional fields first and ensuring older schemas remain valid.

  • Prefer adding new fields rather than modifying or deleting existing ones. This minimizes the risk of breaking compatibility.

  • Always provide default values for new fields to ensure backward compatibility.

  • Mark fields as deprecated instead of deleting them. This gives consumers time to migrate away from the old fields.

Connector Updates:

Ensure that connectors and downstream systems are compatible with the updated schema versions.

  • Use appropriate converters (e.g., AvroConverter, JsonConverter) in Kafka Connect to handle serialization and deserialization.

  • Use SMTs to transform data on the fly, such as renaming fields or adding default values.

Configure a DLQ to handle messages that fail schema validation or deserialization.

How to Handle Errors, Retries, and Dead Letter Queues in Connectors

Robust error handling is essential for building resilient and reliable data pipelines. In Kafka Connect, failures can occur due to various reasons, such as network issues, malformed data, or downstream system unavailability. A proper strategy for managing retries and handling failed messages is crucial to prevent data loss and ensure pipeline continuity.

To effectively manage errors, follow these best practices (a consolidated configuration sketch follows this list):

  • Configure Error Tolerance: Kafka Connect provides a tolerance level for errors that occur during processing.

    • Set errors.tolerance = 'none' (the default) to have the connector task fail immediately upon encountering an error. This approach is suitable for use cases where any single failure is considered critical and requires immediate intervention.

    • Set errors.tolerance = 'all' to allow the connector to skip problematic messages and continue processing. This setting is required to enable the use of a dead letter queue.

  • Implement Retries for Transient Errors: Some errors are temporary and can be resolved by retrying the operation. Configure the connector to automatically retry these transient failures.

    • errors.retry.timeout: Set the maximum duration (in milliseconds) for which the connector will attempt to retry a failed operation. A value of 0 disables retries.

    • errors.retry.delay.max.ms: Define the maximum delay between consecutive retry attempts, often used with exponential backoff to avoid overwhelming a struggling external system.

  • Use a Dead-Letter Queue (DLQ) for Failed Messages: When errors.tolerance is set to 'all', messages that continuously fail processing (even after retries) can be routed to a designated Kafka topic known as a Dead Letter Queue (DLQ).

    • This prevents a single bad record from halting the entire data pipeline.

    • It isolates problematic messages for later inspection, debugging, and potential reprocessing without losing the data.

  • Enable and Configure the DLQ: To use a DLQ, you must configure it in your connector properties.

    • errors.deadletterqueue.topic.name: Specify the name of the Kafka topic where failed messages will be sent.

    • errors.log.enable=true: Log connector errors to the standard Kafka Connect logs.

    • errors.log.include.messages=true: Include the full content of the failed message in the logs. Be cautious with this setting if the data is sensitive.

  • Monitor and Process the DLQ: A DLQ is not a final destination. It is crucial to monitor this topic for incoming messages. Set up alerts to notify your team when a message is sent to the DLQ. Once a message is in the DLQ, you should analyze the error, fix the underlying issue (e.g., correct the data, update a schema), and decide whether to reprocess the message or discard it.
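Putting these settings together, the error-handling portion of a sink connector configuration might look like the following sketch (the DLQ topic name and retry values are illustrative):

"errors.tolerance": "all",
"errors.retry.timeout": "300000",
"errors.retry.delay.max.ms": "60000",
"errors.deadletterqueue.topic.name": "dlq.orders.sink",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.log.enable": "true",
"errors.log.include.messages": "false"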

Get Started With Confluent

By following these best practices, you can ensure robust and secure data integration with Kafka Connect. Effective data consistency mechanisms, secure configurations, and schema management are critical for seamless and reliable integration with external systems. Get started for free on Confluent Cloud to put these best practices to the test.

Data Transformation and Schema Management FAQs

What’s the best way to ensure consistency when writing data to external systems? Use exactly-once semantics (when supported), idempotent writes, and external system transactional APIs. Always configure retries and use dead letter queues (DLQs) to capture problematic events without disrupting pipelines.

How can I secure sensitive data in Kafka Connect? Mask PII fields with single message transforms (SMTs) for non-production use cases, and encrypt data in transit with TLS. For sinks like Amazon S3, enable server-side encryption. This ensures data is both protected and compliant.

How do I manage schema evolution in Kafka Connect without breaking consumers?

Enable Schema Registry compatibility modes (Backward, Forward, or Full), add new fields with default values, and mark fields as deprecated before removing them. Test schema changes in staging before rollout.

When should I enrich or reformat data in Kafka Connect vs. Kafka Streams?

Use Connect SMTs for lightweight enrichment (timestamps, metadata, field renaming). For lookups, joins, or complex enrichment, use Apache Flink® or Kafka Streams, since SMTs operate on one record at a time and are not designed for stateful processing.


Apache®, Apache Kafka®, Kafka®, Apache Flink®, Flink®, and the Kafka and Flink logos are registered trademarks of the Apache Software Foundation. No endorsement by the Apache Software Foundation is implied by the use of these marks. 

  • This blog was a collaborative effort between multiple Confluent employees.
