Confluent
How to Build a UDF and/or UDAF in KSQL 5.0
Confluent Platform

How to Build a UDF and/or UDAF in KSQL 5.0

Kai Waehner

KSQL is the open source streaming SQL engine that enables real-time data processing against Apache Kafka®. KSQL makes it easy to read, write and process streaming data in real time, at scale, using SQL-like semantics. KSQL already has plenty of available functions like SUBSTRING, STRINGTOTIMESTAMP or COUNT. Even so, many users need additional functions to process their data streams.

KSQL now has an official API for building your own functions. As of the release of Confluent Platform 5.0, KSQL supports creating user-defined scalar functions (UDF) and user-defined aggregate functions (UDAF). All it takes to write your own is just one Java class. The UD(A)F can embed any additional dependencies (like additional JARs or other binaries) if needed.

In this blog post, we’ll discuss the main need and motivation for UD(A)Fs in KSQL, show an advanced example with a deep-learning UDF, prove how easy it is to build even something that is very powerful and identify some issues that you might face during development and testing.

Motivation for KSQL UDFs/UDAFs

Let’s first discuss what UD(A)Fs are and why you might need to build your own ones.

Built-in KSQL functions to process streaming data

Functions are one of the key components in KSQL. Functions in KSQL are like functions, operations or methods in other SQL engines or programming languages, such as Java, Python or Go. They accept parameters, perform an action—such as a complex calculation—and return the result of that action as a value. They are used within a KSQL query to filter, transform or aggregate data.

Many functions are already built into KSQL, and more are added with every release. We distinguish them into two categories: stateless KSQL scalar functions and stateful KSQL aggregate functions.

But what do you do if you miss a specific function in the KSQL syntax? Pretty simple: You build your own one with the official KSQL API. Let’s see how easy it is to write your own stateless UDF or stateful UDAF.

UD(A)Fs in KSQL

In contrast to built-in functions, UD(A)Fs are functions provided by the user of a program or environment. A UD(A)F provides a mechanism for extending the functionality of the KSQL engine by adding a function that can be evaluated in a standard query language statement. Using a UD(A)F in KSQL looks exactly like using built-in functions in KSQL. Both are registered to the KSQL engine before startup. They expect input parameters and return output values.

Here is an example of the built-in function STRINGTOTIMESTAMP to convert a string value in the given format into the BIGINT value representing the timestamp:

This function takes the timestamp 2017-12-18 11:12:13.111 (data type: String) and converts it to 1513591933111 (data type: BigInt).

Now let’s take a look at the steps to build your own UDF to use it in your KSQL statements the same way as the STRINGTOTIMESTAMP function does above.

Deep-learning UDFs for anomaly detection

Building KSQL UDFs is easy, no matter if they are simple or powerful. With that in mind, let’s take a look at the steps to develop, test and deploy a new KSQL UDF.

Steps to create, deploy and test a KSQL UDF

You can build very simple KSQL functions like a specific scalar function for multiplication (see Confluent documentation to find out how), a really powerful function with many computations or aggregations, or anything in between these two extremities. Creating a stateful UDAF works the same way, of course.

The example below shows you how to build a more advanced UDF, which embeds an analytic model (i.e., an external dependency). This sounds complex, but in reality the implementation is simple. We write one Java class, add the dependency, write our business logic and build an uber JAR which includes the UDF and its dependency. This UDF can be deployed to the KSQL server(s) and then used in KSQL statements by end users without any coding in a programming language.

Java source code of the deep-learning UDF

Develop a UDF is quite easy, too. Just implement the function in a Java method within a Java class and add the corresponding Java annotations:

Here is the full source code for the anomaly detection KSQL UDF:

As you can see, you can really focus on implementing the business logic. In my example, I consume new events and do stateless processing by applying an analytic model. The prediction is returned immediately. Under the hood, KSQL receives events from a Kafka topic and sends the output to another Kafka topic.

The wrapper code for the UDF is minimal. You only need to add two annotations and the corresponding import statements to your POJO (plain old Java object):

  • Import io.confluent.ksql.function.udf.Udf and io.confluent.ksql.function.udf.Udf
  • @UDFDescription: class annotation to describe the name and functionality of the UDF
  • @UDF: method annotation to describe the functionality of the specific method (there can be more than one of these methods in a UDF class to process different interface signatures)

And that’s it. The annotations in this class make sure the KSQL server interprets this class correctly and loads the UDF into the runtime engine during startup.

Deployment to KSQL server(s)

The deployment of the UDF is pretty straightforward, no matter if you’re using a local installation on your laptop or a cluster of remote KSQL servers:

  • Build an uber JAR that includes the KSQL UDF and any dependencies. Typically, you use a build tool like Apache Maven™ or Gradle to do this. See the pom.xml of my GitHub project for an example using Maven.
  • Copy the uber JAR to the ext/ directory that is part of the KSQL distribution. The ext/ directory can be configured via the property ksql.extension.dir in KSQL configuration file ksql-server.properties. If this directory does not exist yet, simply create a new directory.
  • Start/restart your KSQL server instance(s) to pick up new UD(A)Fs.
  • Try out the new UDF in KSQL command line interface via Confluent REST Proxy or Confluent Control Center’s KSQL user interface, which has nice features like autocompletion.
  • If the UDF does not work, check the log’s errors. They will appear in the KSQL server log (ksql.log).

