Data can originate in a number of different sources—transactional databases, mobile applications, external integrations, one-time scripts, etc.—but eventually it has to be synchronized to a central data warehouse for analysis and decision making. At Bolt, one of the leading ride-hailing apps in Europe and Africa, we use ksqlDB in our data warehouse stream processing pipeline to power every decision we make.
Having a central data warehouse allows analysts, scientists, developers, operations specialists, and other decision-makers to easily get the information they need by querying a single source of data. Warehouse users can compare numbers across different lines of business and time periods, calculate aggregates, and make predictions on top of them. To move data between systems and increase availability, we need data replication, which is the process of copying data from one server to another. Ideally, a data replication pipeline should satisfy the following requirements:
We initially considered several different solutions and frameworks: ksqlDB, Apache Flink®, Apache Spark™, Apache Storm, Kafka Streams applications, all the way up to writing a replication service ourselves. ksqlDB was the first candidate we tried from that list because it is native to the Kafka ecosystem.
It turned out to be a perfect fit for our use case. Imagine a Kafka consumer that reads a set of topics and ingests the data into a warehouse. In most cases, you do not want it to read source topics containing raw data, so the data needs to be available in another topic. This is exactly what ksqlDB does for you. Here is how easy it is to replicate data from source_topic to destination_topic, which in our case is later ingested into the data warehouse:
CREATE STREAM source_topic_stream ( ROWKEY BIGINT KEY, created_at VARCHAR, data VARCHAR, user_id BIGINT ) WITH ( kafka_topic = 'source_topic', value_format = 'json' );
CREATE STREAM destination_topic_stream WITH ( kafka_topic = 'destination_topic', value_format = 'json' ) AS SELECT rowkey, user_id, created_at FROM source_topic_stream EMIT CHANGES;
You can repeat this process for every topic whose data you want to propagate to a warehouse or any other system. At Bolt, we’ve found ksqlDB to be a particularly good fit for replicating all our data to a warehouse, for a number of reasons.
ksqlDB is essentially built on top of the Kafka consumer and producer protocols, providing all guarantees that native clients have. It is also extremely easy to integrate with Kafka.
ksqlDB is deployed as a separate cluster of machines completely decoupled from the main Kafka cluster. It supports two deployment modes (interactive and headless), both of which are quite easy to deploy, maintain, and scale.
Because ksqlDB is deployed as its own cluster and is built on top of native Kafka consumers and producers, it can easily scale out horizontally. You can add or remove nodes from a ksqlDB cluster without affecting the main Kafka brokers and without needing to reconfigure anything.
Using the Apache Avro™ data format for your Kafka messages automatically creates “documentation” for them. Every topic has its schema defined and available for every other process to use. In the context of ksqlDB, this also spares users from having to specify types for every field in a stream.
Instead of:
CREATE STREAM source_topic_stream ( ROWKEY BIGINT KEY, created_at VARCHAR, data VARCHAR, user_id BIGINT ) WITH ( kafka_topic = 'source_topic_name', value_format = 'json' );
…users can simply write:
CREATE STREAM source_topic_stream WITH ( kafka_topic = 'source_topic_name', value_format = 'avro', key = 'id' );
The ksqlDB engine will contact Schema Registry and resolve the given topic’s schema automatically.
You can explicitly specify fields you want to extract from each stream. This makes it easy to support and update streams:
CREATE STREAM source_topic_stream WITH ( kafka_topic = 'source_topic_name', value_format = 'avro', key = 'id' );
CREATE STREAM destination_topic_stream WITH ( kafka_topic = 'destination_topic_name', value_format = 'avro' ) AS SELECT id, created_at, updated_at, data FROM source_topic_stream EMIT CHANGES;
If you need any functionality not currently provided by the ksqlDB runtime, you can write a plain Java class and plug it into the ksqlDB engine.
import java.math.BigDecimal;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(
    name = "get_city_for_coordinates",
    description = "Given latitude and longitude, returns name of a city those coordinates are located in, or N/A in case no matches found")
public class GetCityForCoordinates {

    @Udf(description = "Get city name from given latitude and longitude")
    public String getCityForCoordinates(
            @UdfParameter(description = "Latitude") final BigDecimal lat,
            @UdfParameter(description = "Longitude") final BigDecimal lng) {
        // Some implementation logic here
        return "N/A"; // placeholder so the class compiles; replace with a real lookup
    }
}
Then, you can use it later:
SELECT id, GET_CITY_FOR_COORDINATES(lat, lng) FROM source_stream ...;
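The body of the UDF is left as a placeholder in the example. As a rough illustration of what such a lookup might do, here is a minimal sketch in plain Java with no ksqlDB dependencies. The `CityLookup` class, the `CityBox` helper, and the bounding-box values are all hypothetical and chosen only for illustration; a real implementation would more likely rely on proper geospatial data.

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical helper: not part of ksqlDB and not the implementation used at Bolt.
public class CityLookup {

    // A city approximated by an axis-aligned bounding box (illustrative values only).
    static final class CityBox {
        final String name;
        final BigDecimal minLat, maxLat, minLng, maxLng;

        CityBox(String name, String minLat, String maxLat, String minLng, String maxLng) {
            this.name = name;
            this.minLat = new BigDecimal(minLat);
            this.maxLat = new BigDecimal(maxLat);
            this.minLng = new BigDecimal(minLng);
            this.maxLng = new BigDecimal(maxLng);
        }

        // True when the point lies inside the box, borders included.
        boolean contains(BigDecimal lat, BigDecimal lng) {
            return lat.compareTo(minLat) >= 0 && lat.compareTo(maxLat) <= 0
                && lng.compareTo(minLng) >= 0 && lng.compareTo(maxLng) <= 0;
        }
    }

    // Made-up rough boxes around two cities, used only to exercise the logic.
    private static final List<CityBox> CITIES = List.of(
        new CityBox("Tallinn", "59.35", "59.50", "24.55", "24.95"),
        new CityBox("Riga", "56.85", "57.10", "23.90", "24.35")
    );

    // Returns the first matching city name, or "N/A" when nothing matches
    // or when either coordinate is null.
    public static String cityFor(BigDecimal lat, BigDecimal lng) {
        if (lat == null || lng == null) {
            return "N/A";
        }
        for (CityBox city : CITIES) {
            if (city.contains(lat, lng)) {
                return city.name;
            }
        }
        return "N/A";
    }

    public static void main(String[] args) {
        System.out.println(cityFor(new BigDecimal("59.437"), new BigDecimal("24.754"))); // Tallinn
        System.out.println(cityFor(new BigDecimal("0"), new BigDecimal("0")));           // N/A
    }
}
```

Handling null inputs explicitly matters here because stream records can have missing fields, and a method along these lines could then be called from the annotated UDF method.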
You can easily filter your data:
CREATE STREAM source_topic_stream WITH ( kafka_topic = 'source_topic_name', value_format = 'avro', key = 'id' );
CREATE STREAM destination_topic_stream WITH ( kafka_topic = 'destination_topic_name', value_format = 'avro' ) AS SELECT id, created_at, data, amount FROM source_topic_stream WHERE amount <= 100 EMIT CHANGES;
In addition, you can replicate a single data source to several destinations based on distribution criteria:
CREATE STREAM customers_stream WITH ( kafka_topic = 'customers', value_format = 'avro', key = 'id' );
CREATE STREAM customers_male_stream WITH ( kafka_topic = 'customers_male', value_format = 'avro' ) AS SELECT id, created_at, age, name, customer_account_id FROM customers_stream WHERE gender = 'M' EMIT CHANGES;
CREATE STREAM customers_female_stream WITH ( kafka_topic = 'customers_female', value_format = 'avro' ) AS SELECT id, created_at, age, name, customer_account_id FROM customers_stream WHERE gender = 'F' EMIT CHANGES;
Or, you can merge several source data streams into a single destination stream:
CREATE STREAM purchases_online_stream WITH ( kafka_topic = 'purchases_online', value_format = 'avro', key = 'id' );
CREATE STREAM purchases_offline_stream WITH ( kafka_topic = 'purchases_offline', value_format = 'avro', key = 'id' );
CREATE STREAM purchases_total_stream WITH ( kafka_topic = 'purchases_total', value_format = 'avro' ) AS SELECT id, created_at, item_id, item_price, customer_id, 'online' as "purchase_channel" FROM purchases_online_stream EMIT CHANGES;
INSERT INTO purchases_total_stream SELECT id, created_at, item_id, item_price, customer_id, 'offline' as "purchase_channel" FROM purchases_offline_stream;
In this example, we have also added an artificial column that specifies which purchase channel was used (online or offline), making it possible to backtrace its source.
If a new topic needs to be replicated, there is no need to write a new Java-based Kafka Streams application, compile, and deploy it. ksqlDB does all this under the hood automatically. All you need to do is provide a new ksqlDB statement to the engine.
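To make "provide a new ksqlDB statement to the engine" concrete, here is a minimal sketch that posts a statement to the `/ksql` endpoint of the ksqlDB REST API using Java's built-in HTTP client. It assumes a server running in interactive mode; a headless deployment reads its statements from a queries file instead. The class name, the server URL passed on the command line, and the `new_topic` names are placeholders, not values from our setup.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client sketch; names and URLs here are assumptions for illustration.
public class SubmitStatement {

    // Wraps a ksqlDB statement in the JSON envelope used by the /ksql endpoint.
    // Backslashes and double quotes are escaped; line breaks are flattened to spaces.
    static String toRequestBody(String statement) {
        String escaped = statement
            .replace("\\", "\\\\")
            .replace("\"", "\\\"")
            .replace("\r", " ")
            .replace("\n", " ");
        return "{\"ksql\": \"" + escaped + "\", \"streamsProperties\": {}}";
    }

    // Builds (but does not send) the POST request for the given server.
    static HttpRequest buildRequest(String serverUrl, String statement) {
        return HttpRequest.newBuilder(URI.create(serverUrl + "/ksql"))
            .header("Content-Type", "application/vnd.ksql.v1+json")
            .POST(HttpRequest.BodyPublishers.ofString(toRequestBody(statement)))
            .build();
    }

    public static void main(String[] args) throws Exception {
        // Placeholder statement for a new topic to replicate.
        String statement = "CREATE STREAM new_topic_stream WITH "
            + "(kafka_topic = 'new_topic', value_format = 'avro');";
        System.out.println(toRequestBody(statement));

        // Only contact a server when its URL is given, e.g. http://localhost:8088
        if (args.length > 0) {
            HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(args[0], statement), HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
}
```

Keeping request construction separate from sending makes the payload easy to inspect before anything reaches the server.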
ksqlDB’s functionality lets us do pretty much any data manipulation we can think of. Half a year ago, we migrated all replication dataflows to ksqlDB, and we have never regretted it. At the time of writing, we are replicating over 500 topics to our warehouse, and this number is growing every day. All our use cases are essentially stateless at the moment, but we are also considering using ksqlDB for different types of stateful stream processing scenarios. In my next blog post, I will describe how we have adopted the Confluent Platform for our broader data replication pipeline.
Ready to check ksqlDB out? Head over to ksqldb.io to get started, where you can follow the quick start, read the docs, and learn more!