Whether you’re streaming change data capture (CDC) events from MongoDB to Apache Kafka® or sinking high-velocity data from Kafka into MongoDB for analytics, the following best practices ensure a secure, performant, and resilient architecture.
This technical deep dive covers implementing the MongoDB Atlas Source and Sink Connectors on Confluent Cloud. While the managed service provides zero-ops infrastructure, the logical implementation requires architectural precision to ensure data integrity, high throughput, and cost efficiency.
Before you configure a single connector, the physical and logical layout of your infrastructure must be optimized to minimize latency and costs.
Cross-region traffic traverses the public internet or expensive backbone links, introducing variable latency (jitter). In high-throughput scenarios (>10k msgs/sec), this latency compounds, causing consumer lag.
Intra-region traffic is typically free or significantly cheaper than inter-region or internet-gateway traffic. To take advantage of this, follow these two steps to colocate your services and keep the data replication loop as tight as possible:
Implement cloud-region affinity.
Run your MongoDB Atlas deployment and your Confluent Cloud cluster in the same cloud provider and region (e.g., AWS us-east-1).
Choose your connectivity model for integrating MongoDB Atlas and Confluent Cloud based on your security requirements. The table below compares the two options across workload, configuration, endpoints, and complexity.
Public vs Private Networking Between Confluent Cloud and MongoDB Atlas
| Feature | Public Networking | Private Networking (Recommended) |
| --- | --- | --- |
| Workload | Dev/test or non-sensitive data | Production and sensitive data (personally identifiable information [PII], payment card industry data [PCI]) |
| Configuration | Requires adding Confluent Cloud Egress IPs to the MongoDB Atlas IP Allowlist | Uses AWS PrivateLink, Azure Private Link, or Google Cloud Private Service Connect |
| Endpoints | Traffic traverses public internet (encrypted) | Traffic never leaves the cloud provider's backbone |
| Complexity | Low setup effort | Moderate setup (requires endpoint mapping) |
The most common causes of streaming pipeline failure are schema evolution conflicts and data type friction.
By default, Schema Registry uses the TopicNameStrategy, which registers the schema for every record in a topic under a single subject name: topicname-value (e.g., mongo-data-value). If you route all of your MongoDB collections to one Kafka topic, every collection’s schema is registered under that one subject—an approach with significant downsides.
Best Practice = One Topic per Schema: When using strict formats such as Apache Avro™️, Protocol Buffers (Protobuf), or JSON (backed by Schema Registry), the standard recommendation is to map one MongoDB collection to one Kafka topic. This segregation ensures data hygiene, simplifies consumer logic, and works seamlessly with default configurations.
The Risk: If you map multiple collections with different structures to a single topic, they’ll register under the same subject in Schema Registry. If these schemas are not backward-compatible, the connector will fail.
For example, if you map three different MongoDB collections (Users, Orders, Products) to a single Kafka topic (mongo-data), they’ll all attempt to register their schemas under mongo-data-value. Since these schemas are structurally different, registration will fail due to backward incompatibility.
Workaround: The restriction above is not absolute; it’s a byproduct of the default strategy. If you have a legitimate need to mix multiple collections into a single topic (multiplexing), you can bypass the conflict by changing the naming strategy to RecordNameStrategy.
Instead of deriving the subject name from the topic name, this strategy derives it from the record name (specifically, the fully qualified name of the Avro record or Protobuf message). For example, even if Users and Orders are in the same topic (mongo-data), Schema Registry creates separate subjects for them:
com.company.Users
com.company.Orders
Multiple disparate schemas can coexist in a single topic because they’re validated against their own distinct histories in the registry, not against each other.
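As a hedged illustration, switching strategies is typically a converter-level setting on Kafka Connect; the exact property key, and whether a fully managed connector exposes it, should be verified against your connector’s configuration reference:

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy"
}
```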
Production/Strict: Use Avro or Protobuf. They’re compact (binary) and enforce type safety.
Ad Hoc/Schemaless: Use JSON. It’s useful for rapid prototyping but risky for downstream consumers that expect consistent fields.
MongoDB is flexible; documents in the same collection may vary in structure (e.g., Document A has name and address; Document B adds phone; Document C adds pincode). In contrast, strict schema formats such as Avro generally require a fixed structure.
The Solution: To map variable structure in MongoDB documents to a single strict schema, define a superset schema that includes all possible fields but mark non-guaranteed fields as optional.
Implementation: In your Avro definition, use a union type of null and the data type, and always set a default value of null.
Scenario: Connector reads Document A (missing phone).
Result: The Avro record fills the phone field with null.
Scenario: Connector reads Document B (has phone).
Result: The Avro record fills the phone field with the value.
Example of Avro schema field definition:
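A minimal sketch, assuming an optional string field named phone:

```json
{ "name": "phone", "type": ["null", "string"], "default": null }
```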
If the fields are completely unpredictable (e.g., user-defined attributes where one user adds "hobby" and another adds "pet_name"), defining a specific field for each is impossible.
The solution is to stop treating these as schema fields and to treat them as data. You have two implementation paths: the Avro map (Option A) for structured flexibility and the JSON string envelope (Option B) for ultimate flexibility.
Option A (Avro map): Use this when your dynamic fields are flat (not deeply nested) and share a common type (usually string). This preserves the key-value structure in a way that is queryable by some downstream tools without parsing.
The Concept: Define a single field (e.g., attributes) that acts as a container for all dynamic keys.
The Trade-off: Avro maps enforce a single value type. You usually define the value as string. This means if a user sends an integer (age: 30), it must be cast to a string ("30") to fit the map.
A sample Avro schema definition:
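A minimal sketch, assuming a string-valued map field named attributes:

```json
{ "name": "attributes", "type": { "type": "map", "values": "string" }, "default": {} }
```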
Option B (JSON string envelope): Use this when your dynamic data is deeply nested or contains mixed complex types (e.g., arrays, sub-objects) that an Avro map can’t handle.
The Concept: Serialize the entire variable section of the MongoDB document into a single JSON string.
The Trade-off: You lose all schema validation on the contents. The downstream consumer (e.g., Snowflake, BigQuery) is responsible for parsing this JSON string at read time. However, this is the most robust method for preventing pipeline failures due to unexpected data shapes.
A sample data flow example:
MongoDB source: {"name": "Widget", "meta": {"color": "red", "dims": [10, 20]}}
Kafka topic (JSON string strategy):
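A sketch of the record that lands in the topic, assuming the variable section is serialized into a field named meta_json (the field name is illustrative):

```json
{ "name": "Widget", "meta_json": "{\"color\": \"red\", \"dims\": [10, 20]}" }
```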
Recommendation:
If your downstream sink is a modern data warehouse (Snowflake, BigQuery, Databricks), use Option B (JSON string). These platforms have native VARIANT or JSON column types that can ingest this string and allow SQL querying directly into the JSON structure (e.g., SELECT meta_json:color FROM table).
MongoDB uses BSON types (e.g., ObjectId, Decimal128, BSON timestamp) that don’t map 1:1 to Avro or Protobuf. This creates friction when downstream systems (like SQL databases) attempt to interpret the raw data.
The Problem: By default, the connector may serialize a MongoDB _id (ObjectId) as a complex structure: { "$oid": "507f1f77bcf86cd799439011" }. Downstream SQL sinks (such as Snowflake or Postgres) often expect a simple string and will fail to ingest this structure.
Best Practice: Use single message transforms (SMTs) like ExtractField or custom SMTs to flatten the _id structure into a simple string before it reaches the Kafka topic.
The MongoDB Source Connector outputs two distinct time fields, leading to frequent confusion:
BSON Timestamp (clusterTime): An internal MongoDB logical clock used for operational log (oplog) ordering. It’s a 64-bit value (high 32 bits are seconds since epoch; low 32 bits are an incrementing counter).
ts_ms (Transaction Time): The wall clock time (in milliseconds) when the event actually occurred on the database.
The Risk: Users often mistake the BSON timestamp for the event time. Because the BSON timestamp is a composite value (part time, part counter), treating it as a standard Unix epoch timestamp in stream processing engines (such as Kafka Streams or Apache Flink®) will result in incorrect windowing and data processing errors.
Best Practice: Explicitly use ts_ms (or a user-defined created_at field) for event-time processing and windowing. Ignore the BSON timestamp unless you’re specifically replicating oplog order logic.
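For illustration only, a fragment of a change event might carry the two fields side by side: clusterTime appears in MongoDB Extended JSON as a seconds-plus-counter pair, while ts_ms is a plain epoch-milliseconds value suitable for windowing (the values below are made up):

```json
{
  "clusterTime": { "$timestamp": { "t": 1700000000, "i": 7 } },
  "ts_ms": 1700000000123
}
```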
Rarely does the raw document structure in MongoDB match the exact requirements of your downstream analytics platform. Instead of building a heavy Kafka Streams application for minor changes, use SMTs.
Flattening Nested Data: MongoDB documents are often deeply nested. Use the Flatten SMT to un-nest complex objects so they can be written to tabular sinks (e.g., BigQuery).
Field Masking/Renaming: Use ReplaceField to rename _id to id (if downstream systems reject underscores) or to mask PII fields before they enter the topic.
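A minimal sketch of such an SMT chain using the standard Kafka Connect Flatten and ReplaceField transforms; the transform aliases are illustrative, and which SMTs a fully managed connector exposes should be confirmed in its documentation:

```json
{
  "transforms": "flatten,rename",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_",
  "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.rename.renames": "_id:id"
}
```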
The sink connector projects Kafka records into MongoDB collections. The primary goal here is idempotency—ensuring that if Kafka re-sends a message (due to network blips), you don't end up with duplicate data in MongoDB.
To achieve exactly-once semantics in the database, you must align the Kafka key with the MongoDB _id.
doc.id.strategy: Set to ProvidedInValueStrategy or FullKeyStrategy to generate stable _id values based on your Kafka record.
write.strategy: Use upsert logic (ReplaceOneDefaultStrategy or UpdateOneBusinessKeyTimestampStrategy). If a record is retried, an insert strategy will fail (duplicate key error), whereas an upsert will simply overwrite the existing record with the same state.
If your Kafka topic contains CDC events (e.g., from a relational database via Debezium), standard inserts are not enough.
Configuration: Set cdc.handler to the appropriate handler for your source (e.g., com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler).
Result: This ensures that a DELETE event in Kafka actually removes the document in MongoDB rather than inserting a record saying "I was deleted."
tasks.max: Increase this to match the number of partitions in your Kafka topic to maximize parallelism.
max.batch.size: Default is usually sufficient, but for high throughput, increase to 1000 or 2000 to reduce network round trips.
The source connector watches the MongoDB oplog (via the Change Streams feature) and pushes changes to Kafka.
By default, a MongoDB Change Stream UPDATE event contains only the delta (what changed) and the _id. It does not contain the unchanged fields.
The Risk: If downstream consumers need the full context of the document (e.g., for enrichment or routing based on fields that didn't change), the default setting is insufficient.
Configuration: Set change.stream.full.document to updateLookup. This forces MongoDB to fetch the current state of the full document and attach it to the event.
Avoid dumping every change from a high-volume collection into Kafka if you need a subset only.
Configuration: Use the pipeline configuration to apply a standard MongoDB aggregation match. Example: [{"$match": {"fullDocument.region": "US"}}]
Result: This filters events before they leave MongoDB, significantly reducing Kafka storage costs and noise.
Avoid dumping all collections into one topic.
Configuration: Use topic.prefix (e.g., mongo.) and standard namespace mapping.
Result: A change in database Shop and collection Users becomes topic mongo.Shop.Users. This makes downstream consumption much cleaner.
Warning: Do not set errors.tolerance = all on a source connector.
If a record fails to serialize (e.g., schema mismatch), this setting will drop the record silently. In a CDC context, missing an event (like an INSERT) compromises data integrity.
Building a production pipeline goes beyond valid configurations; it requires anticipating failure modes and enforcing strict access controls. Operational resilience ensures that your pipeline recovers gracefully from outages without data loss while security measures protect the data in transit and at rest. Neglecting these aspects can turn a simple network blip into a critical incident or a compliance violation.
The source connector relies on resume tokens stored in Kafka to know where to continue reading after a restart.
The Risk: If the connector is paused for longer than your MongoDB oplog retention window, the resume token will become invalid (the data it points to has "rolled off"). The connector will fail and require a snapshot reset.
Best Practice: Ensure that your MongoDB Oplog window is sized comfortably larger than your expected maximum downtime (e.g., 24–48 hours minimum).
Authentication: Never use personal user accounts. Create dedicated service accounts in both Confluent Cloud and MongoDB Atlas.
Least Privilege: The Kafka service account should have only WRITE access to specific topics. The MongoDB User should have only readWrite on the specific database, not atlasAdmin.
Encryption (Client-Side Field Level Encryption): Data is often encrypted before it reaches Kafka (by the source application) to ensure that the message bus never holds raw PII. In this architecture, the MongoDB Sink Connector uses client-side field level encryption to decrypt the data as it consumes it from the topic. This ensures that while the data remains secure in transit through Kafka, it’s restored to its readable (or queryable) format upon insertion into MongoDB, assuming the database environment is the trusted secure zone.
The following configuration templates synthesize the best practices covered in this guide. They move beyond the default settings to enforce idempotency, optimize network usage, and handle data shaping.
Use these as a baseline for your production deployment, ensuring that you replace the placeholder values with your specific service account credentials and host details.
This configuration handles _id renaming (for compatibility), ensures idempotency via upserts, and tunes batch sizes for high throughput:
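The following is a hedged baseline sketch rather than a drop-in configuration: the property names follow the fully managed MongoDB Atlas Sink connector settings referenced throughout this guide, the placeholder values in angle brackets must be replaced, and exact keys and supported SMTs should be verified against the connector’s configuration reference.

```json
{
  "connector.class": "MongoDbAtlasSink",
  "name": "mongodb-atlas-sink-orders",
  "kafka.auth.mode": "SERVICE_ACCOUNT",
  "kafka.service.account.id": "<service-account-id>",
  "topics": "orders",
  "input.data.format": "AVRO",
  "connection.host": "<cluster-host>.mongodb.net",
  "connection.user": "<db-user>",
  "connection.password": "<url-encoded-password>",
  "database": "Shop",
  "collection": "Orders",
  "doc.id.strategy": "ProvidedInValueStrategy",
  "write.strategy": "ReplaceOneDefaultStrategy",
  "max.batch.size": "2000",
  "tasks.max": "4",
  "transforms": "rename",
  "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.rename.renames": "_id:id"
}
```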
Data Transformation (Optional): If you need to manipulate the doc (e.g., add a timestamp), use a post-processor chain.
This configuration ensures that full document context is preserved using updateLookup and filters noise at the source using a MongoDB aggregation pipeline to save costs:
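Again, a hedged sketch under the same assumptions (placeholder values in angle brackets, property names to verify against the fully managed MongoDB Atlas Source connector reference):

```json
{
  "connector.class": "MongoDbAtlasSource",
  "name": "mongodb-atlas-source-users",
  "kafka.auth.mode": "SERVICE_ACCOUNT",
  "kafka.service.account.id": "<service-account-id>",
  "connection.host": "<cluster-host>.mongodb.net",
  "connection.user": "<db-user>",
  "connection.password": "<url-encoded-password>",
  "database": "Shop",
  "collection": "Users",
  "topic.prefix": "mongo",
  "output.data.format": "AVRO",
  "change.stream.full.document": "updateLookup",
  "pipeline": "[{\"$match\": {\"fullDocument.region\": \"US\"}}]",
  "tasks.max": "1"
}
```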
Percent Encoding: If your MongoDB password contains special characters (such as @, :, /), you must URL-encode them in the connection string or password field (e.g., @ becomes %40).
Dead Letter Queue (DLQ) Management: The sink auto-creates a DLQ. Monitor this! If the DLQ fills up, it means your data is not landing in MongoDB. Common causes are schema validation failures, documents exceeding size limits, and transformation errors.
Service Record (SRV) vs. Standard:
Atlas: Must use the SRV string (mongodb+srv://...).
Self-Managed: Use standard host lists.
Time-Series: If sinking to a MongoDB time-series collection, ensure that you’re running MongoDB v5.0+ and that you’ve configured timeseries.timefield.
Integrating MongoDB Atlas with Confluent Cloud offers a powerful backbone for real-time data architectures, but "managed service" doesn’t mean "managed architecture." While it’s easy to get data moving between these two systems, true production readiness requires looking past the default configurations.
The difference between a fragile pipeline and a resilient architecture lies in the details covered in this guide.
Networking Precision: Minimizing latency through region affinity and PrivateLink
Data Integrity: Enforcing strict schemas and handling the nuances of BSON-to-Avro conversion
Resilience: Configuring for idempotency in the sink and full-document context in the source
By treating your connectors not just as pipes but as intelligent components that require tuning for logical and physical constraints, you’ll ensure that your data platform remains cost-efficient, accurate, and capable of scaling with your business.
Apache®, Apache Kafka®, Kafka®, Apache Flink®, Flink®, Apache Avro™️, and Avro™️ are registered trademarks of the Apache Software Foundation. No endorsement by the Apache Software Foundation is implied by the use of these marks.