Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now

Create Stream Designer Pipelines with the Confluent CLI and Pipelines API

Written By

At the first Current conference in October 2022, Confluent announced Stream Designer, a visual builder for streaming data pipelines. In the Confluent Cloud console, you can construct and run a pipeline in minutes that consumes real-time events from source connectors, processes them with SQL queries, and sends them to sink connectors and Apache Kafka® topics. Stream Designer enables the ability to leverage built-in integrations with fully managed connectors, ksqlDB, and Kafka topics. It gives you speed, simplicity, and flexibility, allowing you to switch seamlessly between a graphical canvas and a full SQL editor.

Now, with the latest release of Confluent Cloud, you can create and update pipelines with SQL source code by using the Confluent CLI and the Pipelines REST API. Stream Designer also automatically detects and securely stores passwords and secrets used in connector ksqlDB statements updated through the UI, CLI, and API, and updates the statements to use variables that can be updated and managed separately from the SQL code. This process also applies to existing pipelines, and no additional user intervention is required.

Here’s how you can enhance the visual design experience with these powerful new features.

The following steps show how to create and update a simple pipeline that has Datagen connectors and corresponding Kafka topics. You’ll see how to manage secrets in source code and in the Confluent CLI. Finally, you’ll use the Pipelines REST API to manage the pipeline.

Set up your environment

To start creating pipelines, ensure that your local environment is set up correctly.

  • Request authorization from a person with OrganizationAdmin access for creating and modifying pipelines in your Confluent Cloud environment.

  • Run the confluent update command to get the latest version of the CLI. You need version 2.38.0 or later to create and update pipelines from SQL files.

  • Your environment must have Schema Registry and ksqlDB clusters provisioned. Ensure that the SR cluster is provisioned before creating the ksqlDB cluster.

  • Log in to Confluent Cloud by using the confluent login command.

Get your Kafka cluster ID

The pipeline CLI commands require the ID of the Kafka cluster that hosts your pipelines. Run the following command to list the available Kafka clusters:

confluent kafka cluster list

Your output should resemble:

       Id      |           Name            |   Type    | Provider |  Region   | Availability | Status
---------------+---------------------------+-----------+----------+-----------+--------------+---------
  * lkc-a1b2c3 | StreamDesigner_Shared_AWS | DEDICATED | aws      | us-west-2 | multi-zone   | UP
    lkc-d4e5f6 | tl_cluster_no_resources   | BASIC     | aws      | us-west-2 | single-zone  | UP

Find the cluster you want to use to create pipelines. This example uses the lkc-a1b2c3 cluster. For convenience, assign the cluster ID to an environment variable:

CLUSTER_ID=lkc-a1b2c3

Run the confluent kafka cluster use command to ensure that it’s the active cluster:

confluent kafka cluster use ${CLUSTER_ID}

Your output should resemble:

Set Kafka cluster "lkc-a1b2c3" as the active cluster for environment "env-o7p8q".

Get the ksqlDB cluster ID

The pipeline create command requires the ID of an associated ksqlDB cluster. Run the following command to list the available ksqlDB clusters:

confluent ksql cluster list

Your output should resemble:

       ID       |          Name          | Topic Prefix  |   Kafka    | Storage |                         Endpoint                          | Status | Detailed Processing Log
----------------+------------------------+---------------+------------+---------+-----------------------------------------------------------+--------+--------------------------
  lksqlc-j1k2l3 | ksqlDB_cluster_sd_test | pksqlc-g7h8i9 | lkc-a1b2c3 |      25 | https://pksqlc-g7h8i9.us-west-2.aws.devel.cpdev.cloud:443 | UP     | true

This example uses the lksqlc-j1k2l3 cluster. Find the cluster you want to use to create pipelines, and assign its ID to an environment variable:

KSQLDB_ID=lksqlc-j1k2l3

Generate a Kafka cluster API key and secret

Stream Designer encrypts secrets and stores them securely. This example pipeline uses a connector that requires an API key to access topics in the Kafka cluster.

Run the following command to create an API key and secret for authenticating the connector to the Kafka cluster:

confluent api-key create --resource ${CLUSTER_ID}

Your output should resemble:

It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | A1B2C3D4E5F6G7H8                                                 |
| Secret  | tyfjB0B2wcD0BBBsY12fwWEqbhfQfXr1mfvq4VuXeCfSXR0zL0O71+MPWHe2bgMR |
+---------+------------------------------------------------------------------+

For convenience, assign the API key and secret to environment variables:

