Show Me How: Build Streaming Data Pipelines for Real-Time Data Warehousing | Register Today
“Persistent” queries have historically formed the basis of ksqlDB applications, which continuously transform, enrich, aggregate, materialize, and join your Apache Kafka® data using a familiar SQL interface. ksqlDB continuously executes these queries, and their incremental results are pushed out to clients in real time. But what if you just want to look up the latest result of a materialized view, much like you would with a traditional database? We introduced “pull” queries into ksqlDB for precisely this need.
In contrast to persistent queries, pull queries follow a traditional request-response model. A pull query retrieves the latest result from a materialized view and then terminates upon completion. Together, these contrasting push/pull patterns provide a substantial amount of flexibility and ultimately enable a broad class of end-to-end stream processing workloads that can power your real-time applications.
While ksqlDB itself has supported pull queries for some time now, we are pleased to announce that pull queries are now available as a preview feature in Confluent Cloud ksqlDB. The remainder of this post will provide key information about using pull queries in Confluent Cloud, as well as describe the limitations pull queries will have while in preview status.
You may issue pull queries directly from the Confluent Cloud ksqlDB web interface with no additional configuration. But if you’d like to use pull queries from a CLI session (or via HTTP requests), you’ll need to perform some additional configuration to authorize your client to securely interact with your ksqlDB application.
If you don’t already have an application running, you can begin by provisioning one either from the UI or the CLI. Once your application is configured and operational, log in to Confluent Cloud using the ccloud CLI:
Now that you’re logged in, list your ksqlDB applications:
ccloud ksql app list
You’ll need to obtain three key pieces of information from the output of the above command:
endpoint): The endpoint will be used to connect to your application in a later step
Id): The application ID will be used to create an application-specific API key and secret that clients may use to connect to the application
Kafka): The ID of your Kafka cluster
Once you’ve obtained these values from the output, specify which Kafka cluster your CLI session should use:
ccloud kafka cluster use <kafka cluster id>
Next, obtain your Confluent Cloud service account ID to use for security configuration. You can find it in the
Id column of the following command’s output:
ccloud service-account list
After obtaining your service account ID, run the following command to grant the necessary topic permissions:
ccloud kafka acl create --allow --service-account <id> --operation READ --operation WRITE --operation CREATE --topic 'pq_' --prefix
Now we’ll create an application-specific API key and secret by passing the ksqlDB application ID as the
--resource parameter in the command below. Please note that these keys are different from Kafka API keys. A separate API key must be created for your specific ksqlDB application:
ccloud api-key create --resource <ksqldb application id>
You may now use your key and secret output by the above command to connect to your application using the ksqlDB CLI. The key is the username, and the secret is the password:
<path-to-confluent>/bin/ksql -u <api-key> -p <secret> <endpoint>
If you’d prefer not to install the ksqlDB CLI, you can also launch a CLI session using Docker:
docker run -it confluentinc/ksqldb-cli:0.12.0 ksql -u <api-key> -p <secret> <endpoint>
Using the ksqlDB CLI session that you opened in the previous step, we will now build an example workload that you may use to experiment with pull queries in Confluent Cloud. First, create an input stream:
CREATE STREAM pq_pageviews (user_id INTEGER KEY, url STRING, status INTEGER) WITH (kafka_topic='pq_pageviews', value_format='json', partitions=1);
Next, we will create a materialized view over this input stream. This materialized view aggregates events from the pageviews stream, grouped on the events’ url field:
CREATE TABLE pq_pageviews_metrics AS SELECT url, COUNT(*) AS num_views FROM pq_pageviews GROUP BY url EMIT CHANGES;
We will now populate this materialized view by writing events to the pageviews input stream:
INSERT INTO pq_pageviews (user_id, url, status) VALUES (0, 'https://confluent.io', 200); INSERT INTO pq_pageviews (user_id, url, status) VALUES (1, 'https://confluent.io/blog', 200);
Now that our materialized view contains some data, we can issue a pull query against it to retrieve the latest count for a given URL:
ksql> SELECT * FROM pq_pageviews_metrics WHERE url = 'https://confluent.io'; +-------------------------------------------------+-------------------------------------------------+ |URL |NUM_VIEWS | +-------------------------------------------------+-------------------------------------------------+ |https://confluent.io |1 |
This pull query should return precisely one row. Each time the given pull query is run, it will return the latest value for the targeted row.
While a feature is in preview, we recommend using it in production workloads at your own risk. The following are key limitations associated with pull queries in Confluent Cloud ksqlDB:
Pull queries are a simple but powerful feature that enable an entirely new class of use cases around materialized views. By combining this relatively traditional lookup capability with advanced stream processing technology, ksqlDB is capable of powering a broad range of modern, real-time applications.
As a final note, pull queries will be completely free of charge while in preview. Once pull queries reach generally available status, they will have consumption-based pricing associated with them, given that they consume bandwidth.
If you’d like to continue learning about pull queries in ksqlDB, we encourage you to head over to the documentation.
And if you’re ready to begin experimenting with pull queries in Confluent Cloud, our cloud quickstart guide is one of the easiest ways to get up and running.
An Approach to combining Change Data Capture (CDC) messages from a relational database into transactional messages using Kafka Streams.
Change data capture (CDC) converts all the changes that occur inside your database into events and publishes them to an event stream. You can then use these events to power analytics, drive operational use cases, hydrate databases, and more. The pattern is enjoying wider adoption than ever before.