In August 2017, Confluent announced and open sourced KSQL, a streaming SQL engine for Apache Kafka. Although it was released as a developer preview, many developers already use KSQL to build streaming applications on top of Apache Kafka. Use cases include streaming ETL, anomaly detection, and real-time dashboards. Watch the video “Taking KSQL for a Spin Using Real-Time Device Data” to see the potential of KSQL for Internet of Things scenarios.
A key component of any SQL language is its functions, and KSQL already has many functions built in. This blog post explains how to write user-defined functions (UDFs) to extend the available set of functions in your KSQL code.
KSQL: the Streaming SQL Engine for Apache Kafka
KSQL is implemented on top of Kafka’s Streams API, and it leverages all its benefits to build scalable, mission-critical streaming applications. A key difference between KSQL and other technologies in this space is that KSQL does not require any coding in Java, Scala, or another programming language. You can write SQL-like queries and deploy them for continuous stream processing.
KSQL example query:
Developers and data engineers can (1) write, execute, and deploy SQL queries on the KSQL servers, (2) embed the statements into their own applications or microservices, whether written in Java or in another programming language such as Go, Python, or .NET C#, or (3) integrate with KSQL’s REST interface.
The difference between KSQL and traditional SQL engines, where you execute a request-response query against a relational database like Oracle or MySQL, is that with KSQL you define and run continuous queries, or “streaming queries.” These queries run forever (or until you terminate them explicitly) to analyze incoming data in motion, instead of historical data at rest that is stored in, for example, a traditional database.
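As a conceptual illustration only (this is not how KSQL or Kafka Streams are implemented), the following Java sketch contrasts the two models. The one-shot query evaluates a predicate over a finite collection and returns a finished result; the continuous query keeps applying the predicate to each record as it arrives and emits matches immediately. The class name and the STOP marker, which stands in for explicitly terminating a query, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Conceptual sketch only: contrasts a request-response query with a continuous query.
// It is not how KSQL or Kafka Streams are implemented.
final class QueryStyles {
    // Hypothetical marker standing in for explicitly terminating a continuous query.
    static final String STOP = "__STOP__";

    // Request-response: evaluate once over data at rest, then return a finished result.
    static List<String> oneShotQuery(List<String> table, Predicate<String> where) {
        List<String> result = new ArrayList<>();
        for (String row : table) {
            if (where.test(row)) {
                result.add(row);
            }
        }
        return result;
    }

    // Continuous: evaluate each record as it arrives and emit matches immediately,
    // until the query is terminated.
    static void continuousQuery(BlockingQueue<String> stream, Predicate<String> where,
                                Consumer<String> sink) throws InterruptedException {
        while (true) {
            String record = stream.take(); // blocks until the next record arrives
            if (STOP.equals(record)) {
                return;
            }
            if (where.test(record)) {
                sink.accept(record);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        // A producer thread keeps adding records; the query never sees a "complete table".
        Thread producer = new Thread(() -> {
            for (String r : List.of("sensor-a:21", "sensor-b:35", "sensor-a:22")) {
                stream.add(r);
            }
            stream.add(STOP);
        });
        producer.start();
        continuousQuery(stream, r -> r.startsWith("sensor-a"), System.out::println);
        producer.join();
    }
}
```

The key design difference is the return type: oneShotQuery hands back a complete list, while continuousQuery returns nothing and instead pushes each result to a sink as soon as it is produced.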
Built-in KSQL Functions to Process Streaming Data
Functions are one of the key components in SQL. Functions in SQL engines are like functions, operations, or methods in programming languages such as Java, Python, or Go. They accept parameters, perform an action (such as a complex calculation), and return the result of that action as a value. Functions are used within a SQL query to filter, transform, or aggregate data. Many functions are already built into KSQL, and their number increases with every release.
But what do you do if a function you need is missing from KSQL? Let us show you how easy it is to write your own user-defined function.
Before We Begin
In this blog post, we explain how to write a UDF for KSQL by adding the UDF code to the KSQL source. Although this approach is quite straightforward for many users, we will provide a much simpler way to implement UDFs in a future release, one that no longer requires you to modify the KSQL source. In the meantime, please follow the instructions in this blog post if you need to implement your own UDFs right now.
User-Defined Functions in KSQL
In contrast to built-in functions, UDFs are functions provided by the user of a program or environment. A UDF provides a mechanism for extending the functionality of the KSQL engine by adding a function that can be evaluated in a standard query language statement.
Using a UDF in KSQL looks just like using a built-in function. Both are registered with the KSQL engine before startup, expect input parameters, and return output values. Consider the built-in function ‘STRINGTOTIMESTAMP’, which converts a string value in the given format into a BIGINT value representing the timestamp.
This function takes the timestamp 2017-12-18 11:12:13.111 (data type: String) and converts it to 1513591933111 (data type: BIGINT), the number of milliseconds since the Unix epoch.
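To make the conversion concrete, here is a minimal plain-Java sketch of the same arithmetic using java.time; it is not KSQL code, and the class and method names are illustrative. One detail worth knowing is that the result depends on the time zone used to interpret the string: 1513591933111 corresponds to 2017-12-18 11:12:13.111 at UTC+1, while interpreting the same string as UTC gives 1513595533111.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative only: mirrors what STRINGTOTIMESTAMP computes, with the time zone made explicit.
final class TimestampConversion {
    static long toEpochMillis(String value, String pattern, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
        // Parse the wall-clock time, attach a zone, and convert to milliseconds since the Unix epoch.
        return LocalDateTime.parse(value, formatter)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd HH:mm:ss.SSS";
        System.out.println(toEpochMillis("2017-12-18 11:12:13.111", pattern, ZoneOffset.ofHours(1))); // 1513591933111
        System.out.println(toEpochMillis("2017-12-18 11:12:13.111", pattern, ZoneOffset.UTC));        // 1513595533111
    }
}
```

The two results differ by exactly 3,600,000 ms (one hour), which is why a UDF that relies on the JVM's default time zone can return different values on differently configured servers.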
Now let’s take a look at the steps to build your own UDF in the same way as the ‘STRINGTOTIMESTAMP’ function above.
How to Create, Deploy, and Test KSQL UDFs
This section shows the steps to build, deploy, and test a new UDF. The following example shows how to build a simple UDF that needs no additional external Java classes or JAR libraries. We just write Java code within the UDF to convert a timestamp from String to BIGINT. A more complex UDF might require additional external dependencies to be added to your project.
Step 0: Clone the KSQL project from GitHub
First, clone (or fork) the KSQL project from https://github.com/confluentinc/ksql with the ‘git clone’ command. The main sub-project in KSQL for adding UDFs is ‘ksql-engine’, specifically the package ‘io.confluent.ksql.function’ (https://github.com/confluentinc/ksql/tree/0.4/ksql-engine).
Step 1 (optional): Embed external code or libraries into the project
Different steps might be needed depending on what your new UDF requires to perform its task:
- Dependencies on another project such as DeepLearning4J (e.g. the implementation of a complex mathematical function)
- New classes (e.g. a generated analytic model from a machine learning framework)
- Additional libraries or other dependencies like zip or xml files
You can either simply add new resources, like another Java class, to the project or extend the existing Maven POM (ksql-engine/pom.xml).
Step 2: Implement a New UDF Class
The full implementation of the UDF class is just a few lines of Java code. In general, you implement the logic between receiving the input and returning the output of the UDF in the evaluate() method. You also need to implement exception handling (e.g. for invalid input arguments) where applicable. The init() method is empty in this case, but it could initialize any required object instances.
Note that this UDF has state: the dateFormat field is either null or already initialized. You do not have to manage its scope, because Kafka Streams (and therefore KSQL) threads are independent of each other, so this state won’t cause any issues.
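A sketch of what such a class can look like, following the description above: a lazily initialized dateFormat field, an empty init(), and exception handling in evaluate(). To keep the block compilable on its own, it declares stand-ins for KSQL’s UDF interface and exception type. The assumption here is that the KSQL 0.4 source defines a Kudf interface with init() and evaluate(Object... args) and a KsqlFunctionException; check the exact names and packages in your checkout and implement those instead of the stand-ins. The error messages match the ones shown in Step 7.

```java
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Stand-in for KSQL's UDF interface so this file compiles on its own.
// Assumption: the real interface in the KSQL source is Kudf with these two methods.
interface Kudf {
    void init();

    Object evaluate(Object... args);
}

// Stand-in for the exception type thrown by KSQL functions (assumed name).
class KsqlFunctionException extends RuntimeException {
    KsqlFunctionException(String message) {
        super(message);
    }

    KsqlFunctionException(String message, Throwable cause) {
        super(message, cause);
    }
}

class StringToTimestamp implements Kudf {
    // State: null until the first call, then reused for later rows.
    private DateFormat dateFormat = null;

    @Override
    public void init() {
        // Nothing to set up for this UDF.
    }

    @Override
    public Object evaluate(Object... args) {
        // Expect exactly two arguments: args[0] = timestamp string, args[1] = format.
        if (args.length != 2) {
            throw new KsqlFunctionException("StringToTimestamp udf should have two input arguments");
        }
        try {
            if (dateFormat == null) {
                // Created from the first format seen; uses the JVM's default time zone.
                dateFormat = new SimpleDateFormat(args[1].toString());
            }
            // Milliseconds since the Unix epoch, returned as a Long (KSQL BIGINT).
            return dateFormat.parse(args[0].toString()).getTime();
        } catch (ParseException e) {
            throw new KsqlFunctionException("Exception running StringToTimestamp(" + args[0] + " , "
                    + args[1] + ") : " + e.getMessage(), e);
        }
    }
}
```

Two consequences of this design are worth keeping in mind: because dateFormat is built from the first format string it receives, later calls that pass a different format still use the first one; and SimpleDateFormat interprets the string in the JVM’s default time zone, so the returned value depends on the server configuration.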
Step 3: Register the UDF in the FunctionRegistry
The new UDF needs to be registered in the FunctionRegistry class, like all the built-in KSQL functions, by creating a KsqlFunction for it.
The first argument of the KsqlFunction constructor is the return type. The second is the list of types of the UDF arguments. The third is the name of the function, and the last is the Java class that implements the UDF functionality.
In our case, the return value is the timestamp as type BIGINT. The input parameters are the column that contains the timestamp string and the format of that string. You will be able to call the UDF in a KSQL query the following way:
STRINGTOTIMESTAMP(col1, 'yyyy-MM-dd HH:mm:ss.SSS').
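To illustrate how the four KsqlFunction arguments fit together, here is a hypothetical, heavily simplified registry: it stores a return type, argument types, a name, and an implementing class, checks a call against the declared argument types, instantiates the class, and casts the result to the declared return type. MiniFunctionRegistry, FunctionDescriptor, SimpleUdf, and StringToTimestampUdf are invented for this sketch; they are not KSQL classes, and KSQL’s own FunctionRegistry and KsqlFunction describe types differently.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical UDF contract for this sketch (not KSQL's interface).
interface SimpleUdf {
    Object evaluate(Object... args);
}

// Mirrors the four arguments described above: return type, argument types, name, implementing class.
final class FunctionDescriptor {
    final Class<?> returnType;
    final List<Class<?>> argTypes;
    final String name;
    final Class<? extends SimpleUdf> implementation;

    FunctionDescriptor(Class<?> returnType, List<Class<?>> argTypes, String name,
                       Class<? extends SimpleUdf> implementation) {
        this.returnType = returnType;
        this.argTypes = argTypes;
        this.name = name;
        this.implementation = implementation;
    }
}

// Hypothetical, simplified registry; not KSQL's FunctionRegistry.
final class MiniFunctionRegistry {
    private final Map<String, FunctionDescriptor> functions = new HashMap<>();

    void addFunction(FunctionDescriptor f) {
        // Function names are looked up case-insensitively in this sketch.
        functions.put(f.name.toUpperCase(Locale.ROOT), f);
    }

    Object call(String name, Object... args) throws ReflectiveOperationException {
        FunctionDescriptor f = functions.get(name.toUpperCase(Locale.ROOT));
        if (f == null) {
            throw new IllegalArgumentException("Unknown function: " + name);
        }
        // Validate the call against the declared argument types before invoking the UDF.
        if (args.length != f.argTypes.size()) {
            throw new IllegalArgumentException(f.name + " expects " + f.argTypes.size() + " arguments");
        }
        for (int i = 0; i < args.length; i++) {
            if (!f.argTypes.get(i).isInstance(args[i])) {
                throw new IllegalArgumentException("Argument " + i + " of " + f.name + " has the wrong type");
            }
        }
        SimpleUdf udf = f.implementation.getDeclaredConstructor().newInstance();
        return f.returnType.cast(udf.evaluate(args)); // enforce the declared return type
    }
}

// Compact demo UDF; interprets the string as UTC to keep the result deterministic.
final class StringToTimestampUdf implements SimpleUdf {
    @Override
    public Object evaluate(Object... args) {
        DateTimeFormatter format = DateTimeFormatter.ofPattern((String) args[1]);
        return LocalDateTime.parse((String) args[0], format).toInstant(ZoneOffset.UTC).toEpochMilli();
    }
}
```

In this sketch, registering the function with new FunctionDescriptor(Long.class, List.of(String.class, String.class), "STRINGTOTIMESTAMP", StringToTimestampUdf.class) and then calling it with the timestamp 2017-12-18 11:12:13.111 and the format yyyy-MM-dd HH:mm:ss.SSS returns 1513595533111, because the demo UDF interprets the string as UTC. A call with only one argument is rejected before the UDF class is ever instantiated.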
That’s it. We just implemented a new UDF and registered it with the KSQL engine. No more code needed.
Step 4: (Re-)Build KSQL Project with Maven
Now you need to re-run the Maven build of your KSQL project so that you can use the new UDF. If you’re in a hurry and don’t mind taking some risks, you can add -DskipTests=true -Dcheckstyle.skip=true to skip the tests and Checkstyle checks for a faster build.
Step 5: Start Kafka Broker and KSQL CLI
In this example, we start a new Kafka broker (including ZooKeeper) via the Confluent CLI and then start the KSQL CLI.
You can also point KSQL at a specific Kafka cluster using command-line parameters.
Step 6: Test your new UDF
After starting the Kafka cluster and the KSQL engine, we need to prepare the demo. In this example, I create an input topic and generate new test data via the command line or a continuous script.
a) Create Kafka Topic
b) Create Test Data
c) Create KSQL Stream
d) Use UDF to Convert the String to Timestamp
e) Send Test Data to KSQL Stream
As soon as you send the message, KSQL processes it and converts the timestamp. You can see the result in the output of the SELECT query you started in step d): 123 | 1513591933111
For more advanced testing, you should write a script that generates continuous feeds of varied test data.
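A minimal sketch of such a generator in Java. The comma-separated id,timestamp line layout, the class name, and the parameters are assumptions for illustration; adapt the output to whatever format your input topic and stream definition use. A fixed random seed keeps runs reproducible, and the badRatio parameter mixes in timestamps in the wrong format so you also exercise the failure cases described in Step 7.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical test-data generator. The "id,timestamp" line format is an assumption;
// adapt it to the format your input topic and stream definition expect.
final class TestDataGenerator {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    // Deliberately wrong layout, used to exercise the UDF's error path.
    private static final DateTimeFormatter BAD_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss-SSS");

    static List<String> generate(int count, long seed, double badRatio) {
        Random random = new Random(seed); // fixed seed so failing runs can be reproduced
        LocalDateTime start = LocalDateTime.of(2017, 12, 18, 11, 12, 13, 111_000_000);
        List<String> lines = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            // Spread timestamps over roughly one day after the start time (millisecond steps).
            LocalDateTime ts = start.plusNanos(random.nextInt(86_400_000) * 1_000_000L);
            // With probability badRatio, emit the timestamp in the wrong format.
            DateTimeFormatter f = random.nextDouble() < badRatio ? BAD_FORMAT : FORMAT;
            lines.add(i + "," + ts.format(f));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Print to stdout so the lines can be piped into a command-line producer.
        generate(10, 42L, 0.2).forEach(System.out::println);
    }
}
```

Wrapping generate() in a loop with a short sleep turns this into a continuous feed; keeping the generator deterministic per seed makes it easier to compare KSQL output across runs.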
f) Consume results from any Kafka Client
Because KSQL reads from and writes to normal Kafka topics, you can also process and forward any KSQL-related messages with a standard Kafka consumer in any Kafka client, such as Java, .NET, Go, or Python, or consume them directly via Kafka Connect.
Step 7: Debugging and Error Fixing
You should also test the UDF with values that are expected to fail. If you pass the wrong number of arguments to the STRINGTOTIMESTAMP function, you will see the error defined in our implementation in the CLI:
StringToTimestamp udf should have two input arguments
If your UDF is not working as expected in your KSQL engine (e.g. it returns ‘null’), the best way to find errors and exceptions is to check the KSQL log file (by default generated in a temp folder).
For example, if a timestamp does not match the given format, you will see an error message like the following in the log file (as defined in the UDF implementation):
Exception running StringToTimestamp('2017-12-18 11:12:13.111' , yyyy-MM-dd HH:mm:ss.SSS) : Unparseable date: "'2017-12-18 11-12-13-111'"
Step 8: Use your new UDF in your KSQL Projects
That’s all you need to do to build, deploy, and test new UDFs for KSQL. The steps above describe manual testing of your new UDF. Of course, you can use any tools or frameworks to automate testing and integrate CI/CD tools like Jenkins: a UDF is plain Java code and the surrounding steps are shell scripts, so you have the entire Java build and test ecosystem at your disposal.
Build and Share your KSQL UDF
You have learned how easy it is to create and add new user-defined functions to KSQL. It takes just a few steps to implement, register, and run a new UDF.
A future KSQL release will make it even easier to build UDFs by removing the need to rebuild the project to add your own UDFs, for example by providing a CREATE FUNCTION statement. We encourage you to share your feedback on how you’d like to build your own UDFs.
Where to go from here
If you have enjoyed this article, you might want to continue with the following resources to learn more about KSQL:
- Get started with the latest release of KSQL to process and analyze your company’s data in real time.
- Join us for a three-part online talk series on the ins and outs of how KSQL works. The first part in the series, Exploring KSQL Patterns, is on February 13th. Register now.
If you are interested in contributing to KSQL, we encourage you to get involved by sharing your feedback via the KSQL issue tracker, voting on existing issues by giving your +1, or opening pull requests. Use the #ksql channel in our public Confluent Slack community to ask questions, discuss use cases or help fellow KSQL users.