KAFKA_API_KEY=A1B2C3D4E5F6G7H8
KAFKA_API_SECRET=tyfjB0B2wcD0BBBsY12fwWEqbhfQfXr1mfvq4VuXeCfSXR0zL0O71+MPWHe2bgMR

Your environment is ready to create pipelines.

Create a pipeline from SQL source code

With Confluent CLI version 2.38.0 and later, you can create a complete pipeline from a SQL file.

Copy the following SQL into a file named simple_pipeline.sql. The code declares a pipeline that has a Datagen source connector, a Kafka topic named pageviews-topic, and a stream named pageviews-stream.

CREATE SOURCE CONNECTOR "pageviews-source" WITH (
  "connector.class"='DatagenSource',
  "kafka.api.key"='${PAGEVIEWS_SOURCE_KAFKA_API_KEY}',
  "kafka.api.secret"='${PAGEVIEWS_SOURCE_KAFKA_API_SECRET}',
  "kafka.auth.mode"='KAFKA_API_KEY',
  "kafka.topic"='pageviews-topic',
  "output.data.format"='JSON_SR',
  "quickstart"='PAGEVIEWS',
  "tasks.max"='1'
);

CREATE OR REPLACE STREAM "pageviews-stream" (PAGEID VARCHAR, USERID VARCHAR, VIEWTIME BIGINT)
  WITH (kafka_topic='pageviews-topic', partitions=1, key_format='KAFKA', value_format='JSON_SR');

Here are some interesting points to note in this code example:

  • The CREATE SOURCE CONNECTOR statement declares a DatagenSource connector named pageviews-source. When activated, it produces pageviews data in JSON_SR format to a new Kafka topic, named pageviews-topic.

  • The connector must authorize to the Kafka cluster, so the CREATE SOURCE CONNECTOR statement sets the kafka.api.key and kafka.api.secret properties.

  • The kafka.api.key and kafka.api.secret properties get values from the PAGEVIEWS_SOURCE_KAFKA_API_KEY and PAGEVIEWS_SOURCE_KAFKA_API_SECRET variables, which Stream Designer tracks by name.

  • Values for the PAGEVIEWS_SOURCE_KAFKA_API_KEY and PAGEVIEWS_SOURCE_KAFKA_API_SECRET variables are assigned later by the confluent pipeline create command.

Run the following command to create the pipeline from the source code file. The --secret option enables passing named secret values to Stream Designer.

confluent pipeline create \
  --ksql-cluster ${KSQLDB_ID} \
  --name "test-pipeline" \
  --description "simple pipeline" \
  --secret PAGEVIEWS_SOURCE_KAFKA_API_KEY=${KAFKA_API_KEY} \
  --secret PAGEVIEWS_SOURCE_KAFKA_API_SECRET=${KAFKA_API_SECRET} \
  --sql-file simple_pipeline.sql

Your output should resemble:

+----------------------+---------------------------------------+
| ID                   | pipe-gWXzV4                           |
| Name                 | test-pipeline                         |
| Description          | simple pipeline                       |
| KSQL Cluster         | lksqlc-j1k2l3                         |
| Secret Names         | [PAGEVIEWS_SOURCE_KAFKA_API_KEY       |
|                      | PAGEVIEWS_SOURCE_KAFKA_API_SECRET]    |
| Activation Privilege | false                                 |
| State                | draft                                 |
| Created At           | 2023-01-25 17:12:43.256538908         |
|                      | +0000 UTC                             |
| Updated At           | 2023-01-25 17:12:43.466591788         |
|                      | +0000 UTC                             |
+----------------------+---------------------------------------+

Note the ID of the pipeline, which in this example is pipe-gWXzV4. For convenience, assign it to an environment variable:

PIPE_ID=pipe-gWXzV4

Your new pipeline has been created and is available in Stream Designer for further updates. Currently, it’s in the draft state.

Activate your pipeline

Before you can activate the pipeline, you need to grant it activation privileges. Granting a pipeline privileges enables it to provision connectors, topics, queries, and other resources in Confluent Cloud. Only a user that is an Org Admin can grant privileges.

Run the following update command to grant activation privileges to your pipeline:

confluent pipeline update ${PIPE_ID} --activation-privilege=true

Now that the pipeline has privileges, you can activate it. Activating the pipeline provisions the connector, creates the Kafka topic the connector produces to, and registers a stream on the topic.

Run the following command to activate your pipeline:

confluent pipeline activate ${PIPE_ID}

Your output should resemble:

