Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now
At the first Current conference in October 2022, Confluent announced Stream Designer, a visual builder for streaming data pipelines. In the Confluent Cloud console, you can construct and run a pipeline in minutes that consumes real-time events from source connectors, processes them with SQL queries, and sends them to sink connectors and Apache Kafka® topics. Stream Designer enables the ability to leverage built-in integrations with fully managed connectors, ksqlDB, and Kafka topics. It gives you speed, simplicity, and flexibility, allowing you to switch seamlessly between a graphical canvas and a full SQL editor.
Now, with the latest release of Confluent Cloud, you can create and update pipelines with SQL source code by using the Confluent CLI and the Pipelines REST API. Stream Designer also automatically detects and securely stores passwords and secrets used in connector ksqlDB statements updated through the UI, CLI, and API, and updates the statements to use variables that can be updated and managed separately from the SQL code. This process also applies to existing pipelines, and no additional user intervention is required.
Here’s how you can enhance the visual design experience with these powerful new features.
The following steps show how to create and update a simple pipeline that has Datagen connectors and corresponding Kafka topics. You’ll see how to manage secrets in source code and in the Confluent CLI. Finally, you’ll use the Pipelines REST API to manage the pipeline.
To start creating pipelines, ensure that your local environment is set up correctly.
Request authorization from a person with OrganizationAdmin
access for creating and modifying pipelines in your Confluent Cloud environment.
Run the confluent update
command to get the latest version of the CLI. You need version 2.38.0
or later to create and update pipelines from SQL files.
Your environment must have Schema Registry and ksqlDB clusters provisioned. Ensure that the SR cluster is provisioned before creating the ksqlDB cluster.
Log in to Confluent Cloud by using the confluent login
command.
The pipeline CLI commands require the ID of the Kafka cluster that hosts your pipelines. Run the following command to list the available Kafka clusters:
Your output should resemble:
Find the cluster you want to use to create pipelines. This example uses the lkc-a1b2c3
cluster. For convenience, assign the cluster ID to an environment variable:
Run the confluent kafka cluster use
command to ensure that it’s the active cluster:
Your output should resemble:
The pipeline create
command requires the ID of an associated ksqlDB cluster. Run the following command to list the available ksqlDB clusters:
Your output should resemble:
This example uses the lksqlc-j1k2l3
cluster. Find the cluster you want to use to create pipelines, and assign its ID to an environment variable:
Stream Designer encrypts secrets and stores them securely. This example pipeline uses a connector that requires an API key to access topics in the Kafka cluster.
Run the following command to create an API key and secret for authenticating the connector to the Kafka cluster:
Your output should resemble:
For convenience, assign the API key and secret to environment variables:
Your environment is ready to create pipelines.
With Confluent CLI version 2.38.0
and later, you can create a complete pipeline from a SQL file.
Copy the following SQL into a file named simple_pipeline.sql. The code declares a pipeline that has a Datagen source connector, a Kafka topic named pageviews-topic
, and a stream named pageviews-stream
.
Here are some interesting points to note in this code example:
The CREATE SOURCE CONNECTOR
statement declares a DatagenSource
connector named pageviews-source
. When activated, it produces pageviews data in JSON_SR
format to a new Kafka topic, named pageviews-topic
.
The connector must authorize to the Kafka cluster, so the CREATE SOURCE CONNECTOR
statement sets the kafka.api.key
and kafka.api.secret
properties.
The kafka.api.key
and kafka.api.secret
properties get values from the PAGEVIEWS_SOURCE_KAFKA_API_KEY
and PAGEVIEWS_SOURCE_KAFKA_API_SECRET
variables, which Stream Designer tracks by name.
Values for the PAGEVIEWS_SOURCE_KAFKA_API_KEY
and PAGEVIEWS_SOURCE_KAFKA_API_SECRET
variables are assigned later by the confluent pipeline create
command.
Run the following command to create the pipeline from the source code file. The --secret
option enables passing named secret values to Stream Designer.
Your output should resemble:
Note the ID of the pipeline, which in this example is pipe-gWXzV4
. For convenience, assign it to an environment variable:
Your new pipeline has been created and is available in Stream Designer for further updates. Currently, it’s in the draft
state.
Before you can activate the pipeline, you need to grant it activation privileges. Granting a pipeline privileges enables it to provision connectors, topics, queries, and other resources in Confluent Cloud. Only a user that is an Org Admin can grant privileges.
Run the following update
command to grant activation privileges to your pipeline:
Now that the pipeline has privileges, you can activate it. Activating the pipeline provisions the connector, creates the Kafka topic the connector produces to, and registers a stream on the topic.
Run the following command to activate your pipeline:
Your output should resemble:
The named secrets are displayed once the pipeline is activated.
Open the Confluent Cloud console to view the pipeline you created. In the navigation menu, click Stream Designer, and in the pipelines list, click test-pipeline.
On the design surface, you can see the connector, the topic, and the stream components. Hover over the connector and the topic to see a summary of messages that the connector has produced to the topic.
In the following steps you add a filter and another stream.
To modify the pipeline topology, you must first deactivate the pipeline.
In the simple_pipeline.sql
file, append the following SQL:
Save the file and run the following command to update the pipeline.
Now activate the modified pipeline.
Go back to the Confluent Cloud console to view the updated pipeline.
Hover over the filtered_pageviews_topic component to see a summary of the messages the topic has received.
The Cloud Console enables viewing and modifying the secrets that your pipeline uses. Click the Settings icon, and in the PIpeline Settings page, click Secrets.
The Confluent CLI provides more options for managing your pipelines. For more information, see Manage pipeline life cycle by using the Confluent CLI.
In addition to the Confluent CLI, you can manage your pipelines programmatically by using the Pipelines REST API. The following steps show how to use curl
to send requests to Confluent Cloud and jq
to format the JSON responses.
Requests to the Pipelines API require the identifier for your environment, in addition to the Kafka cluster ID.
Run the following command to list your environments:
Your output should resemble:
For convenience, assign the environment ID to an environment variable:
To create a pipeline, you need the identifiers of your ksqlDB and Stream Governance clusters, in addition to the environment and Kafka cluster IDs. The ksqlDB cluster runs your SQL logic, and the Stream Governance cluster hosts Schema Registry.
Run the following command to get the ID of your Stream Governance cluster:
Your output should resemble:
Note the Stream Governance cluster ID. In the current example, the cluster ID is lsrc-a1b2c
.
You need a Confluent Cloud API key to authenticate with the Confluent Cloud API. Every request to the Pipelines API must include the key and secret encoded in Base64.
Run the following command to get a Confluent Cloud API key:
Your output should resemble:
Run the following command to encode the API key and secret as a Base64 string. The -w 0 option is required to prevent the Base64 command from inserting a line break. The output is assigned to an environment variable named TOKEN
.
Run the following command to inspect the token:
Your output should resemble:
Your environment is ready for sending requests to the Pipelines API.
The pipeline description file contains the details of your pipeline, including metadata and SQL source code, in JSON format.
Run the following command to get details about the pipeline and save them to a local file:
The command sends a GET request to the API endpoint, pipes the response to jq
for formatting, and directs the output to a JSON file.
You can change the pipeline’s metadata or SQL source code in a description file and upload the whole file to Stream Designer. Also, you can upload a patch file that contains only the fields you want to modify.
Run the following command to deactivate the pipeline and return it to draft mode:
Create a file named patch.json and copy the JSON for the values you want to change into the file. For example, the following JSON enables patching the pipeline description. The environment and Kafka cluster IDs are required.
Save the file and run the following command to patch the pipeline with the updated description:
Run the following command to verify the patch. The curl
command sends a GET request to the Pipelines API and jq
selects the description
field from the JSON response.
Your output should resemble:
To keep your pipelines secure, you may need to rotate secrets periodically. The Pipelines API enables secret rotation by using a patch file
Run the following command to deactivate the pipeline and return it to draft mode:
Run the following command to create a new API key and secret for authenticating the connector to the Kafka cluster:
Note the API key and secret and store them in a secure location.
Create a new file named rotate.json and copy the following JSON into it. Assign the new API key and secret to the named secret you used previously.
Save the file and run the following command to patch the pipeline with the rotated secret. The curl
command sends a PATCH request to the Pipelines API and jq
selects the secrets
field from the JSON response.
Your output should resemble:
The named secret values are displayed in the returned JSON.
Reactivate the pipeline to use the new secret. Reactivating updates the connector configuration.
With the Pipelines API, you can create a new pipeline from source code in a description file.
A pipeline description file is a JSON file that specifies the environment, metadata, and SQL code for your pipeline.
Create a file named test_pipeline.json and copy the following JSON into it. The SQL defines a Kafka topic and registers a stream on the topic. Copy your values for env-id
, kafka-cluster-id
, ksqldb-cluster-id
, and sg-cluster-id
.
Save the file and run the following command to create the pipeline. Ensure that the TOKEN
environment variable is assigned with your value.
Your output should resemble:
Your pipeline is created and ready to edit in Stream Designer. Note the pipeline ID, because you need it for other requests to manage the pipeline. In the current example, the pipeline ID is pipe-1a2b3c
.
For convenience, assign the ID value to an environment variable.
You can activate the pipeline by using the confluent pipeline activate
command, but you can also activate the pipeline by sending a PATCH request.
Create a file named activate.json and copy the following JSON that sets the activation_privilege
and activated
properties to true
.
Run the following command to activate the pipeline with a PATCH request:
You can deactivate the pipeline and return it to the draft
state by using the confluent pipeline deactivate
command, but you can also deactivate the pipeline by sending a PATCH request.
Deactivate your pipeline by setting the activated
property to false
in a patch file.
Create a file named deactivate.json and copy the following JSON that sets the activated
property to false
.
Save the file and run the following command to deactivate the pipeline with a PATCH request.
You can do much more with the Pipelines API. For more information, see Manage pipeline life cycle by using the Confluent Cloud REST API.
Stream Designer enables you to build pipelines quickly and easily use a visual canvas, and now you have the ability to create and manage your pipelines with the Confluent CLI and the Pipelines REST API. Have fun building your streaming data applications!
To learn more, you can check out Confluent Cloud, a fully managed event streaming service based on Apache Kafka.
Announcing the latest updates to Confluent’s cloud-native data streaming platform, centralized identity management, enhanced RBAC, Client Quotas, and more.
Data pipelines are critical to the modern, data-driven business, connecting a network of data systems and applications to power both operational and analytical use cases. With the need to promptly […]