Working demo: MQTT sensor analytics with Apache Kafka and KSQL

Let’s now think about a practical example for a powerful UDF: continuous processing of millions of events from connected devices, specifically in the case of car sensors.Car sensors use case as a practical example for a powerful UDF

This use case leverages a UDF that does real-time predictions using an autoencoder neural network to detect anomalies. You can apply the analytic model by executing the KSQL statement in which “anomaly” is the UDF:

You can also create a new KSQL stream to deploy the statement for continuous processing. The following example filters values over a specific threshold of five to send alerts if an anomaly occurs:

This KSQL stream AnomalyDetectionWithFilter scales like any other KSQL or Kafka Streams application. It is built for S, M, L, X and XXL scenarios. If you need higher scalability or better throughput, simply create additional instances on KSQL servers.

If you want to try out this machine-learning UDF, just go to my GitHub project. It includes the source code and a step-by-step guide to test it using Confluent CLI and Confluent MQTT Proxy. The project also includes a MQTT sensor generator.

The story around building and deploying analytic models with KSQL is a very interesting and compelling use case worth discussing in further detail, but I’ll save that discussion for a future blog post that covers model training, deployment and inference in greater depth.

Potential issues during development and testing

During development and testing, I faced some challenges when creating my first UDF with the new KSQL REST API. If your UDF does not work, check for the following:

  • Compile errors? Did you include KSQL JAR into your project to use the classes and annotations? Either add ksql-udf.jar, which you get from a KSQL project build, or add it via Maven dependency (groupId: io.confluent.ksq; artifactId: ksql-udf).
  • Classpath errors at runtime? Are dependencies missing in the generated uber JAR? Extract the JAR and see if all needed dependencies are included (e.g., the analytic model in my example) and in the expected directory. Maybe your build process is not working well yet.
  • UDF not found at runtime? Do you see the error, “Can’t find any functions with the name XYZ”? Did you restart the KSQL server after you added the uber JAR to the /ext directory? UDFs are only loaded during the startup of the KSQL engine. Or, perhaps you have the wrong path? Did you configure the right path in property ksql.extension.dir in ksql-server.properties? Maybe start with an absolute path first. If that works, you can try a relative path.
  • Still experiencing problems? Check the logs—not just the KSQL log (ksql.log) but also other Kafka-related log files.

Please let us know if you face any other issues during development and testing of KSQL UDFs. All feedback is appreciated. While this blog post focused on building a stateless UDF, the development process is the same for a stateful UDAF.

What’s next for KSQL UD(A)Fs?

The foundation is here. You’ve learned how easy it is to create and deploy a new UD(A)F for KSQL, involving only a few steps to implement, register and execute a KSQL UDF. Once the developer builds it, then every end user of your KSQL instances can leverage it whether they’re a data engineer or data scientist.

The example of building a deep-learning UDF shows how you can even include other dependencies, such as an analytic model, to build very powerful functions. The end user just has to call the function in the KSQL statement—no need to understand what is going on under the hood.

The next step in the KSQL UD(A)F roadmap is to support building UD(A)Fs with programming languages other than Java. This way, even a data scientist who only understands Python or JavaScript can write and deploy their own functions. In addition to this, there are many new and exciting features to come, so be sure to keep an eye out for the next KSQL releases.

If you’re interested in what KSQL can do

Subscribe to the Confluent Blog

Subscribe

More Articles Like This

Team with minimum privileges
Gwen Shapira

Kafka Streams and KSQL with Minimum Privileges

Gwen Shapira .

The principle of least privilege dictates that each user and application will have the minimal privileges required to do their job. When applied to Apache Kafka® and its Streams API, ...

Using Apache Kafka to Drive Cutting-Edge Machine Learning
Kai Waehner

Using Apache Kafka to Drive Cutting-Edge Machine Learning

Kai Waehner .

Machine learning and the Apache Kafka® ecosystem are a great combination for training and deploying analytic models at scale. I had previously discussed potential use cases and architectures for machine ...

Kafka Connect Deep Dive – Converters and Serialization Explained
Robin Moffatt

Kafka Connect Deep Dive – Converters and Serialization Explained

Robin Moffatt .

Kafka Connect is part of Apache Kafka®, providing streaming integration between data stores and Kafka. For data engineers, it just requires JSON configuration files to use. There are connectors for ...

Leave a Reply

Your email address will not be published. Required fields are marked *

Comments

  1. Does KSQL UDF support to accept one or more input values and generate multiple output values? For example, parsing ip address into latitude and longitude. If yes, how to define the method signature, thanks!

    1. Hi Casel,

      yes, you can already do this. Even the “hello world example” from KSQL docs (https://docs.confluent.io/current/ksql/docs/udf.html) uses two input parameters:

      @Udf(description = “multiply two non-nullable INTs.”)
      public long multiply(final int v1, final int v2) {
      return v1 * v2;
      }

      For output, it is not that comfortable today, but you can use a single Array column (Java List). We will improve this in the future, but it is already doable this way.

      Hope this helps.

      Kai Waehner

Try Confluent Platform

Download Now

We use cookies to understand how you use our site and to improve your experience. Click here to learn more or change your cookie settings. By continuing to browse, you agree to our use of cookies.