Apache Kafka® is at the core of a large ecosystem that includes powerful components, such as Kafka Connect and Kafka Streams. This ecosystem also includes many tools and utilities that make us, as Kafka developers, more productive while making our jobs easier and more enjoyable. Below, we’ll take a look at a few of these tools and how they can help us get work done.
We like to save the best for last, but this tool is too good to wait. So, we’ll start off by covering kafkacat.
kafkacat is a fast and flexible command line Kafka producer, consumer, and more. Magnus Edenhill, the author of the librdkafka C/C++ library for Kafka, developed it. kafkacat is great for quickly producing and consuming data to and from a topic. In fact, the same command will do both, depending on the context. Check this out:
$ ~ echo "Hello World" | kafkacat -b localhost:29092 -t hello-topic
% Auto-selecting Producer mode (use -P or -C to override)
We’ve sent data to stdout with echo and piped it to kafkacat. We only needed two simple flags: -b for the broker and -t for the topic. kafkacat realizes that we are sending it data and switches into producer mode. Now, we can read that data with the exact same kafkacat command:
$ ~ kafkacat -b localhost:29092 -t hello-topic
% Auto-selecting Consumer mode (use -P or -C to override)
Hello World
% Reached end of topic hello-topic [0] at offset 1
If we want to send a record with a key, we just need to use a delimiter and tell kafkacat what it is with the -K flag. In this case, we’ll use a colon:
$ ~ echo "123:Jane Smith" | kafkacat -b localhost:29092 -t customers -K:
% Auto-selecting Producer mode (use -P or -C to override)
Again, the same kafkacat command will read the record from the topic:
$ ~ kafkacat -b localhost:29092 -t customers -K:
% Auto-selecting Consumer mode (use -P or -C to override)
123:Jane Smith
% Reached end of topic customers [0] at offset 1
Alternatively, we can leave the -K flag off when reading, if we only want the value:
$ ~ kafkacat -b localhost:29092 -t customers
% Auto-selecting Consumer mode (use -P or -C to override)
Jane Smith
% Reached end of topic customers [0] at offset 1
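To make the role of the delimiter concrete, here is a minimal Python sketch of the idea behind -K: each input line is split into a key and a value at the delimiter. The function name split_keyed_line is hypothetical, and splitting at the first occurrence (with no key when the delimiter is absent) is an assumption for illustration, not a copy of kafkacat's own code.

```python
def split_keyed_line(line: str, delimiter: str = ":"):
    """Split one input line into (key, value) at the first delimiter.

    Assumption for illustration: a line without the delimiter is treated
    as a value with no key.
    """
    key, sep, value = line.partition(delimiter)
    if not sep:
        # Delimiter not found, so there is no key for this record.
        return None, line
    return key, value


# The record we produced above: key "123", value "Jane Smith".
print(split_keyed_line("123:Jane Smith"))
```

Because only the first delimiter is used in this sketch, a value that itself contains a colon would stay intact.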
Note that piping data from stdout to kafkacat, as we did above, will spin up a producer, send the data, and then shut the producer down. To start a producer and leave it running to continue sending data, use the -P flag, as suggested by the auto-selecting message above.
The consumer will stay running just as the kafka-console-consumer would. In order to consume from a topic and immediately exit, we can use the -e flag.
To consume data that is in Avro format, we can use the -s flag, along with -r to point kafkacat at Schema Registry. The -s flag can be applied to the whole record (-s avro), to just the key (-s key=avro), or to just the value (-s value=avro). Here’s an example using the movies topic from the popular movie rating tutorial:
$ ~ kafkacat -C -b localhost:29092 -t movies -s value=avro -r http://localhost:8081
------------------------------------------------------
{"id": 294, "title": "Die Hard", "release_year": 1988}
{"id": 354, "title": "Tree of Life", "release_year": 2011}
{"id": 782, "title": "A Walk in the Clouds", "release_year": 1995}
{"id": 128, "title": "The Big Lebowski", "release_year": 1998}
{"id": 780, "title": "Super Mario Bros.", "release_year": 1993}
There is a lot of power packed into this little tool, and there are many other flags that can be used with it. Running kafkacat -h will provide the complete list. For more great examples of kafkacat in action, check out related posts on Robin Moffatt’s blog.
One piece missing from kafkacat is the ability to produce data in Avro format. As we saw, we can consume Avro with kafkacat using the Confluent Schema Registry, but we can’t produce it. This leads us to our next tool.
Confluent REST Proxy is a feature-rich HTTP Kafka client. It can be used to provide Kafka support to applications written in a language without a native Kafka client. This is probably its most common use, but it can also be a handy developer tool. It can easily produce Avro data to a Kafka topic, as shown here:
$ ~ curl -X POST \
     -H "Content-Type: application/vnd.kafka.avro.v2+json" \
     -H "Accept: application/vnd.kafka.v2+json" \
     --data @newMovieData.json "http://localhost:8082/topics/movies"
------------------------------------------------------------------------------
{"offsets":[{"partition":0,"offset":5,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":3}
REST Proxy is part of the Confluent Platform under the Confluent Community License, but it can be used on its own with any Kafka cluster. It can do a lot more than what we’ll cover here, as you can see from the docs.
As shown above, REST Proxy can be used from the command line with curl or something similar. It can also be used with tools such as Postman to build a user-friendly Kafka UI.
Here’s an example of producing to a topic with Postman (the Content-Type and Accept headers were set under the “Headers” tab):
As we can see from both the curl and Postman versions, REST Proxy does require that the schema for Avro messages be passed in with each produce request. A tool like Postman, which allows you to build up a library of saved queries, can make this easier to manage.
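The contents of newMovieData.json are not shown here, so the following Python sketch builds a body of the shape the v2 produce endpoint accepts: a value_schema string plus a list of records. The schema itself is an assumption: the field names come from the records shown earlier, while the record name Movie and the field types are guesses for illustration. The helper name build_produce_payload is also hypothetical.

```python
import json

# Hypothetical Avro schema for the movies topic. Field names match the
# records shown earlier; the record name and types are assumptions.
MOVIE_SCHEMA = {
    "type": "record",
    "name": "Movie",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "title", "type": "string"},
        {"name": "release_year", "type": "int"},
    ],
}


def build_produce_payload(schema: dict, values: list) -> str:
    """Return a JSON body for a v2 Avro produce request."""
    body = {
        # The schema travels as a JSON-encoded string, not a nested object.
        "value_schema": json.dumps(schema),
        "records": [{"value": value} for value in values],
    }
    return json.dumps(body, indent=2)


payload = build_produce_payload(
    MOVIE_SCHEMA,
    [{"id": 101, "title": "Chariots of Fire", "release_year": 1981}],
)
print(payload)
```

Saving the printed output to a file and passing it to curl with --data @file would mirror the request above. Generating the body from code like this is one way to avoid hand-escaping the schema string on every request.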
To consume from topics with REST Proxy, we first create a consumer in a consumer group, then subscribe to a topic or topics, and finally fetch records to our heart’s content. We’ll switch back to curl so that we can see all the necessary bits at once.
First, we POST to the consumers endpoint with our consumer group name in the URL. In the body of this request, we pass a name for our new consumer instance, the data format (in this case, Avro), and the auto.offset.reset value.
$ ~ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"name": "movie_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
     http://localhost:8082/consumers/movie_consumers
------------------------------------------------------------------------------
{"instance_id":"movie_consumer_instance","base_uri":"http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance"}
This will return the instance id and base URI of the newly created consumer instance. Next, we’ll use that URI to subscribe to a topic with a POST to the subscription endpoint.
$ ~ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"topics":["movies"]}' \
http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance/subscription
This request doesn’t return a body, but we should get a 204 response. Now we can send a GET request to the records endpoint of that same URI to fetch records.
$ ~ curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
     http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance/records
----------------------------------------------------------------------
[{"topic":"movies","key":null,"value":{"id": 294, "title": "Die Hard", "release_year": 1988},"partition":0,"offset":0},{"topic":"movies","key":null,"value":{"id": 354, "title": "Tree of Life", "release_year": 2011},"partition":0,"offset":1},{"topic":"movies","key":null,"value":{"id": 782, "title": "A Walk in the Clouds", "release_year": 1995},"partition":0,"offset":2},{"topic":"movies","key":null,"value":{"id": 128, "title": "The Big Lebowski", "release_year": 1998},"partition":0,"offset":3},{"topic":"movies","key":null,"value":{"id": 780, "title": "Super Mario Bros.", "release_year": 1993},"partition":0,"offset":4},{"topic":"movies","key":null,"value":{"id":101,"title":"Chariots of Fire","release_year":1981},"partition":0,"offset":5}]
The consumer that we created will remain, and we can make the same GET request anytime to check for new data. If we no longer need this consumer, we can DELETE it using the base URI.
$ ~ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance
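The create, subscribe, fetch, and delete steps above can also be scripted. The following Python sketch only builds the four requests, in order, using the standard library; it does not send them. The function name consumer_lifecycle is hypothetical, and the instance URL is constructed from the group and instance names as a simplification, whereas a real client would more likely use the base_uri returned by the create call.

```python
import json
from urllib.request import Request

V2_JSON = "application/vnd.kafka.v2+json"
V2_AVRO = "application/vnd.kafka.avro.v2+json"


def consumer_lifecycle(base_url, group, instance, topics):
    """Build the create, subscribe, fetch, and delete requests in order."""
    # Simplification: derive the instance URL instead of reading base_uri
    # from the create response.
    instance_url = f"{base_url}/consumers/{group}/instances/{instance}"
    create_body = {
        "name": instance,
        "format": "avro",
        "auto.offset.reset": "earliest",
    }
    return [
        Request(f"{base_url}/consumers/{group}",
                data=json.dumps(create_body).encode("utf-8"),
                headers={"Content-Type": V2_JSON}, method="POST"),
        Request(f"{instance_url}/subscription",
                data=json.dumps({"topics": topics}).encode("utf-8"),
                headers={"Content-Type": V2_JSON}, method="POST"),
        Request(f"{instance_url}/records",
                headers={"Accept": V2_AVRO}, method="GET"),
        Request(instance_url,
                headers={"Content-Type": V2_JSON}, method="DELETE"),
    ]


steps = consumer_lifecycle(
    "http://localhost:8082", "movie_consumers",
    "movie_consumer_instance", ["movies"],
)
for request in steps:
    print(request.get_method(), request.full_url)
```

Each Request can be passed to urllib.request.urlopen against a running REST Proxy. Keeping construction separate from sending makes the sequence easy to inspect before anything touches the cluster.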
We can also get information about brokers, topics, and partitions with simple GET requests.
$ ~ curl "http://localhost:8082/brokers"
$ ~ curl "http://localhost:8082/topics"
$ ~ curl "http://localhost:8082/topics/movies"
$ ~ curl "http://localhost:8082/topics/movies/partitions"
These requests can return quite a bit of JSON data, which we’ll leave off for the sake of space. However, this does lead us nicely to our next tool.
Though not specific to Kafka, jq is an incredibly helpful tool when working with other command line utilities that return JSON data. jq is a command line utility that allows us to format, manipulate, and extract data from the JSON output of other programs. Instructions for downloading and installing jq can be found on GitHub, along with links to tutorials and other resources.
Let’s go back and take a look at the REST Proxy output from the GET call to our consumer above. It’s not the largest blob of JSON out there, but it’s still a bit hard to read. Let’s try again, this time piping the output to jq:
$ ~ curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
     http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance/records | jq
[
  {
    "topic": "movies",
    "key": null,
    "value": {
      "id": 294,
      "title": "Die Hard",
      "release_year": 1988
    },
    "partition": 0,
    "offset": 0
  },
  {
    "topic": "movies",
    "key": null,
    "value": {
      "id": 354,
      "title": "Tree of Life",
      "release_year": 2011
    },
    "partition": 0,
    "offset": 1
  },
  {
    "topic": "movies",
    "key": null,
    "value": {
      "id": 782,
      "title": "A Walk in the Clouds",
      "release_year": 1995
    },
    "partition": 0,
    "offset": 2
  },
  {
    "topic": "movies",
    "key": null,
    "value": {
      "id": 128,
      "title": "The Big Lebowski",
      "release_year": 1998
    },
    "partition": 0,
    "offset": 3
  },
  {
    "topic": "movies",
    "key": null,
    "value": {
      "id": 780,
      "title": "Super Mario Bros.",
      "release_year": 1993
    },
    "partition": 0,
    "offset": 4
  },
  {
    "topic": "movies",
    "key": null,
    "value": {
      "id": 101,
      "title": "Chariots of Fire",
      "release_year": 1981
    },
    "partition": 0,
    "offset": 5
  }
]
It’s much easier to read now but still a bit noisy. Let’s say we only want the movie titles and their release years. We can do that easily with jq:
$ ~ curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
     http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance/records \
     | jq '.[] | {title: .value.title, year: .value.release_year}'
{
"title": "Die Hard",
"year": 1988
}
{
"title": "Tree of Life",
"year": 2011
}
{
"title": "A Walk in the Clouds",
"year": 1995
}
{
"title": "The Big Lebowski",
"year": 1998
}
{
"title": "Super Mario Bros.",
"year": 1993
}
{
"title": "Chariots of Fire",
"year": 1981
}
Let’s take a look at what we just did (and you can follow along with the live example at jqplay):
Pretty cool, huh? There is much more that can be done with jq, and you can read all about it in the documentation.
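For readers more at home in a general-purpose language, here is a rough Python sketch of what that jq filter does: iterate over the array (.[]) and build a new object from two fields of each record's value. The function name titles_and_years is hypothetical, and the input is a shortened copy of the REST Proxy response shown earlier.

```python
import json

# Shortened copy of the REST Proxy response shown earlier.
raw = """[
 {"topic":"movies","key":null,"value":{"id":294,"title":"Die Hard","release_year":1988},"partition":0,"offset":0},
 {"topic":"movies","key":null,"value":{"id":354,"title":"Tree of Life","release_year":2011},"partition":0,"offset":1}
]"""


def titles_and_years(records):
    """Mirror the filter: .[] | {title: .value.title, year: .value.release_year}"""
    for record in records:                    # .[] emits each array element
        value = record.get("value") or {}     # a missing value yields nulls, as in jq
        yield {
            "title": value.get("title"),      # .value.title
            "year": value.get("release_year"),  # .value.release_year
        }


for item in titles_and_years(json.loads(raw)):
    print(json.dumps(item))
```

The jq version is shorter, which is much of its appeal at the command line. The Python form may be easier to extend when the transformation grows beyond a one-liner.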
The way that jq operates on a stream of JSON data, allowing us to combine different operations in order to achieve our desired results, reminds me of Kafka Streams—bringing us to our final tool.
The Kafka Streams Topology Visualizer takes the text description of a Kafka Streams topology and produces a graphic representation showing input topics, processing nodes, interim topics, state stores, etc. It’s a great way to get a big-picture view of a complex Kafka Streams topology. The topology for the movie ratings tutorial is not all that complex, but it will serve nicely to demonstrate this tool.
Here’s the text of our topology, which we captured with the Topology::describe method:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [movies])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: rekeyed-movies)
      <-- KSTREAM-MAP-0000000001

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-MAP-0000000007-repartition])
      --> KSTREAM-JOIN-0000000011
    Processor: KSTREAM-JOIN-0000000011 (stores: [rekeyed-movies-STATE-STORE-0000000003])
      --> KSTREAM-SINK-0000000012
      <-- KSTREAM-SOURCE-0000000010
    Source: KSTREAM-SOURCE-0000000004 (topics: [rekeyed-movies])
      --> KTABLE-SOURCE-0000000005
    Sink: KSTREAM-SINK-0000000012 (topic: rated-movies)
      <-- KSTREAM-JOIN-0000000011
    Processor: KTABLE-SOURCE-0000000005 (stores: [rekeyed-movies-STATE-STORE-0000000003])
      --> none
      <-- KSTREAM-SOURCE-0000000004

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000006 (topics: [ratings])
      --> KSTREAM-MAP-0000000007
    Processor: KSTREAM-MAP-0000000007 (stores: [])
      --> KSTREAM-FILTER-0000000009
      <-- KSTREAM-SOURCE-0000000006
    Processor: KSTREAM-FILTER-0000000009 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-MAP-0000000007
    Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-MAP-0000000007-repartition)
      <-- KSTREAM-FILTER-0000000009
You may be adept at reading this kind of output, but most people will find a graphical representation very helpful:
The Kafka Streams Topology Visualizer is a web app that you can host yourself (the source is available on GitHub). For occasional use, the public hosted version is probably sufficient.
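To get a feel for what a tool like this has to work with, here is a small Python sketch that reads a topology description and extracts the edges implied by the "-->" lines. It is not the visualizer's implementation, just an illustration of how much structure the text already carries. The function name parse_edges is hypothetical, and the sample is the first sub-topology from the description above.

```python
import re

# First sub-topology from the description above.
SAMPLE = """\
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [movies])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: rekeyed-movies)
      <-- KSTREAM-MAP-0000000001
"""

# A node line starts with its kind, followed by the node name.
NODE = re.compile(r"^(Source|Processor|Sink):\s+(\S+)")


def parse_edges(description):
    """Return (from_node, to_node) pairs taken from the '-->' lines."""
    edges = []
    current = None
    for raw_line in description.splitlines():
        line = raw_line.strip()
        match = NODE.match(line)
        if match:
            current = match.group(2)
        elif line.startswith("-->") and current:
            # A node may list several downstream nodes, separated by commas.
            for target in line[3:].split(","):
                target = target.strip()
                if target and target != "none":
                    edges.append((current, target))
    return edges


for source, target in parse_edges(SAMPLE):
    print(f"{source} -> {target}")
```

The "<--" lines repeat the same edges from the other direction, so this sketch ignores them. Topics and state stores, which the real diagram also shows, would need the text inside the parentheses to be parsed as well.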
A complex topology might be difficult to view all at once, so you can also visualize sub-topologies and then combine the images in a way that is easier to view. This can be a huge help in bringing new developers up to speed on an existing Kafka Streams application.
ksqlDB is an event streaming database that allows us to build complex topologies using syntax familiar to any SQL developer. Since ksqlDB is built on top of Kafka Streams, the Kafka Streams Topology Visualizer also works on these types of topologies.
We can get the topology description from ksqlDB with the EXPLAIN command. First, find the executing query:
ksql> SHOW QUERIES;
Query ID | Query Type | Status | Sink Name | Sink Kafka Topic | Query String
--------------------------------------------------------------------------------------------------------------------------
CSAS_SHIPPED_ORDERS_0 | PERSISTENT | RUNNING:1 | SHIPPED_ORDERS | SHIPPED_ORDERS | CREATE STREAM SHIPPED_ORDERS WITH
...
Now we can use that generated query name, CSAS_SHIPPED_ORDERS_0, to get the topology:
ksql> EXPLAIN CSAS_SHIPPED_ORDERS_0;
This gives us a fair amount of output, so we won’t show it all here, but toward the end, we see the topology description. Copying and pasting it into the visualizer results in this diagram:
We’ve looked at four helpful tools for Apache Kafka developers, but there are many others out there, which is one of the benefits of working in such a vibrant community. If there is a command line tool or a graphical application that you find helpful in getting the most out of Kafka, tell others about it. The Confluent Community Forum is a great place to share this kind of information. We look forward to continuing this discussion with you there!