Directions
1. Let’s assume you have replicated both topics to your cluster in Austin using Confluent Replicator:
- orders_cdc_austin
- orders_cdc_london
Both topics contain purchase order data in a simple format, illustrated by the example below:
{ "change_op": "insert", "order_id": 1, "product_id": 12, "product_loc": "aus" }
With KSQL, create streams on top of both topics using the JSON format:
ksql> CREATE STREAM orders_cdc_austin_stream (change_op VARCHAR, order_id INT, product_id INT, product_loc VARCHAR) WITH (KAFKA_TOPIC='orders_cdc_austin', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> CREATE STREAM orders_cdc_london_stream (change_op VARCHAR, order_id INT, product_id INT, product_loc VARCHAR) WITH (KAFKA_TOPIC='orders_cdc_london', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
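As an optional sanity check, you can confirm that both streams were registered with the expected schema using the standard SHOW STREAMS and DESCRIBE commands:

ksql> SHOW STREAMS;
ksql> DESCRIBE orders_cdc_austin_stream;
ksql> DESCRIBE orders_cdc_london_stream;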
2. Now, you can create a new stream from one of the existing streams with a new output topic. The example below uses the Austin stream:
ksql> CREATE STREAM orders_cdc_stream WITH (KAFKA_TOPIC='orders_cdc', VALUE_FORMAT='JSON') AS SELECT * FROM orders_cdc_austin_stream;

 Message
----------------
 Stream created
----------------
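At this point, only the Austin events flow into the new stream. If you'd like to watch them arrive, a simple continuous query works; the LIMIT clause here is just an illustrative way to stop the query after a few rows:

ksql> SELECT * FROM orders_cdc_stream LIMIT 3;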
3. Finally, you can merge in the London stream using the INSERT INTO statement.
ksql> INSERT INTO orders_cdc_stream SELECT * FROM orders_cdc_london_stream;

 Message
-------------------------------
 Insert Into query is running.
-------------------------------
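To spot-check that the merge worked, you can filter the combined stream by location. This sketch assumes London records carry a product_loc value such as 'lon', mirroring the 'aus' value in the earlier example:

ksql> SELECT * FROM orders_cdc_stream WHERE product_loc = 'lon' LIMIT 3;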
Note: The schema and partitioning column produced by the query must match the stream's schema and key, respectively. If the schema and partitioning column are incompatible with the stream, then the statement will return an error. KSQL tables are not supported in the FROM clause of an INSERT INTO ... SELECT statement.
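One way to keep the schemas aligned is to list the columns explicitly instead of using SELECT *. The sketch below is equivalent to the statement above for these streams, but it makes the column-for-column mapping visible:

ksql> INSERT INTO orders_cdc_stream SELECT change_op, order_id, product_id, product_loc FROM orders_cdc_london_stream;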
The result is a stream, and its backing topic, containing all of the purchase order events from both Austin and London, available for consumption. With a few basic KSQL statements, you have created a stream processing application that merges like streams to suit your needs.