One of ksqlDB’s most powerful features is allowing users to build their own ksqlDB functions for processing real-time streams of data. These functions can be invoked on individual messages (user-defined functions or UDFs) or used to perform aggregations on groups of messages (user-defined aggregate functions or UDAFs).
The previous blog post How to Build a UDF and/or UDAF in ksqlDB 5.0 discussed some key steps for building and deploying a custom ksqlDB UDF/UDAF. Now with Confluent Platform 5.3.0, creating custom ksqlDB functions is even easier when you leverage Maven, a tool for building and managing dependencies in Java projects.
Confluent Platform 5.3.0 adds a new Maven archetype called the ksqlDB UDF / UDAF Quickstart that will allow you to quickly bootstrap your own UDF/UDAF without having to copy and paste example code, add the boilerplate for building an uber JAR, or perform other tedious tasks that would otherwise be required for setting up a new project. Maven archetypes are used to create project templates, so we found them to be a great vehicle for getting developers up and running quickly with custom ksqlDB functions.
In addition to discussing how the ksqlDB UDF / UDAF Quickstart can be used, we will also demonstrate how to convert the generated Maven project to a Gradle project with a simple command. Gradle is another automated build system that many developers prefer over Maven. Developers who prefer Gradle can bootstrap new UDF projects using the Maven archetype, and then convert to the build system of their choice for further development.
So, without further ado, let’s get started.
In order to use the ksqlDB UDF / UDAF Quickstart for bootstrapping a custom ksqlDB function, we need to have Maven installed. You can check to see if Maven is installed by running the following command:
$ mvn --version
If Maven is not installed, follow the official installation instructions. Next, add the Maven repositories from Confluent to your ~/.m2/settings.xml file:
<settings>
  <profiles>
    <profile>
      <id>myprofile</id>
      <repositories>
        <!-- Confluent releases -->
        <repository>
          <id>confluent</id>
          <url>https://packages.confluent.io/maven/</url>
        </repository>

        <!-- further repository entries here -->
      </repositories>
    </profile>
  </profiles>
  <activeProfiles>
    <activeProfile>myprofile</activeProfile>
  </activeProfiles>
</settings>
More information about Maven repositories can be found here. Once Maven is installed and the repositories have been added, generating a new UDF/UDAF project is simple. First, run the following command:
$ mvn archetype:generate -X \
    -DarchetypeGroupId=io.confluent.ksql \
    -DarchetypeArtifactId=ksql-udf-quickstart \
    -DarchetypeVersion=5.3.0
In newer versions of Confluent Platform, 6.0.0 and later, the archetype ID has changed from ksql-udf-quickstart to ksqldb-udf-quickstart, so the archetype generate command becomes this:
$ mvn archetype:generate -X \
    -DarchetypeGroupId=io.confluent.ksql \
    -DarchetypeArtifactId=ksqldb-udf-quickstart \
    -DarchetypeVersion=6.1.1
You will be asked to provide some information about your project. An example configuration is shown below (feel free to update the following with values that are appropriate for your own project):
Define value for property 'groupId': com.example.ksql.functions
Define value for property 'artifactId': my-udf
Define value for property 'version': 0.1.0
Define value for property 'package': com.example.ksql.functions
Define value for property 'author': Mitch Seymour
Once you’ve confirmed the configuration (e.g., by simply hitting <ENTER> when prompted to do so), the above command will create a new project with the following directory structure. (Note: The actual directory structure may vary depending on the groupId and artifactId parameters that you specified earlier).
my-udf/
├── dependency-reduced-pom.xml
├── pom.xml
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── example
    │   │           └── ksql
    │   │               └── functions
    │   │                   ├── ReverseUdf.java
    │   │                   └── SummaryStatsUdaf.java
    │   └── resources
    └── test
        └── java
            └── com
                └── example
                    └── ksql
                        └── functions
                            ├── ReverseUdfTests.java
                            └── SummaryStatsUdafTests.java
In the next section, we will explore the example ksqlDB functions generated by the archetype and learn how to deploy these functions to our ksqlDB server.
The archetype includes one example UDF (REVERSE) and one example UDAF (SUMMARY_STATS), which are defined in the following files, respectively: ReverseUdf.java and SummaryStatsUdaf.java. Let’s start by taking a look at ReverseUdf.java.
package com.example.ksql.functions;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(
    name = "reverse",
    description = "Example UDF that reverses an object",
    version = "0.1.0",
    author = ""
)
public class ReverseUdf {

  @Udf(description = "Reverse a string")
  public String reverseString(
      @UdfParameter(value = "source", description = "the value to reverse")
      final String source
  ) {
    return new StringBuilder(source).reverse().toString();
  }

  @Udf(description = "Reverse an integer")
  public String reverseInt(
      @UdfParameter(value = "source", description = "the value to reverse")
      final Integer source
  ) {
    return new StringBuilder(source.toString()).reverse().toString();
  }

  @Udf(description = "Reverse a long")
  public String reverseLong(
      @UdfParameter(value = "source", description = "the value to reverse")
      final Long source
  ) {
    return new StringBuilder(source.toString()).reverse().toString();
  }

  @Udf(description = "Reverse a double")
  public String reverseDouble(
      @UdfParameter(value = "source", description = "the value to reverse")
      final Double source
  ) {
    return new StringBuilder(source.toString()).reverse().toString();
  }
}
This example UDF can be used for reversing strings and numerics, and it is already fully functional and ready to deploy. One key item this particular UDF showcases is the ability for a ksqlDB function to support multiple method signatures. Our REVERSE function (defined above) can reverse a String, Long, Integer, or Double since we provided methods for each of these operations. This example UDF is somewhat trivial, but the point of this archetype is to allow you to easily replace the code here with your own code, and then simply follow the build and deployment steps described later in this article to start using your own UDF.
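One thing to consider when replacing the example logic with your own is how each overload should behave when a column value is null, since the generated methods would throw a NullPointerException on a null argument. The following is a minimal standalone sketch of one way to handle that, not part of the archetype. The class name NullSafeReverse is hypothetical, and the ksqlDB annotations are intentionally left out so that the snippet compiles without the ksqlDB dependency; in a real UDF, each method would carry the same @Udf and @UdfParameter annotations shown above.

```java
// Standalone sketch (hypothetical class, not generated by the archetype).
// The @UdfDescription/@Udf/@UdfParameter annotations are omitted on purpose
// so this compiles with the JDK alone.
final class NullSafeReverse {

    private NullSafeReverse() { }

    // Mirrors reverseString, but returns null instead of throwing on a null input.
    static String reverse(final String source) {
        if (source == null) {
            return null;
        }
        return new StringBuilder(source).reverse().toString();
    }

    // The numeric overloads check for null, then delegate to the String version.
    static String reverse(final Integer source) {
        return source == null ? null : reverse(source.toString());
    }

    static String reverse(final Long source) {
        return source == null ? null : reverse(source.toString());
    }

    static String reverse(final Double source) {
        return source == null ? null : reverse(source.toString());
    }

    public static void main(final String[] args) {
        System.out.println(reverse("hello"));        // olleh
        System.out.println(reverse(12345));          // 54321 (int is boxed to Integer)
        System.out.println(reverse((String) null));  // null
    }
}
```

Whether returning null is the right behavior depends on your use case; the point is simply that each overload gets to make that decision independently.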
As mentioned earlier, the archetype also includes an example UDAF. Unlike UDFs, which operate on a single row at a time, UDAFs can be used for computing aggregates against multiple rows of data. Let’s take a look at the example UDAF (called SUMMARY_STATS) and see how it works.
package com.example.ksql.functions;

import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import java.util.HashMap;
import java.util.Map;

@UdafDescription(
    name = "summary_stats",
    description = "Example UDAF that computes some summary stats for a stream of doubles",
    version = "0.1.0",
    author = ""
)
public final class SummaryStatsUdaf {

  private SummaryStatsUdaf() {
  }

  @UdafFactory(description = "compute summary stats for doubles")
  // Can be used with stream aggregations. The input of our aggregation will be doubles,
  // and the output will be a map
  public static Udaf<Double, Map<String, Double>> createUdaf() {

    return new Udaf<Double, Map<String, Double>>() {

      /**
       * Specify an initial value for our aggregation
       *
       * @return the initial state of the aggregate.
       */
      @Override
      public Map<String, Double> initialize() {
        final Map<String, Double> stats = new HashMap<>();
        stats.put("mean", 0.0);
        stats.put("sample_size", 0.0);
        stats.put("sum", 0.0);
        return stats;
      }

      /**
       * Perform the aggregation whenever a new record appears in our stream.
       *
       * @param newValue the new value to add to the {@code aggregateValue}.
       * @param aggregateValue the current aggregate.
       * @return the new aggregate value.
       */
      @Override
      public Map<String, Double> aggregate(
          final Double newValue,
          final Map<String, Double> aggregateValue
      ) {
        final Double sampleSize = 1.0 + aggregateValue
            .getOrDefault("sample_size", 0.0);

        final Double sum = newValue + aggregateValue
            .getOrDefault("sum", 0.0);

        // calculate the new aggregate
        aggregateValue.put("mean", sum / sampleSize);
        aggregateValue.put("sample_size", sampleSize);
        aggregateValue.put("sum", sum);
        return aggregateValue;
      }

      /**
       * Called to merge two aggregates together.
       *
       * @param aggOne the first aggregate
       * @param aggTwo the second aggregate
       * @return the merged result
       */
      @Override
      public Map<String, Double> merge(
          final Map<String, Double> aggOne,
          final Map<String, Double> aggTwo
      ) {
        final Double sampleSize =
            aggOne.getOrDefault("sample_size", 0.0)
                + aggTwo.getOrDefault("sample_size", 0.0);
        final Double sum =
            aggOne.getOrDefault("sum", 0.0)
                + aggTwo.getOrDefault("sum", 0.0);

        // calculate the new aggregate
        final Map<String, Double> newAggregate = new HashMap<>();
        newAggregate.put("mean", sum / sampleSize);
        newAggregate.put("sample_size", sampleSize);
        newAggregate.put("sum", sum);
        return newAggregate;
      }
    };
  }
}
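This UDAF may seem complicated at first, but it’s really just performing some basic math and adding the computations to a Map object. Returning a Map is one method for returning multiple values from a ksqlDB function. When using the example above as a starting point for your own UDAF, take note of the following methods:
initialize: specifies the initial value of the aggregation
aggregate: performs the aggregation whenever a new record appears in the stream
merge: merges two aggregates together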
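To see how initialize and aggregate work together, it can help to step through the arithmetic outside of ksqlDB. The following is a minimal standalone sketch that copies the math from the example UDAF into plain static methods (the class name SummaryStatsWalkthrough is hypothetical, and the Udaf interface is intentionally not used so that the snippet runs with the JDK alone). It feeds two response times, 400 and 900, through the aggregation and prints the running state after each one.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch (hypothetical class): same arithmetic as SummaryStatsUdaf,
// without the ksqlDB Udaf interface or annotations.
final class SummaryStatsWalkthrough {

    private SummaryStatsWalkthrough() { }

    // Initial state: nothing has been seen yet.
    static Map<String, Double> initialize() {
        final Map<String, Double> stats = new HashMap<>();
        stats.put("mean", 0.0);
        stats.put("sample_size", 0.0);
        stats.put("sum", 0.0);
        return stats;
    }

    // Fold one new value into the current aggregate and return it.
    static Map<String, Double> aggregate(
        final Double newValue,
        final Map<String, Double> aggregateValue
    ) {
        final double sampleSize = 1.0 + aggregateValue.getOrDefault("sample_size", 0.0);
        final double sum = newValue + aggregateValue.getOrDefault("sum", 0.0);
        aggregateValue.put("mean", sum / sampleSize);
        aggregateValue.put("sample_size", sampleSize);
        aggregateValue.put("sum", sum);
        return aggregateValue;
    }

    public static void main(final String[] args) {
        Map<String, Double> state = initialize();
        for (final double responseTime : new double[] {400.0, 900.0}) {
            state = aggregate(responseTime, state);
            // Prints the running aggregate; HashMap key order is not guaranteed.
            System.out.println(state);
        }
    }
}
```

After the first value, the mean, sum, and sample size are 400.0, 400.0, and 1.0; after the second, they are 650.0, 1300.0, and 2.0.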
Once you’ve replaced the example UDF/UDAF logic with your own (or, if you’d like, just use the example UDF/UDAF for the rest of this tutorial), then it’s time to deploy your ksqlDB functions to a ksqlDB server. To begin, build the project by running the following command in the project root directory:
$ mvn clean package
The archetype includes some default unit tests. If you have already changed the example code, those tests may no longer pass, so add the -DskipTests flag to the command above (we’ll cover tests in the next section, so we can skip them for now).
The above command will drop a JAR in the target/ directory. For example, if your artifactId is my-udf, then the command will have created a file named target/my-udf-0.1.0.jar.
Now, simply copy this JAR file to the ksqlDB extension directory (see the ksql.extension.dir property in the ksql-server.properties file) and restart your ksqlDB server so that it can pick up the new JAR containing your custom ksqlDB function.
# stop the KSQL server cleanly using the following command
$ /bin/ksql-server-stop

# restart the KSQL server so that we can use our newly deployed KSQL functions
$ /bin/ksql-server-start config/ksql-server.properties
Restarting is not only required for ksqlDB to recognize new functions, but also to recognize any updates you have made to existing functions. Once ksqlDB has finished restarting and has connected to a running Apache Kafka® cluster, you can verify that the new functions exist by running the DESCRIBE FUNCTION command from the CLI:
ksql> DESCRIBE FUNCTION REVERSE ;
Name        : REVERSE
Author      :
Version     : 0.1.0
Overview    : Example UDF that reverses an object
Type        : scalar
Jar         : /tmp/ext/my-udf-0.1.0.jar
Variations  :

        Variation   : REVERSE(source INT)
        Returns     : VARCHAR
        Description : Reverse an integer
        source      : the value to reverse

        Variation   : REVERSE(source VARCHAR)
        Returns     : VARCHAR
        Description : Reverse a string
        source      : the value to reverse

        Variation   : REVERSE(source DOUBLE)
        Returns     : VARCHAR
        Description : Reverse a double
        source      : the value to reverse

        Variation   : REVERSE(source BIGINT)
        Returns     : VARCHAR
        Description : Reverse a long
        source      : the value to reverse
ksql> DESCRIBE FUNCTION SUMMARY_STATS ;
Name        : SUMMARY_STATS
Author      : mitch
Version     : 0.1.0
Overview    : Example UDAF that computes some summary stats for a stream of doubles
Type        : aggregate
Jar         : /tmp/ext/my-udf-0.1.0.jar
Variations  :

        Variation   : SUMMARY_STATS(DOUBLE)
        Returns     : MAP<VARCHAR,DOUBLE>
        Description : compute summary stats for doubles
Finally, let’s invoke our new UDF/UDAF. For this example, we’ll assume there’s a topic named api_logs in our Kafka cluster. You can create this dummy topic by using the kafka-topics console script:
# assumes the `kafka-topics` script is on your $PATH
$ kafka-topics --create \
    --zookeeper localhost:2181 \
    --topic api_logs \
    --replication-factor 1 \
    --partitions 4
Created topic "api_logs".
With the api_logs topic created, we can now create a ksqlDB STREAM using the following command:
ksql> CREATE STREAM api_logs (username VARCHAR, endpoint VARCHAR, response_code INT, response_time DOUBLE) \
      WITH (kafka_topic='api_logs', value_format='JSON');

 Message
----------------
 Stream created
----------------
At this point, invoking our UDF/UDAF is simply a matter of adding it to our ksqlDB query:
ksql> SELECT username, REVERSE(username), endpoint, SUMMARY_STATS(response_time) \
      FROM api_logs \
      GROUP BY username, REVERSE(username), endpoint ;
The above command will execute a continuous query in the ksqlDB CLI. In another tab, we can produce some dummy records to the api_logs topic using the kafkacat utility.
$ echo '{"username": "mseymour", "endpoint": "index.html", "response_code": 200, "response_time": 400}' | kafkacat -q -b localhost:9092 -t api_logs -P
$ echo '{"username": "mseymour", "endpoint": "index.html", "response_code": 200, "response_time": 900}' | kafkacat -q -b localhost:9092 -t api_logs -P
Back inside the ksqlDB CLI, you should see the following output:
mseymour | ruomyesm | index.html | {sample_size=1.0, mean=400.0, sum=400.0}
mseymour | ruomyesm | index.html | {sample_size=2.0, mean=650.0, sum=1300.0}
In addition to including an example UDF and UDAF implementation, the ksqlDB UDF / UDAF Quickstart includes unit tests that demonstrate how to test your custom ksqlDB functions. These tests live in the src/test/java/ directory and rely on the JUnit 5 testing platform, which is automatically included when you create a project from the quick start. Whenever you update the example ksqlDB functions with your own code, it is necessary to also update the included unit tests.
Before we learn how to execute the tests, let’s first see what they look like. The first unit test we’ll review ensures the REVERSE UDF returns the expected results.
package com.example.ksql.functions;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

/**
 * Example class that demonstrates how to unit test UDFs.
 */
public class ReverseUdfTests {

  @ParameterizedTest(name = "reverse({0})= {1}")
  @CsvSource({
    "hello, olleh",
    "world, dlrow",
  })
  void reverseString(final String source, final String expectedResult) {
    final ReverseUdf reverse = new ReverseUdf();
    final String actualResult = reverse.reverseString(source);
    assertEquals(expectedResult, actualResult, source + " reversed should equal " + expectedResult);
  }
}
As you can see in the code above, our testing methodology is relatively straightforward. First, we use a parameter provider called @CsvSource (included in the JUnit 5 testing library) to specify multiple test cases with their corresponding parameters and expected result values. The first value in each CSV string (hello and world) represents the parameter that we want to pass to our UDF (ReverseUdf). The second value in each CSV string represents the expected result of the test (since ReverseUdf is responsible for reversing objects, the expected result in this test case is a reversed string).
Now that we’ve defined our parameters, we simply instantiate a ReverseUdf instance, invoke the appropriate method for reversing a string (reverseString) with our test parameters, and check the result with assertEquals. This method of instantiating a ksqlDB function and invoking the appropriate methods directly in a test is a good way to prevent accidental regressions as you iterate on your code in the future.
A keen eye may have noticed that our ReverseUdf is capable of reversing many types of objects, yet the included unit tests only cover the reversal of strings. We will leave the additional test implementations as an exercise for the reader.
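If you do take on that exercise, it helps to work out the expected values first, because the numeric variations reverse the string form of the number rather than the number itself. The following is a minimal standalone sketch (the class and method names are hypothetical, and it deliberately avoids the ksqlDB and JUnit dependencies) that applies the same toString-then-reverse logic used by reverseInt, reverseLong, and reverseDouble to a few edge cases.

```java
// Standalone sketch (hypothetical class): reproduces the conversion that the
// numeric REVERSE variations perform, so expected test values can be checked.
final class ReverseNumericCases {

    private ReverseNumericCases() { }

    // Same logic as reverseInt, reverseLong, and reverseDouble:
    // convert to a string, then reverse the characters.
    static String reverseViaToString(final Number source) {
        return new StringBuilder(source.toString()).reverse().toString();
    }

    public static void main(final String[] args) {
        System.out.println(reverseViaToString(Integer.valueOf(123)));  // 321
        System.out.println(reverseViaToString(Integer.valueOf(-42)));  // 24-
        System.out.println(reverseViaToString(Long.valueOf(9000L)));   // 0009
        System.out.println(reverseViaToString(Double.valueOf(10.0)));  // 0.01
    }
}
```

Note that the sign moves to the end for negative numbers, trailing zeros become leading zeros, and a Double always carries its decimal point. These are the kinds of cases worth encoding as expected results in your own tests.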
Let’s move on to the unit tests for SummaryStatsUdaf.
package com.example.ksql.functions;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import io.confluent.ksql.function.udaf.Udaf;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

/**
 * Example class that demonstrates how to unit test UDAFs.
 */
public class SummaryStatsUdafTests {

  @Test
  void mergeAggregates() {
    final Udaf<Double, Map<String, Double>> udaf = SummaryStatsUdaf.createUdaf();
    final Map<String, Double> mergedAggregate = udaf.merge(
      // (sample_size, sum, mean)
      aggregate(3.0, 3300.0, 1100.0),
      aggregate(7.0, 6700.0, 957.143)
    );
    final Map<String, Double> expectedResult = aggregate(10.0, 10000.0, 1000.0);
    assertEquals(expectedResult, mergedAggregate);
  }

  @ParameterizedTest
  @MethodSource("aggSources")
  void calculateSummaryStats(
      final Double newValue,
      final Map<String, Double> currentAggregate,
      final Map<String, Double> expectedResult
  ) {
    final Udaf<Double, Map<String, Double>> udaf = SummaryStatsUdaf.createUdaf();
    assertEquals(expectedResult, udaf.aggregate(newValue, currentAggregate));
  }

  // the rest of this file is omitted for brevity
  Stream<Arguments> aggSources() {}

}
The testing methodology for UDAFs is similar to that for UDFs. We instantiate our UDAF instance and call the appropriate methods (in this case, aggregate and merge) with a set of predefined parameters. We then check the output against the expected results. One minor difference between the ReverseUdf test we saw earlier and the SummaryStatsUdaf test above is that the latter uses a different mechanism for generating test parameters and outputs: instead of @CsvSource, it uses the @MethodSource provider. This is a minor implementation detail and I encourage you to look at the example code yourself to see exactly how this works. The important takeaway here is that testing both UDFs and UDAFs is simple using the methods discussed above.
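One detail of the mergeAggregates test is worth a closer look: the second input uses a rounded mean (957.143), yet the test can still compare the merged result with an exact map equality. The following minimal standalone sketch suggests why. It copies the merge arithmetic from the example UDAF into a plain static method; the class name MergeWalkthrough and the stats helper are hypothetical stand-ins (the helper plays the role of the aggregate(...) helper that the test file omits), and no ksqlDB or JUnit classes are used.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch (hypothetical class): same arithmetic as the merge method
// in SummaryStatsUdaf, without the ksqlDB Udaf interface.
final class MergeWalkthrough {

    private MergeWalkthrough() { }

    // Hypothetical helper that builds an aggregate from (sample_size, sum, mean).
    static Map<String, Double> stats(final double sampleSize, final double sum, final double mean) {
        final Map<String, Double> stats = new HashMap<>();
        stats.put("sample_size", sampleSize);
        stats.put("sum", sum);
        stats.put("mean", mean);
        return stats;
    }

    // Combine two aggregates; the mean is recomputed from sum and sample size,
    // so the "mean" entries of the inputs are never read.
    static Map<String, Double> merge(final Map<String, Double> aggOne, final Map<String, Double> aggTwo) {
        final double sampleSize =
            aggOne.getOrDefault("sample_size", 0.0) + aggTwo.getOrDefault("sample_size", 0.0);
        final double sum =
            aggOne.getOrDefault("sum", 0.0) + aggTwo.getOrDefault("sum", 0.0);
        return stats(sampleSize, sum, sum / sampleSize);
    }

    public static void main(final String[] args) {
        final Map<String, Double> merged = merge(
            stats(3.0, 3300.0, 1100.0),
            stats(7.0, 6700.0, 957.143)
        );
        System.out.println(merged.equals(stats(10.0, 10000.0, 1000.0)));  // true
    }
}
```

Because merge derives the new mean from the combined sum and sample size (10000.0 / 10.0), the rounding in the input mean has no effect on the result. When you write tests for your own UDAF, keep in mind that comparing maps of doubles with assertEquals requires exact equality, so expected values that are not exactly representable may call for a tolerance-based comparison instead.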
Finally, we’re ready to execute the unit tests. Simply run the following command to execute the test cases:
$ mvn test
If all goes well, you should see the following output:
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
Bootstrapping your custom ksqlDB functions from Confluent’s Maven archetype doesn’t mean you also have to use Maven as your build tool. In fact, Gradle is often preferred, and converting your Maven project to a Gradle project is easy. Simply run the following command in the root project directory to generate a build.gradle file for your project:
$ gradle init --type pom
Now, feel free to delete the pom.xml and make all future build modifications to build.gradle instead.
Now that you know how to quickly bootstrap your next ksqlDB UDF/UDAF project, you can start building your own custom ksqlDB functions with minimal effort. A couple of next steps you may want to pursue include adding unit tests for your new code and, if your function might be useful to others, sharing it with the community.
For an in-depth look at custom ksqlDB functions, including UDFs that leverage embedded machine learning models, remote APIs, and more, you can check out my presentation from Kafka Summit London: The Exciting Frontier of Custom ksqlDB Functions.
Hojjat Jafarpour’s session from Kafka Summit San Francisco, UDF/UDAF: The Extensibility Framework for ksqlDB, may be of interest as well.
If you’re interested in more, you can: