ksqlDB is the event streaming database purpose-built for stream processing applications; it enables real-time data processing against Apache Kafka®. ksqlDB makes it easy to read, write and process streaming data in real time, at scale, using SQL-like semantics. ksqlDB already has plenty of available functions like COUNT. Even so, many users need additional functions to process their data streams.
ksqlDB now has an official API for building your own functions. As of the release of Confluent Platform 5.0, ksqlDB supports creating user-defined scalar functions (UDF) and user-defined aggregate functions (UDAF). All it takes to write your own is just one Java class. The UD(A)F can embed any additional dependencies (like additional JARs or other binaries) if needed.
In this blog post, we’ll discuss the main need and motivation for UD(A)Fs in ksqlDB, walk through an advanced example with a deep-learning UDF, show how easy it is to build even a very powerful function, and identify some issues that you might face during development and testing.
Let’s first discuss what UD(A)Fs are and why you might need to build your own.
Functions are one of the key components in ksqlDB. Functions in ksqlDB are like functions, operations or methods in other SQL engines or programming languages, such as Java, Python or Go. They accept parameters, perform an action—such as a complex calculation—and return the result of that action as a value. They are used within a KSQL query to filter, transform or aggregate data.
Many functions are already built into ksqlDB, and more are added with every release. They fall into two categories: stateless ksqlDB scalar functions and stateful ksqlDB aggregate functions.
But what do you do if a specific function is missing from the ksqlDB syntax? Pretty simple: You build your own with the official ksqlDB API. Let’s see how easy it is to write your own stateless UDF or stateful UDAF.
In contrast to built-in functions, UD(A)Fs are functions provided by the user of a program or environment. A UD(A)F provides a mechanism for extending the functionality of the ksqlDB engine by adding a function that can be evaluated in a standard query language statement. Using a UD(A)F in ksqlDB looks exactly like using built-in functions in ksqlDB. Both are registered to the ksqlDB engine before startup. They expect input parameters and return output values.
Here is an example of the built-in function STRINGTOTIMESTAMP, which converts a string value in the given format into the BIGINT value representing the timestamp:
This function takes the timestamp 2017-12-18 11:12:13.111 (data type: String) and converts it to 1513591933111 (data type: BigInt).
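To make the conversion concrete, here is a minimal plain-Java sketch of the same arithmetic. It is not ksqlDB's implementation: the class and method names are hypothetical, and the explicit UTC+1 zone is an assumption, chosen because the result quoted above corresponds to a clock one hour ahead of UTC.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that mirrors what STRINGTOTIMESTAMP computes:
// parse a formatted string and return milliseconds since the Unix epoch.
final class StringToTimestampSketch {

    static long stringToTimestamp(String value, String pattern, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
        // The string carries no zone information, so the zone must be supplied.
        return LocalDateTime.parse(value, formatter)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long millis = stringToTimestamp(
                "2017-12-18 11:12:13.111",
                "yyyy-MM-dd HH:mm:ss.SSS",
                ZoneOffset.ofHours(1)); // assumption: UTC+1
        System.out.println(millis); // 1513591933111
    }
}
```

With a different zone the same string yields a different number, which is worth keeping in mind when comparing results across servers.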
Now let’s take a look at the steps to build your own UDF so that you can use it in your ksqlDB statements the same way as the STRINGTOTIMESTAMP function above.
Building ksqlDB UDFs is easy, no matter if they are simple or powerful. With that in mind, let’s take a look at the steps to develop, test and deploy a new ksqlDB UDF.
You can build very simple ksqlDB functions like a specific scalar function for multiplication (see Confluent documentation to find out how), a really powerful function with many computations or aggregations, or anything in between these two extremes. Creating a stateful UDAF works the same way, of course.
The example below shows you how to build a more advanced UDF, which embeds an analytic model (i.e., an external dependency). This sounds complex, but in reality the implementation is simple. We write one Java class, add the dependency, write our business logic and build an uber JAR which includes the UDF and its dependency. This UDF can be deployed to the ksqlDB server(s) and then used in ksqlDB statements by end users without any coding in a programming language.
Developing a UDF is quite easy, too. Just implement the function in a Java method within a Java class and add the corresponding Java annotations:
Here is the full source code for the anomaly detection ksqlDB UDF:
As you can see, you can really focus on implementing the business logic. In my example, I consume new events and do stateless processing by applying an analytic model. The prediction is returned immediately. Under the hood, ksqlDB receives events from a Kafka topic and sends the output to another Kafka topic.
The wrapper code for the UDF is minimal. You only need to add two annotations and the corresponding import statements to your POJO (plain old Java object):
@UdfDescription: class annotation to describe the name and functionality of the UDF
@Udf: method annotation to describe the functionality of the specific method (there can be more than one of these methods in a UDF class to handle different method signatures)
And that’s it. The annotations in this class make sure the ksqlDB server interprets this class correctly and loads the UDF into the runtime engine during startup.
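The shape of such an annotated class can be sketched with the simple multiplication function mentioned earlier. To keep the sketch compilable without the ksql-udf JAR, it declares two local stand-in annotations; these are an assumption for illustration only. In a real project you would delete them, import io.confluent.ksql.function.udf.UdfDescription and io.confluent.ksql.function.udf.Udf instead, and declare the UDF class public in its own file.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// STAND-IN (assumption): replaces io.confluent.ksql.function.udf.UdfDescription
// so that this sketch compiles on its own.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface UdfDescription {
    String name();
    String description();
}

// STAND-IN (assumption): replaces io.confluent.ksql.function.udf.Udf.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Udf {
    String description() default "";
}

// The class annotation names the function as it would be called in a statement.
@UdfDescription(name = "multiply", description = "Multiplies two numbers")
class Multiply {

    // One annotated method per supported signature.
    @Udf(description = "Multiply two INT values")
    public long multiply(final int v1, final int v2) {
        return (long) v1 * v2; // widen first to avoid int overflow
    }

    @Udf(description = "Multiply two DOUBLE values")
    public double multiply(final double v1, final double v2) {
        return v1 * v2;
    }
}
```

The two overloads illustrate the point about multiple annotated methods: the same function name can serve several parameter types, and everything outside the annotations is ordinary Java.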
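The deployment of the UDF is pretty straightforward, no matter if you’re using a local installation on your laptop or a cluster of remote ksqlDB servers: build the uber JAR, copy it into the extension directory and restart the ksqlDB server(s) so that the UDF is loaded during startup.
The extension directory is configured via ksql.extension.dir in the ksqlDB configuration file ksql-server.properties. If this directory does not exist yet, simply create a new directory.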
Let’s now think about a practical example for a powerful UDF: continuous processing of millions of events from connected devices, specifically in the case of car sensors.
This use case leverages a UDF that does real-time predictions using an autoencoder neural network to detect anomalies. You can apply the analytic model by executing the ksqlDB statement in which “anomaly” is the UDF:
You can also create a new ksqlDB stream to deploy the statement for continuous processing. The following example filters values over a specific threshold of five to send alerts if an anomaly occurs:
This ksqlDB stream, AnomalyDetectionWithFilter, scales like any other ksqlDB or Kafka Streams application. It is built for S, M, L, XL and XXL scenarios. If you need higher scalability or better throughput, simply add more ksqlDB server instances.
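The logic behind scoring and filtering can be sketched in a few lines of plain Java. This is an illustration under stated assumptions, not the code from the GitHub project: it assumes the UDF returns the autoencoder's mean squared reconstruction error as the anomaly score, and the class and method names are hypothetical. Only the threshold of five comes from the example above.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: score an event by reconstruction error, then keep
// only the scores above the alert threshold.
final class AnomalyFilterSketch {

    static final double THRESHOLD = 5.0; // threshold used in the filter example

    // Mean squared error between the sensor input and the values an
    // autoencoder reconstructed from it (assumption: this is the score).
    static double reconstructionError(double[] input, double[] reconstructed) {
        if (input.length == 0 || input.length != reconstructed.length) {
            throw new IllegalArgumentException("arrays must be non-empty and equal in length");
        }
        double sum = 0.0;
        for (int i = 0; i < input.length; i++) {
            double diff = input[i] - reconstructed[i];
            sum += diff * diff;
        }
        return sum / input.length;
    }

    // Equivalent of the filter step: pass on only scores strictly above the threshold.
    static List<Double> alerts(List<Double> scores) {
        return scores.stream()
                .filter(score -> score > THRESHOLD)
                .collect(Collectors.toList());
    }
}
```

Both methods are stateless, which is what lets this kind of processing scale out by adding instances.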
If you want to try out this machine-learning UDF, just go to my GitHub project. It includes the source code and a step-by-step guide to test it using Confluent CLI and Confluent MQTT Proxy. The project also includes an MQTT sensor generator.
The story around building and deploying analytic models with ksqlDB is a very interesting and compelling use case worth discussing in further detail, but I’ll save that discussion for a future blog post that covers model training, deployment and inference in greater depth.
During development and testing, I faced some challenges when creating my first UDF with the new ksqlDB UDF API. If your UDF does not work, check for the following:
If the UDF annotations cannot be resolved at compile time, add ksql-udf.jar, which you get from a ksqlDB project build, to your classpath, or add it via Maven dependency (groupId: io.confluent.ksql; artifactId: ksql-udf).
Does ksqlDB report that it cannot find the function “XYZ”? Did you restart the ksqlDB server after you added the uber JAR to the /ext directory? UDFs are only loaded during the startup of the ksqlDB engine. Or, perhaps you have the wrong path? Did you set ksql.extension.dir to the right path in ksql-server.properties? Maybe start with an absolute path first. If that works, you can try a relative path.
Check the log files: not only the ksqlDB log (ksql.log) but also other Kafka-related log files.
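The path check in particular is easy to automate. The following is a hypothetical diagnostic helper, not part of ksqlDB: it reads ksql-server.properties, resolves ksql.extension.dir and counts the JAR files it finds there. Falling back to "ext" when the property is unset is an assumption about the default.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.stream.Stream;

// Hypothetical helper for verifying the extension directory setup.
final class ExtensionDirCheck {

    // Returns the number of .jar files in the configured extension directory.
    static long countJars(Path serverProperties) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(serverProperties)) {
            props.load(reader);
        }
        // Assumption: "ext" is used when the property is not set.
        String configured = props.getProperty("ksql.extension.dir", "ext");

        // A relative path resolves against THIS JVM's working directory,
        // which may differ from the directory the server is started in.
        Path extensionDir = Paths.get(configured).toAbsolutePath();
        if (!Files.isDirectory(extensionDir)) {
            throw new IllegalStateException("Not a directory: " + extensionDir);
        }
        try (Stream<Path> files = Files.list(extensionDir)) {
            return files
                    .filter(p -> p.getFileName().toString().endsWith(".jar"))
                    .count();
        }
    }
}
```

A result of zero, or an exception naming an unexpected absolute path, points to the path problems described above; the ambiguity of relative paths is the reason to start with an absolute one.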
Please let us know if you face any other issues during development and testing of ksqlDB UDFs. All feedback is appreciated. While this blog post focused on building a stateless UDF, the development process is the same for a stateful UDAF.
The foundation is here. You’ve learned how easy it is to create and deploy a new UD(A)F for ksqlDB, involving only a few steps to implement, register and execute a ksqlDB UDF. Once a developer has built it, every end user of your ksqlDB instances can use it, whether they’re a data engineer or a data scientist.
The example of building a deep-learning UDF shows how you can even include other dependencies, such as an analytic model, to build very powerful functions. The end user just has to call the function in the ksqlDB statement—no need to understand what is going on under the hood.