+----------------------+---------------------------------------+
| ID                   | pipe-gWXzV4                           |
| Name                 | test-pipeline                         |
| Description          | simple pipeline                       |
| KSQL Cluster         | lksqlc-j1k2l3                         |
| Secret Names         | [PAGEVIEWS_SOURCE_KAFKA_API_KEY       |
|                      | PAGEVIEWS_SOURCE_KAFKA_API_SECRET]    |
| Activation Privilege | true                                  |
| State                | activating                            |
| Created At           | 2023-01-25 17:12:43.256538908         |
|                      | +0000 UTC                             |
| Updated At           | 2023-01-25 17:12:43.466591788         |
|                      | +0000 UTC                             |
+----------------------+---------------------------------------+

The named secrets are displayed once the pipeline is activated.

Open the Confluent Cloud console to view the pipeline you created. In the navigation menu, click Stream Designer, and in the pipelines list, click test-pipeline.

On the design surface, you can see the connector, the topic, and the stream components. Hover over the connector and the topic to see a summary of messages that the connector has produced to the topic. 

Modify your pipeline

In the following steps you add a filter and another stream.

To modify the pipeline topology, you must first deactivate the pipeline.

confluent pipeline deactivate ${PIPE_ID}

In the simple_pipeline.sql file, append the following SQL: 

CREATE OR REPLACE STREAM "filtered_pageviews"
  WITH (kafka_topic='filtered_pageviews_topic', partitions=1)
  AS SELECT * FROM "pageviews-stream" WHERE userid = 'User_9';

Save the file and run the following command to update the pipeline.

confluent pipeline update ${PIPE_ID} \
  --description "updated simple pipeline" \
  --sql-file simple_pipeline.sql

Now activate the modified pipeline.

confluent pipeline activate ${PIPE_ID}

Go back to the Confluent Cloud console to view the updated pipeline.

Hover over the filtered_pageviews_topic component to see a summary of the messages the topic has received.

The Cloud Console enables viewing and modifying the secrets that your pipeline uses. Click the Settings icon, and in the PIpeline Settings page, click Secrets

The Confluent CLI provides more options for managing your pipelines. For more information, see Manage pipeline life cycle by using the Confluent CLI.

Manage your pipeline with the REST API

In addition to the Confluent CLI, you can manage your pipelines programmatically by using the Pipelines REST API. The following steps show how to use curl to send requests to Confluent Cloud and jq to format the JSON responses.

Get the environment ID

Requests to the Pipelines API require the identifier for your environment, in addition to the Kafka cluster ID.

Run the following command to list your environments:

confluent env list

Your output should resemble:

       ID      |      Name
---------------+------------------
  * env-a1b3c  | default
    env-def4g5 | test_env
    env-hi6jk7 | dev

For convenience, assign the environment ID to an environment variable:

ENV_ID=env-a1b3c

Get the ksqlDB and Stream Governance cluster IDs

To create a pipeline, you need the identifiers of your ksqlDB and Stream Governance clusters, in addition to the environment and Kafka cluster IDs. The ksqlDB cluster runs your SQL logic, and the Stream Governance cluster hosts Schema Registry. 

Run the following command to get the ID of your Stream Governance cluster:

confluent schema-registry cluster describe

Your output should resemble:

+-------------------------+----------------------------------------------------+
        | Name                    | Stream Governance Package                          |
        | Cluster ID              | lsrc-0m2q5                                         |
        | Endpoint URL            | https://psrc-a1b2c.us-central1.gcp.confluent.cloud |
        | Used Schemas            |                                                 21 |
        | Available Schemas       |                                              19979 |
        | Free Schemas Limit      |                                              20000 |
        | Global Compatibility    | <Requires API Key>                                 |
        | Mode                    | <Requires API Key>                                 |
        | Service Provider        | gcp                                                |
        | Service Provider Region | us-central1                                        |
        | Package                 | advanced                                           |
        +-------------------------+----------------------------------------------------+

Note the Stream Governance cluster ID. In the current example, the cluster ID is lsrc-a1b2c.

Get an authentication token

You need a Confluent Cloud API key to authenticate with the Confluent Cloud API. Every request to the Pipelines API must include the key and secret encoded in Base64.

Run the following command to get a Confluent Cloud API key:

confluent api-key create --resource cloud

Your output should resemble:

+---------+------------------------------------------------------------------+
| API Key | ABCDEFGHIJKLMNOP                                                 |
| Secret  | CvB0BPfD0BBsdBJyVUWH3zmSt/dmEFmF7EdDJ5Y9S36RXQuskQlwOnpBhfC4SMNu |
+---------+------------------------------------------------------------------+

Run the following command to encode the API key and secret as a Base64 string. The -w 0 option is required to prevent the Base64 command from inserting a line break. The output is assigned to an environment variable named TOKEN

TOKEN=`echo -n "ABCDEFGHIJKLMNOP:CvB0BPfD0BBsdBJyVUWH3zmSt/dmEFmF7EdDJ5Y9S36RXQuskQlwOnpBhfC4SMNu" | base64 -w 0`

Run the following command to inspect the token:

echo $TOKEN

Your output should resemble:

eyB0bGciOiJSD0BbsiIsImprdSI6Imh0dHBzOi8vYXV0…

Your environment is ready for sending requests to the Pipelines API.

Get the pipeline description file

The pipeline description file contains the details of your pipeline, including metadata and SQL source code, in JSON format.

Run the following command to get details about the pipeline and save them to a local file:

curl -s -H "Authorization: Basic ${TOKEN}" \
  -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
  jq . > ${PIPE_ID}.json

The command sends a GET request to the API endpoint, pipes the response to jq for formatting, and directs the output to a JSON file.

Patch the pipeline

You can change the pipeline’s metadata or SQL source code in a description file and upload the whole file to Stream Designer. Also, you can upload a patch file that contains only the fields you want to modify.

Run the following command to deactivate the pipeline and return it to draft mode:

confluent pipeline deactivate ${PIPE_ID}

Create a file named patch.json and copy the JSON for the values you want to change into the file. For example, the following JSON enables patching the pipeline description. The environment and Kafka cluster IDs are required. 

{
  "spec": {
    "environment": {
      "id": "<env-id>"
    },
    "kafka_cluster": {
      "id": "<kafka-cluster-id>"
    },
    "description": "patched description"
  }
}

Save the file and run the following command to patch the pipeline with the updated description:

curl -s -H "Authorization: Basic ${TOKEN}" \
  -X PATCH -H "Content-Type: application/json" \
  -d @patch.json "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .

Run the following command to verify the patch. The curl command sends a GET request to the Pipelines API and jq selects the description field from the JSON response.

curl -s -H "Authorization: Basic ${TOKEN}" \
  -X GET "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | \
  jq .spec.description

Your output should resemble:

"patched description"

Rotate secrets

To keep your pipelines secure, you may need to rotate secrets periodically. The Pipelines API enables secret rotation by using a patch file

Run the following command to deactivate the pipeline and return it to draft mode:

confluent pipeline deactivate ${PIPE_ID}

Run the following command to create a new API key and secret for authenticating the connector to the Kafka cluster:

confluent api-key create --resource ${CLUSTER_ID}

Note the API key and secret and store them in a secure location.

Create a new file named rotate.json and copy the following JSON into it. Assign the new API key and secret to the named secret you used previously.

{
  "spec": {
    "environment": {
      "id": "<env-id>"
    },
    "kafka_cluster": {
      "id": "<kafka-cluster-id>"
    },
    "secrets": {
      "PAGEVIEWS_SOURCE_KAFKA_API_KEY": "<new-kafka-api-key>",
      "PAGEVIEWS_SOURCE_KAFKA_API_SECRET": "<new-kafka-api-secret>"
    }
  }
}

Save the file and run the following command to patch the pipeline with the rotated secret. The curl command sends a PATCH request to the Pipelines API and jq selects the secrets field from the JSON response.

curl -s -H "Authorization: Basic ${TOKEN}" \
  -X PATCH -H "Content-Type: application/json" \
  -d @rotate.json "https://api.confluent.cloud/sd/v1/pipelines/${PIPE_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .spec.secrets

Your output should resemble:

{
  "PAGEVIEWS_SOURCE_KAFKA_API_KEY": "<new-kafka-api-key>",
  "PAGEVIEWS_SOURCE_KAFKA_API_SECRET": "<new-kafka-api-secret>"
}

The named secret values are displayed in the returned JSON. 

Reactivate the pipeline to use the new secret. Reactivating updates the connector configuration.

confluent pipeline activate ${PIPE_ID}

Create a pipeline

With the Pipelines API, you can create a new pipeline from source code in a description file.

A pipeline description file is a JSON file that specifies the environment, metadata, and SQL code for your pipeline.

Create a file named test_pipeline.json and copy the following JSON into it. The SQL defines a Kafka topic and registers a stream on the topic. Copy your values for env-id, kafka-cluster-id, ksqldb-cluster-id, and sg-cluster-id.

