ksql> SELECT * FROM MY_FIRST_KSQL_STREAM;
Kafka in the Cloud: Why it’s 10x better with Confluent | Find out more
KSQL is the powerful SQL streaming engine for Apache Kafka®. Using SQL statements, you can build powerful stream processing applications. In this article, we’ll see how to troubleshoot some of the common issues that people encounter with KSQL. More advanced topics such as digging deeper into the internals for more advanced troubleshooting will be covered by a subsequent article.
You can use this article as a reference, or you can follow along with the code examples to try them out as we go. Using Docker and Docker Compose, we can easily provision an environment in which we explore and try out the different techniques and tools. The environment includes a data generator sending continuous stream of events into a Kafka topic that we will use for testing. You can find all the necessary code on GitHub.
So, now let’s dive in and start exploring what to do when things aren’t working…
Probably the most common question in the Confluent Community Slack group’s #ksql channel is:
Why isn’t my KSQL query returning data?
That is, you’ve run a CREATE STREAM, but when you go to query it…
ksql> SELECT * FROM MY_FIRST_KSQL_STREAM;
…nothing happens. And because KSQL queries are continuous, your KSQL session appears to “hang.” That’s because KSQL is continuing to wait for any new messages to show you. So if your run a KSQL SELECT and get no results back, what could be the reasons for that?
The answer usually comes down to one of five main reasons (the first four of which are variations on a theme):
Let’s look at each of these in turn and how to diagnose them.
To start with, we believe we’ve got data in a topic called ratingz, so we register it with KSQL as a STREAM:
ksql> CREATE STREAM RATINGS (rating_id BIGINT, \
user_id BIGINT, \
stars INT, \
route_id BIGINT, \
rating_time BIGINT, \
channel VARCHAR, \
message varchar) \
WITH (KAFKA_TOPIC='ratingz', \
VALUE_FORMAT='JSON');
Message
Stream created
ksql>
Then, we query the new stream:
SELECT rating_id, user_id, message FROM RATINGS;
No data comes back—time to do some detective work. To exit from the running SELECT statement, press Ctrl-C.
First, we need to confirm which Kafka topic we’re using to drive the stream. We think we know (we just ran the CREATE STREAM, right?), but as with any good troubleshooting approach the key is methodically working through the possibilities.
So, using DESCRIBE EXTENDED, we can verify the topic from which the stream is sourced:
ksql> DESCRIBE EXTENDED RATINGS;
Name : RATINGS
Type : STREAM
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : ratingz (partitions: 1, replication: 1) (1)
[...]
(1)The Kafka topic used by this stream
Our source topic is ratingz. Now, we will step away from KSQL and use another Kafka consumer to verify if there is data in the topic. My preferred tool here is kafkacat, but you can use other tools, such as kafka-console-consumer, if you like. Invoking kafkacat shows:
$ docker run --network ksql-troubleshooting_default --tty --interactive --rm \
confluentinc/cp-kafkacat \
kafkacat -b kafka:29092 \
-C -t ratingz \
-o beginning
% Reached end of topic ratingz [0] at offset 0
The -o beginning argument tells kafkacat to go back to the beginning of the topic, and -C to read (consume) the messages. Reached end of topic shows that there’s no data to read. No data means that KSQL isn’t going to be showing anything in the output of a SELECT.
ℹ️ |
So in this case, there’s no data in the source topic. Turns out we mistook the topic name, and used ratingz instead of ratings! D’oh! |
Let’s fix our stream and use the ratings topic (with an s this time). Register it with KSQL, dropping the previous version first:
ksql> DROP STREAM RATINGS;
Message
Source RATINGS was dropped.
ksql> CREATE STREAM RATINGS (rating_id BIGINT, user_id BIGINT, stars INT, route_id BIGINT, rating_time BIGINT, channel VARCHAR, message varchar) WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='JSON');
Message
Stream created
ksql>
As before, try to query the stream, and you’ll find that there’s no data being returned:
SELECT rating_id, user_id, message FROM RATINGS;
[ no output returned from KSQL ]
Like last time, first verify the source topic for the stream that we’re querying:
ksql> DESCRIBE EXTENDED RATINGS;
[...]
Kafka topic : ratings (partitions: 1, replication: 1)
And use kafkacat to check if there’s any data in it:
$ docker run --network ksql-troubleshooting_default --tty --interactive --rm \
confluentinc/cp-kafkacat \
kafkacat -b kafka:29092 \
-C -t ratings \
-o beginning
{"rating_id":1,"user_id":2,"stars":1,"route_id":2350,"rating_time":1537182554356,"channel":"web","message":"thank you for the most friendly, helpful experience today at your new lounge"}
{"rating_id":2,"user_id":10,"stars":3,"route_id":4161,"rating_time":1537182555220,"channel":"web","message":"more peanuts please"}
[...]
It turns out that there are thousands of messages in the topic. But, by default, KSQL reads from the end of a topic, and no new messages are being written in the topic. As soon as new messages are sent to the topic, the SELECT returns results.
ksql> SELECT rating_id, user_id, message FROM RATINGS;
1 | 8 | (expletive deleted)
2 | 19 | more peanuts please
3 | 8 | meh
[...]
ℹ️ |
So here we just needed to feed the topic more data. What about if you want to look at data already in the topic? That’s what we’ll look at in the next section. |
Kafka is an immutable log of events, and data is persisted according to the retention settings. When an application reads data from a Kafka topic, the data remains in place, but the offset in the log at which that particular application has read up to is recorded. Another application can read the same data from the same topic with complete independence from the first. The main thing is that there is a log of data, and consuming applications choose the point on the log at which they want to start reading.
When KSQL reads data from a topic, it will by default read from the latest offset—that is to say, only new messages arriving in the topic after the topic is registered in KSQL will be read.
You can verify the offset setting using LIST PROPERTIES:
ksql> LIST PROPERTIES;
Property | Value
[...]
ksql.streams.auto.offset.reset | latest
[...]
Oftentimes—and particularly in testing and development—you’ll want to read the data that already exists in a topic. To tell KSQL to do this, change the offset property:
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql>
Now when you run a SELECT, KSQL will return the data from the beginning of the topic. The SELECT will still run continuously, so if there is new data arriving, you’ll see that—and if there isn’t, the SELECT will just hang and wait for new data (or for you to cancel the query).
ℹ️ |
If you want KSQL to read from the beginning of your topic, you must configure the offset from which it will read by runningSET 'auto.offset.reset'='earliest';. |
Since KSQL is based on SQL, you can restrict the data you want to return using predicates. These include WHERE and HAVING, as well as by implication JOIN.
If your query is not returning data and you think it should, and you’ve followed the other advice in this article already, then take a standard troubleshooting course of methodically stripping back its components until it does.
Consider this example:
SELECT * FROM RATINGS WHERE STARS > 5
But if no star ratings are above 5, no data will be returned. To debug this, perhaps remove the predicate and bring in the STARS column to verify the data:
ksql> SELECT STARS FROM RATINGS;3 3 2 1
Or, use KSQL’s aggregate functionality to look at the maximum value:
ksql> SELECT CHANNEL, MAX(STARS) FROM RATINGS GROUP BY CHANNEL; android | 2 iOS | 4 android | 2
From just a sample of the data, it looks like the issue here could be that STARS is never greater than 5, and thus the SELECT in the example above will never return data.
Data in Kafka is just bytes. It’s up to the producer how it serializes the source message; the consumer (which is KSQL here) needs to deserialize using the same method. Common serialization formats include Avro, JSON, etc.
If KSQL cannot deserialize message data, it will not write anything to the SELECT results. If this happens, you could have checked the three situations above and ruled them out but still not have any data returned to your SELECT.
Here’s a simple example using one of the existing internal topics called _confluent-metrics. Let’s register it using a fictional schema that we believe to be correct for the purposes of this example, and declare the serialization format of the message values to be JSON (Tip: It’s not JSON!).
CREATE STREAM METRICS (col1 int, col2 int, col3 varchar) \
WITH (KAFKA_TOPIC='_confluent-metrics', VALUE_FORMAT='JSON');
Taking the lesson from above, set the offset to earliest so that we will definitely pull all the messages, and run a SELECT:
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'
ksql> SELECT * FROM METRICS;
So … no results coming back. Let’s go through the checklist. We can check off the offset already, as we’ve specifically set that.
ksql> DESCRIBE EXTENDED METRICS;
[...]
Kafka topic : _confluent-metrics (partitions: 12, replication: 1)
$ docker run --network ksql-troubleshooting_default --tty --interactive --rm \
confluentinc/cp-kafkacat \
kafkacat -b kafka:29092 \
-C -t _confluent-metrics \
-o beginning -c 1 (1)
���,�
kafka.logSizeLog"$partition.9.topic.__consumer_offsets*Akafka.log:type=Log,name=Size,topic=__consumer_offsets,partition=90�
kafka.logSizeLog"$partition.8.topic.__consumer_offsets*Akafka.log:type=Log,name=Size,topic=__consumer_offsets,partition=80�
kafka.logSizeLog"$partition.7.topic.__consumer_offsets*Akafka.log:type=Log,name=Size,topic=__consumer_offsets,partition=70�
kafka.logSizeLog"$partition.6.topic.__consumer_offsets*Akafka.log:type=Log,name=Size,topic=__consumer_offsets,partition=60�
[...]</code></pre>
(1)The -c 1 argument tells kafkacat to just return the one message and then exit.
Per the checklist, there is data, we’re querying the correct topic and have set the offset back to the beginning … but why isn’t KSQL returning data?
Well, the data we can see from the output of kafkacat clearly isn’t JSON, which is what we declared in the CREATE STREAM command. If we go to the KSQL server log file, you’ll see a whole bunch of these deserialization errors:
[2018-09-17 12:29:09,929] WARN task [0_10] Skipping record due to deserialization error. topic=[_confluent-metrics] partition=[10] offset=[70] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: _confluent-metrics
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ((CTRL-CHAR, code 127)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: (byte[])�����,�
�
[...] [truncated 1544 bytes]; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:669)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:567)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2624)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:88)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:77)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:45)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
[...]
You can see from the stack trace that it’s using the JSON deserializer (as you’d expect, given our VALUE_FORMAT configuration). You can also see from the sample message ([Source: (byte[])�����,� �) in the log output that the JsonParseException cites that the message clearly isn’t JSON.
ℹ️ |
If you hit this problem, then you need to synchronize your serialization and deserialization formats. KSQL supports delimited (CSV), JSON or Avro. Protobuf users can check out KLIP-0, which proposes adding Protobuf support to KSQL. |
Our previous example referred to cases where no messages are being returned, but you may also see cases where only some of the messages are shown, and it could be the same root cause—serialization—but with a different slant. Instead of simply getting your serialization format wrong, maybe you chose the right serialization format, but some malformed messages exist on the topic.
In this simple example, we’ll put some data onto a new topic, using JSON but with some malformed messages:
docker run --interactive --rm --network ksql-troubleshooting_default \
confluentinc/cp-kafkacat \
kafkacat -b kafka:29092 \
-t dummy_topic \
-P <<EOF
{"col1":1,"col2":16000}
{"col1":2,"col2:42000}
{"col1":3,"col2":94000}
EOF
Note that the second message is invalid JSON, as it’s missing a " after the field name (col2).
Next, register the topic in KSQL:
ksql> CREATE STREAM DUMMY (COL1 INT, COL2 VARCHAR) \
WITH (KAFKA_TOPIC='dummy_topic', VALUE_FORMAT='JSON');
Message
Stream created
----------------
Set the offset to earliest so that we will definitely pull all the messages, and then run a SELECT:
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'none' to 'earliest'
ksql> SELECT * FROM DUMMY;
1537186945005 | null | 1 | 16000
1537186945005 | null | 3 | 94000
Note that we only get two messages even though there are three on the topic.
If you check out the KSQL server log, you’ll see:
[2018-09-17 13:03:13,662] WARN task [0_0] Skipping record due to deserialization error. topic=[dummy_topic] partition=[0] offset=[1] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: dummy_topic
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name
at [Source: (byte[])"{"col1":2,"col2:42000}"; line: 1, column: 45]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:594)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.parseEscapedName(UTF8StreamJsonParser.java:1956)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.slowParseName(UTF8StreamJsonParser.java:1861)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1645)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:999)
at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:247)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
Note the partition and offset shown in the error message (partition=[0] offset=[1]).
Head back to the ever-versatile kafkacat and run the following:
docker run --network ksql-troubleshooting_default \
--tty --interactive --rm \
confluentinc/cp-kafkacat \
kafkacat -b kafka:29092 -C -K: \
-f '\nKey: %k\t\nValue: %s\n\Partition: %p\tOffset: %o\n--\n' \ (1)
-t dummy_topic \
-o 1 \ (2)
-p 0 \ (3)
-c 1 (4)
(1)-f to format the output and show some nice metadata
(2)-o 1 start at offset 1
(3)-p 0 read from partition 0
(4)-c 1 consume just one message
The output is:
Key:
Value: {"col1":2,"col2:42000}
Partition: 0 Offset: 1
--
This shows us, if we were in any doubt, that the message value is not valid JSON and thus can’t be consumed by KSQL.
ℹ️ |
If KSQL is not showing all of your messages, then check that they are in a valid format for the declared serialization. So if you’ve set VALUE_FORMAT='JSON', then check that each message is indeed valid JSON. |
KSQL writes most of its logs to stdout by default. If you’re running KSQL using Docker, then you’ll find the output in the container logs themselves. For example:
Using the Confluent CLI, you can run the following:
If you’ve installed Confluent Platform using RPM/DEB, then you should find the logs under /var/log/confluent/.
When you declare a table in KSQL, you are telling it to return the state for a given key. This is in contrast to a stream, in which every message in the topic is returned. As part of a table’s definition, the key must be specified and match the key of the underlying Kafka topic. If you declare a KSQL table on a topic in which the key is NULL, then you will get no rows back when you query the table.
To check the key for your messages, you can use the PRINT command:
ksql> PRINT'user_logons' FROM BEGINNING LIMIT 1; Format:JSON {"ROWTIME":1554824437225,"ROWKEY":"null","user":{"first_name":"Damara","last_name":"Blankman","email":"dblankman0@npr.org"},"ip_address":"253.30.46.155","logon_date":"2017-07-25T00:34:31Z"}
Here we can see "ROWKEY":"null" showing that the message has no key. Thus, if we declared a table on top of this topic, no rows would be returned when we queried it.
If you want to set the key on a message, you should do this when the message is produced into the topic. You can also use KSQL to apply a key, but note that doing this may affect the partition that the message is in and therefore the ordering of messages can be impacted. Here is an example of changing the key, having already declared a stream on top of the above topic:
ksql> CREATE STREAM USER_LOGONS_KEYED AS SELECT * FROM USER_LOGONS PARTITION BY IP_ADDRESS;Message
Stream created and running
Now when we examine the topic that the above stream writes to, we see that ROWKEY is set:
ksql> PRINT 'USER_LOGONS_KEYED' FROM BEGINNING LIMIT 1; Format:JSON {"ROWTIME":1554824437225,"ROWKEY":"220.61.74.180","USER":{"FIRST_NAME":"Carrissa","LAST_NAME":"Halston","EMAIL":"chalston5@sitemeter.com"},"IP_ADDRESS":"220.61.74.180","LOGON_DATE":"2018-03-22T15:20:04Z"}
This topic can then be used for a KSQL table:
ksql> CREATE TABLE user_logons_table (user STRUCT< first_name VARCHAR, last_name VARCHAR, email VARCHAR >, ip_address VARCHAR, logon_date VARCHAR) WITH (KAFKA_TOPIC ='USER_LOGONS_KEYED', VALUE_FORMAT='JSON', KEY='ip_address');Message
Table created
ksql> SELECT * FROM USER_LOGONS_TABLE LIMIT 1; 1554824437229 | 159.231.219.124 | {FIRST_NAME=Gail, LAST_NAME=Skipton, EMAIL=gskipton3az@google.it} | 159.231.219.124 | 2017-06-01T16:48:49Z Limit Reached Query terminated
Still stuck and need more help? Here are several places to turn:
Event design plays a big role in your ability to fix bad data in your streams. But if you’ve wrecked a stream with bad data (i.e., it’s unavoidably contaminated), you'll need to employ a "rewind, rebuild, and retry" strategy.
At a high level, bad data is data that doesn’t conform to what is expected, and it can cause serious issues and outages for all downstream data users. This blog looks at how bad data may come to be, and how we can deal with it when it comes to event streams.