If you’ve already started designing your real-time streaming applications, you may be ready to test against a real Apache Kafka® cluster. To make it easy to get started with your own dev environment, this blog post demonstrates how to use the Confluent CLI command line tool to launch Confluent Platform and to produce and consume simple data from Kafka topics. It then shows you convenient ways of producing test data to Kafka topics. Generating your own data helps you exercise your client applications more rigorously, build demos, troubleshoot issues, or simply learn more about how Kafka works.
First, let’s walk through how to spin up the services in the Confluent Platform, and produce to and consume from a Kafka topic. Download the Confluent Platform onto your local machine and separately download the Confluent CLI, which is a convenient tool to launch a dev environment with all the services running locally. This frees you, as a developer, from worrying about how to operate the services.
Disclaimer: The Confluent CLI is for local development—do not use this in production.
Start the services in Confluent Platform with the command confluent local start. After issuing that command, you can check with confluent local status to see that all the Confluent Platform services are started:
$ confluent local start
...
$ confluent local status
control-center is [UP]
ksql-server is [UP]
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]
Next, produce a sequence of simple messages to a topic called topic1:
$ seq 5 | confluent local produce topic1
Consume those messages from topic1 (press ctrl-C to stop):
$ confluent local consume topic1 -- --from-beginning
1
2
3
4
5
^CProcessed a total of 5 messages
This is your “Hello, world!” for producing and consuming basic Kafka messages. Congratulations!
Now you want to dive into more involved scenarios, test your client application, use more realistic datasets, and perhaps do a compelling Kafka demo for your teammates. However, the produce/consume commands used above are limited in what they can do. They do not work well for producing records with complex data types (e.g., records with multiple fields) or for randomizing the data, and they give you little control over customizing it. Basically, the data they produce is not very realistic and is quite boring.
Short of using real data from a real source, you do have a few options on how to generate more interesting test data for your topics. One option is to write your own client. Kafka has many programming language options—you choose: Java, Python, Go, .NET, Erlang, Rust—the list goes on. You can write your own Kafka client applications that produce any kind of records to a Kafka topic, and then you’re set.
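As a rough illustration of the "write your own" route, here is a minimal Python sketch that uses only the standard library. It is not a real Kafka client: it simply prints random JSON records to standard output so they can be piped into the same confluent local produce command used above. The function names are hypothetical, and the four field names are borrowed from the users dataset that appears later in this post.

```python
import json
import random
import time


def random_user(rng: random.Random) -> dict:
    """Build one record with four illustrative fields."""
    one_year_ms = 365 * 24 * 3600 * 1000
    return {
        # Epoch milliseconds, somewhere within the past year
        "registertime": int(time.time() * 1000) - rng.randrange(one_year_ms),
        "userid": f"User_{rng.randint(1, 9)}",
        "regionid": f"Region_{rng.randint(1, 9)}",
        "gender": rng.choice(["MALE", "FEMALE", "OTHER"]),
    }


def generate(count: int, seed=None):
    """Yield `count` JSON-encoded records, one string per record."""
    rng = random.Random(seed)
    for _ in range(count):
        yield json.dumps(random_user(rng))


if __name__ == "__main__":
    # Print five records, one per line, ready to be piped to a producer
    for line in generate(5):
        print(line)
```

Saved under a hypothetical name such as gen_users.py, it could be used as: python gen_users.py | confluent local produce topic1. Each printed line becomes the value of one message.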
But wouldn’t it be great if you could generate data locally just to fill topics with messages, without writing any code? Fortunately, you can: there are ready-made data generators for exactly this.
If you want to test an end-to-end pipeline, you may want to incorporate Kafka Connect, which connects Kafka with external systems such as databases, key-value stores, search indexes and file systems. Source connectors read data from external systems and produce to Kafka using the resilient, fault-tolerant Kafka Connect APIs.
If your external system is not available during testing, but you still want to exercise Kafka Connect, use the Kafka Connect Datagen Connector available in Confluent Hub. (Confluent Hub is a pretty cool place where you can find connectors to connect to many types of end systems—check it out!)
You can use predefined datasets with complex records and multiple fields, or define your own schema with your own fields. You can customize the interval at which data is produced, configure the value format to be one of Avro, JSON or String, etc.
To install the Kafka datagen plugin from Confluent Hub, use the command below to get the connector JAR files onto the right path in your machine. Work through a few prompts.
$ confluent-hub install confluentinc/kafka-connect-datagen:latest
Because Confluent Platform is already running, you need to restart the Kafka Connect service to pick up the new JARs:
$ confluent local stop connect
$ confluent local start connect
Check that the Kafka datagen connector plugin is available with confluent local list plugins.
Then create a file that configures the connector and specifies how it should generate data. For example, create a file called /tmp/datagen-users.json with the following content:
{
  "name": "datagen-users",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "topic2",
    "quickstart": "users",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "max.interval": 1000,
    "iterations": 10000000,
    "tasks.max": "1"
  }
}
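If you end up needing several such files (say, one per topic or per quickstart), a small script can stamp them out. The sketch below is one possible approach rather than part of any Confluent tooling; the helper name datagen_config is hypothetical, and the keys and values are copied from the configuration above.

```python
import json


def datagen_config(name: str, topic: str, quickstart: str,
                   max_interval_ms: int = 1000,
                   iterations: int = 10000000) -> dict:
    """Return a connector definition shaped like /tmp/datagen-users.json."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "kafka.topic": topic,
            "quickstart": quickstart,
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "max.interval": max_interval_ms,
            "iterations": iterations,
            "tasks.max": "1",
        },
    }


if __name__ == "__main__":
    # Print the same definition as the hand-written file above
    print(json.dumps(datagen_config("datagen-users", "topic2", "users"), indent=2))
```

Redirecting the output to /tmp/datagen-users.json gives a file equivalent to the hand-written one.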
Load that configuration file to Kafka Connect:
$ confluent local config datagen-users -- -d /tmp/datagen-users.json
It will immediately start producing records to topic2. Consume messages from that topic to see the data.
$ confluent local consume topic2
{"registertime":1500599586519,"userid":"User_9","regionid":"Region_5","gender":"MALE"}
{"registertime":1493882456812,"userid":"User_9","regionid":"Region_3","gender":"OTHER"}
{"registertime":1514584804940,"userid":"User_9","regionid":"Region_8","gender":"FEMALE"}
{"registertime":1498613454415,"userid":"User_7","regionid":"Region_9","gender":"FEMALE"}
{"registertime":1510970841590,"userid":"User_8","regionid":"Region_8","gender":"OTHER"}
...
There are a few quickstart schema specifications bundled with the connector. In the example above, the connector loads one of those quickstarts called users.
"quickstart": "users",
This quickstart generates messages with four fields called: registertime, userid, regionid and gender, which you saw when you consumed messages from the resulting topic. Those fields were randomly populated based on the users schema specification.
Each of the fields in the schema defines properties that describe the allowed values. For example, the schema specification for the field regionid describes it as a string matching the regular expression Region_[1-9]? (the trailing ? makes the digit optional), so the generated values for this field vary in the last digit from record to record.
{"name": "regionid", "type": {
    "type": "string",
    "arg.properties": {
      "regex": "Region_[1-9]?"
    }
}},
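To get a feel for which values that pattern admits, you can test a few candidates against it. The sketch below uses Python's re module purely as an illustration; it is not how the connector generates values internally, and the helper name is hypothetical.

```python
import re

# The regex from the regionid field specification
REGIONID_PATTERN = re.compile(r"Region_[1-9]?")


def is_valid_regionid(value: str) -> bool:
    """True if the whole string matches the regionid pattern."""
    return REGIONID_PATTERN.fullmatch(value) is not None


if __name__ == "__main__":
    for candidate in ["Region_5", "Region_9", "Region_", "Region_0", "Region_12"]:
        print(candidate, is_valid_regionid(candidate))
```

Region_5 and Region_9 match, as does a bare Region_ because the digit is optional. Region_0 and Region_12 do not match.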
The connector has several quickstart schemas, including three for clickstreams (clickstream, clickstream_users and clickstream_codes), two for pageviews (pageviews and users) and one for ratings. If your goal is just to generate more complex data and you don’t care about the actual fields or their values, then any of the bundled schema specifications will work, and they are really easy to use.
However, you can also define your own schema specifications if you want to customize the fields and their values to be more domain specific or to match what your application is expecting. Under the hood, this datagen connector uses Avro Random Generator, so the only constraint in writing your own schema specification is that it is compatible with Avro Random Generator.
To use your own schema, set the following parameters in the connector configuration:
...
"schema.filename": "/path/to/your_schema.avro",
"schema.keyfield": "<field representing the key>",
...
For reference in customizing your own schema specification, look at the Avro Random Generator documentation and any of the connector’s bundled schema specifications.
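As a hedged sketch of what a custom schema specification might look like, the Python snippet below builds a small Avro schema as a dictionary and serializes it to JSON. The record and its fields (orderid, status, quantity) are hypothetical. The regex annotation follows the regionid example above; the options and range annotations reflect our understanding of Avro Random Generator and should be checked against its documentation.

```python
import json

# Hypothetical schema for an "order" record; every name here is illustrative
ORDER_SCHEMA = {
    "namespace": "example",
    "name": "order",
    "type": "record",
    "fields": [
        {"name": "orderid", "type": {
            "type": "string",
            # Same annotation style as the regionid field shown earlier
            "arg.properties": {"regex": "Order_[1-9][0-9]{3}"},
        }},
        {"name": "status", "type": {
            "type": "string",
            # Assumed annotation: pick one value from a fixed list
            "arg.properties": {"options": ["NEW", "SHIPPED", "CANCELLED"]},
        }},
        {"name": "quantity", "type": {
            "type": "int",
            # Assumed annotation: bounds for the generated number
            "arg.properties": {"range": {"min": 1, "max": 10}},
        }},
    ],
}


def custom_schema_settings(schema_path: str, key_field: str) -> dict:
    """Connector settings that point at a custom schema file."""
    return {"schema.filename": schema_path, "schema.keyfield": key_field}


if __name__ == "__main__":
    # Print the schema so it can be saved to a file of your choosing
    print(json.dumps(ORDER_SCHEMA, indent=2))
```

Calling custom_schema_settings with the path where you saved the schema and "orderid" as the key field yields the two settings shown above, ready to merge into the connector configuration.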
Additionally, you can configure the interval at which records are produced (i.e., max.interval) and the number of records sent (i.e., iterations). And as with any Kafka connector, you may customize any of the general Kafka Connect configuration settings. Notice that the data generator produced JSON records in the earlier example, because the configuration file had a configuration parameter value.converter that was set to use JSON.
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
It could have just as easily produced Avro records by changing that configuration parameter as shown below (see all Kafka connector configuration parameters).
...
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
...
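If you keep connector definitions in code, the switch from JSON to Avro can be expressed as a small transformation. This is only a sketch: the helper name with_avro_values is hypothetical, and dropping value.converter.schemas.enable rests on the assumption that this setting only matters to the JSON converter.

```python
import copy


def with_avro_values(connector: dict,
                     schema_registry_url: str = "http://localhost:8081") -> dict:
    """Return a copy of a connector definition that emits Avro values."""
    updated = copy.deepcopy(connector)  # leave the caller's definition untouched
    config = updated["config"]
    config["value.converter"] = "io.confluent.connect.avro.AvroConverter"
    config["value.converter.schema.registry.url"] = schema_registry_url
    # Assumption: this JSON-converter setting is not needed for Avro
    config.pop("value.converter.schemas.enable", None)
    return updated


if __name__ == "__main__":
    json_connector = {
        "name": "datagen-users",
        "config": {
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
        },
    }
    print(with_avro_values(json_connector)["config"])
```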
On the other hand, if you want to produce records directly to Kafka topics without using a connector, use the command line data generator. It is very similar to the aforementioned data generator, including the ability to use predefined datasets or define your own. The data generator can produce JSON, Avro, or delimited records. The example below shows how to produce Avro records to a topic called topic3:
$ ksql-datagen quickstart=users format=avro topic=topic3 maxInterval=100
You can consume messages from that topic. Notice the addition of a new argument --value-format avro, which by default looks up the Avro schema from the locally running Confluent Schema Registry.
$ confluent local consume topic3 -- --value-format avro --from-beginning
{"registertime":{"long":1487831160202},"userid":{"string":"User_3"},"regionid":{"string":"Region_5"},"gender":{"string":"OTHER"}}
{"registertime":{"long":1508909059796},"userid":{"string":"User_4"},"regionid":{"string":"Region_2"},"gender":{"string":"FEMALE"}}
{"registertime":{"long":1499304790700},"userid":{"string":"User_7"},"regionid":{"string":"Region_1"},"gender":{"string":"FEMALE"}}
{"registertime":{"long":1510656714861},"userid":{"string":"User_5"},"regionid":{"string":"Region_2"},"gender":{"string":"MALE"}}
{"registertime":{"long":1510749005264},"userid":{"string":"User_3"},"regionid":{"string":"Region_5"},"gender":{"string":"FEMALE"}}
...
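In this output each value is wrapped in a single-key object naming its type, such as {"long": ...} or {"string": ...}, which is how Avro's JSON encoding represents values of union-typed fields. If you want to compare these records against plain JSON in a test, a small helper can flatten them. The sketch below assumes flat records of primitive values like the ones shown; the function name unwrap is hypothetical.

```python
import json


def unwrap(record: dict) -> dict:
    """Strip the single-key type wrappers from a flat Avro-JSON record."""
    flat = {}
    for field, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            # {"long": 123} -> 123, {"string": "x"} -> "x"
            (flat[field],) = value.values()
        else:
            # Nulls and already-plain values pass through unchanged
            flat[field] = value
    return flat


if __name__ == "__main__":
    line = ('{"registertime":{"long":1487831160202},"userid":{"string":"User_3"},'
            '"regionid":{"string":"Region_5"},"gender":{"string":"OTHER"}}')
    print(unwrap(json.loads(line)))
```

Note that this simple rule would also unwrap a genuine map or nested record that happens to have exactly one key, so it is only suitable for flat records like these.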
Now that you know how to generate your own data to Kafka topics, what can you do with it? The answer is: whatever you want! You can use this data for getting started with Kafka, testing your client applications, building demos or even troubleshooting.
To see more of the Confluent Platform in action, we recommend you get going with the Confluent Platform quick start. From there, you can build more interesting real-time streaming applications with the Kafka Streams API or KSQL, and build more interesting data pipelines with other connectors from the Confluent Hub!
If you’d like to test your application with serverless Kafka, read Creating a Serverless Environment for Testing Your Apache Kafka Applications, which walks you through how to create a Confluent Cloud stack and generate test data to your Kafka topics.