{
    "spec": {
        "display_name":"test-pipeline",
        "description":"Test pipeline",
        "activation_privilege":true,
        "source_code": {
            "sql": "CREATE STREAM `upstream` (id INTEGER, name STRING) WITH (kafka_topic = 'test-topic', partitions=1, value_format='JSON');"
        },
        "environment": {
            "id": "<env-id>"
        },
        "kafka_cluster": {
            "id":"<kafka-cluster-id>"
        },
        "ksql_cluster": 
        {
            "id": "<ksqldb-cluster-id>"
        },
        "stream_governance_cluster":{ 
            "id": "<sg-cluster-id>"
        }
    }
}

Save the file and run the following command to create the pipeline. Ensure that the TOKEN environment variable is assigned with your value.

curl -s -H "Authorization: Basic ${TOKEN}" \
  -X POST -H "Content-Type: application/json" \
  -d @test_pipeline.json \
  "https://api.confluent.cloud/sd/v1/pipelines" | jq .

Your output should resemble:

{
    "id": "pipe-1a2b3c",
    "metadata": {
        "self": "https://confluent.cloud/api/sd/v1/clusters/lkc-9z8y7x/pipelines/pipe-1a2b3c",
        "resource_name": "crn://confluent.cloud/organization=b0b21724-4586-4a07-d0bb-9e47daacbf87/environment=env-a1b3c/cloud-cluster=lkc-9z8y7x/pipeline=pipe-1a2b3c",
        "created_at": "2023-01-31T22:17:14.757855523Z",
        "updated_at": "2023-01-31T22:17:15.353688821Z"
    },
    "spec": {
        "display_name": "test-pipeline",
        "description": "Test pipeline",
        "activation_privilege": true,
        "environment": {
            "api_version": "org/v2",
            "kind": "Environment",
            "…": "…"
        }
    }
}

Your pipeline is created and ready to edit in Stream Designer. Note the pipeline ID, because you need it for other requests to manage the pipeline. In the current example, the pipeline ID is pipe-1a2b3c.

For convenience, assign the ID value to an environment variable.

PIPE2_ID=<pipeline-id>

Activate the pipeline

You can activate the pipeline by using the confluent pipeline activate command, but you can also activate the pipeline by sending a PATCH request.

Create a file named activate.json and copy the following JSON that sets the activation_privilege and activated properties to true.

{
  "spec": {
    "environment": {
      "id": "<env-id>"
    },
    "kafka_cluster": {
       "id": "<kafka-cluster-id>"
    },
    "activation_privilege": true,
    "activated": true
  }
}

Run the following command to activate the pipeline with a PATCH request:

curl -s -H "Authorization: Basic ${TOKEN}" \
  -X PATCH -H "Content-Type: application/json" \
  -d @activate.json "https://api.confluent.cloud/sd/v1/pipelines/${PIPE2_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .

Deactivate the pipeline

You can deactivate the pipeline and return it to the draft state by using the confluent pipeline deactivate command, but you can also deactivate the pipeline by sending a PATCH request.

Deactivate your pipeline by setting the activated property to false in a patch file.

Create a file named deactivate.json and copy the following JSON that sets the activated property to false.

{
  "spec": {
    "environment": {
      "id": "<env-id>"
    },
    "kafka_cluster": {
       "id": "<kafka-cluster-id>"
    },
    "activated": false
  }
}

Save the file and run the following command to deactivate the pipeline with a PATCH request.

curl -s -H "Authorization: Basic ${TOKEN}" \
  -X PATCH -H "Content-Type: application/json" \
  -d @deactivate.json "https://api.confluent.cloud/sd/v1/pipelines/${PIPE2_ID}?environment=${ENV_ID}&spec.kafka_cluster=${CLUSTER_ID}" | jq .

You can do much more with the Pipelines API. For more information, see Manage pipeline life cycle by using the Confluent Cloud REST API.

Summary

Stream Designer enables you to build pipelines quickly and easily use a visual canvas, and now you have the ability to create and manage your pipelines with the Confluent CLI and the Pipelines REST API. Have fun building your streaming data applications!

To learn more, you can check out Confluent Cloud, a fully managed event streaming service based on Apache Kafka.

  • Jim Galasyn is a technical writer at Confluent, working on Kafka Streams and ksqlDB. He came to Confluent after a stint at Docker, and before that, 14 years at Microsoft writing developer documentation. Even after four years of working in Silicon Valley companies, he still prefers Windows.

Did you like this blog post? Share it now