Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now

Getting started with Kafka in node.js with the Confluent REST Proxy

Written By

Previously, I posted about the Kafka REST Proxy from Confluent, which provides easy access to a Kafka cluster from any language. That post focused on the motivation, low-level examples, and implementation of the REST Proxy. In this post, I want to follow up by showing how quickly you can create a wrapper for your favorite language and use that wrapper to build applications. With the REST Proxy, you only need a couple of hours to go from zero to comprehensive Kafka support for your preferred language.

Creating a REST Proxy wrapper library

I chose to build a node.js wrapper because I had seen requests for robust, fully-featured node.js clients.[1] There haven’t been good options for node.js clients for quite awhile, but many developers want to be able use Kafka from their node.js apps. Additionally, since most libraries focus specifically on producer and consumer support, they rarely expose all the functionality that the REST Proxy provides access to.

The design of the library is intentionally minimal and closely mirrors the REST API. Since node.js makes HTTP requests really easy, the implementation is straightforward. A small core client provides common request and response handling such as setting Content-Type, Accept, and User-Agent headers and parsing responses. It also ties together all the resource types: topics, partitions, consumers, and brokers. The resulting library is only a bit over 600 lines of non-blank non-comment code.

By mirroring the resources provided by the API, we get a pretty intuitive API with little effort. Here’s a quick example to demonstrate. First, we import the library and create an instances of the client working with a broker running on localhost:

var KafkaRest = require('kafka-rest'); var kafka = new KafkaRest({ 'url': 'http://localhost:8082' });

Next, we can access some resources. Listing all topic names is easy (omitting some error checking):

kafka.topics.list(function(err, topics) {
  for(var i = 0; i < topics.length; i++)
    console.log(topics[i].toString());
  })

We can also access the resources hierarchically, getting a specific topic-partition and then producing data to it:

kafka.topic(‘test’).partition(0).produce([‘event1’, ‘event2’],
  function(err, response) {
    console.log(util.inspect(response));
  }
);

Using consumers is also easy, but requires a bit more setup since they are stateful (again, omitting some error checking):

kafka.consumer("my-consumer-group-2").join({
    "format": "binary",
    "auto.offset.reset": "smallest"
  }, function(err, consumer_instance) {
    var stream = consumer_instance.subscribe('test');
    stream.on('read', function(msgs) {
        for(var i = 0; i < msgs.length; i++)
            console.log("Got a message: key=" + msgs[i].key + " value=" +
                        msgs[i].value + " partition=" + msgs[i].partition);
    });
});

 

I won’t dive into much of the implementation here since most of it is a simple wrapper using node’s standard built-in HTTP support with a few async helpers. However, I’ll highlight two steps I took to provide better integration with JavaScript and node.js.

The first example is how different types of produce requests are handled. This API requires careful design because it needs to support a lot of variations:

  • Message formats: keys, values, and partitions are optional
  • Single message vs. batch requests
  • Serialization formats: base64-encoded binary and Avro
  • For Avro, including full schemas vs. reusing schema IDs from previous requests
  • Might include callback to be notified of success or error

It would have been easy to generate different methods for many of these cases or require a large number of parameters (or a fixed order so omitted parameters default to undefined). However, I didn’t want the user to have to remember a half dozen method names and wanted common cases like sending a message with only a value to be as simple as possible. The API should be able to take advantage of how dynamic and flexible JavaScript is. Here’s a sample of the ways you can send messages, all via the same method:

// With binary data:

// Single message with a string value topic.produce('msg1');

// Single message with key, value, and partition, with callback topic.produce({'key': 'key1', 'value': 'msg1', 'partition': 0}, function(err,res){});

// Any record fields can be omitted topic.produce({'partition': 0, 'value': 'msg1'});

// Multiple messages containing only values topic.produce('msg1', 'msg2', 'msg3');

// Multiple messages containing only values, passed as array topic.produce(['msg1', 'msg2', 'msg3']);

// Multiple messages, mixed format topic.produce('msg1', {'partition': 0, 'value': 'msg2'});

// With Avro data:

var userIdSchema = new kafka.AvroSchema("int"); var userInfoSchema = new kafka.AvroSchema({    "name": "UserInfo",    "type": "record",    "fields": [        { "name": "id", "type": "int" },        { "name": "name", "type": "string" }] });

// Avro value schema followed by messages containing only values topic.produce(userInfoSchema, {'avro': 'record'}, {'avro': 'another record'});

// Avro key and value schema. topic.produce(userIdSchema, userInfoSchema, {'key': 1, 'value': {'id': 1, 'name': 'Bob'}});

Instead of mapping directly to the API, I spent some time getting all the rules for extracting the right elements from arbitrary arguments to keep the overhead for the user as low as possible. This is about 1/10 the total code footprint, but has a pretty big impact on usability.

The second example is the consumer API. Since node.js has a nice event API supporting stream interfaces, it makes sense to put a little more effort into wrapping the API so it feels native to node.js and hide some of the underlying HTTP requests from the user. To do this, instead of just exposing a consumer instance object with a method to make consume REST requests, the API exposes a ConsumerStream class which drives consumption, emitting an event for each message. As demonstrated above, this leads to a more node-like API.

Even with these integrations, there are a few improvements that could still be made:

  • The produce APIs support sending multiple messages in a single call, but don’t do any buffering internally. If you call the method with one message, it will make an HTTP request with only that one message. A more complete solution would implement something like the linger.ms setting in the Java clients to improve throughput.
  • The library doesn’t do anything to help users with schemas or data validation before sending requests. This is fine as a baseline since the REST Proxy will catch any errors, but it would be helpful to catch common errors before sending requests, and even better to provide integrated support for Avro schemas and objects.
  • The consumer API uses node.js’s EventEmitter, but could probably better integrate with the streams API by using the Readable interface.

Creating an application with the wrapper library

With the basic wrapper in place, I wanted to create a small but complete example to exercise the library. It’s pretty standard to provide console producer and console consumer implementations with Kafka client libraries and I also added a simple program to print out some simple cluster metadata. But something a little more realistic and interesting would help users of the library get started by providing a realistic application template to work with.

So I decided to create an application based on my go-to source of publically available streaming data: Twitter. Using the streaming API won’t return nearly as much data as even a small Kafka cluster can handle, but is more than enough for a small demo. One of the most useful features on Twitter to quickly discover what people are talking about is the list of trending topics. This application fits very well with Kafka’s data storage model: given a window of tweets, say the past 24 hours, we want to aggregate information about topics in tweets, condensing them to a top 10 list.

To accomplish this, we can break the application into two parts. The first will stream data from Twitter into Kafka, and the second will consume the tweets from Kafka, extracting key topics (we’ll simplify to just hashtags), aggregating by computing an exponentially weighted average, and then exposing this list as a service (again, we’ll simplify just by printing out the list periodically).

You can find the full source code for both here. Since much of the code handles configuration, command line arguments, and usage information, I’ll just extract a couple of key pieces here.

Streaming data from Twitter to Kafka

The first step is to prepare the Avro schema we’ll use for the topic. Although Twitter’s streaming APIs offer a lot of message types containing a lot more data, we’re only going to save two pieces of information: the tweet’s ID and the text of the tweet. In fact, for this application we only need the text, but this demonstrates how to use complex, structured data with the REST Proxy. The kafka-rest node library provides a small wrapper for Avro schemas that helps the library set HTTP headers correctly, so we just instantiate one of these objects with the Avro schema in JSON:

var schema = new KafkaRest.AvroSchema({
    "name": "TweetText",
    "type": "record",
    "fields": [
        { "name": "id", "type": "string" },
        { "name": "text", "type": "string" }
    ]
});

(Curious why IDs, which are 64-bit integers, have type string? See this documentation on IDs for details.)

Next, we configure the connections to the two systems. We need a REST proxy object targeting the topic on our target Kafka cluster:

var kafka = new KafkaRest({"url": api_url});
var target = kafka.topic(topicName);

Next we set up the connection to Twitter using the twitter node.js library. For this, you need to specify access keys and secrets. For this example we’ll use the API endpoint that returns a small sample of the firehose and restrict the output to English language tweets:

var twit = new twitter({
    consumer_key: consumer_key,
    consumer_secret: consumer_secret,
    access_token_key: access_key,
    access_token_secret: access_secret
});
var endpoint = 'statuses/sample';
var endpoint_opts = {'language': 'en'};
twit.stream(endpoint, endpoint_opts, function(stream) {
    stream.on('data', handleTweet);
    stream.on('error', function(err) {
        console.log(util.inspect(err));
        // We just shut down if we encounter an error, but we could also
        // setup retry logic to try to recover the stream
        shutdown();
    });
});

Finally, we implement the callback that handles each tweet. It extracts only the tweet information we want to save (ignoring changes to followers, blocking other users, etc) and adds it to a buffer of tweets. Once we’ve collected enough tweets, we send the entire buffer in one request. Mixed in with this, we’re tracking a few stats about how many tweets we’ve collected so far.

var handleTweet = function(data) {
    if (exiting) return;

   // Filter to only tweets since streaming data may contain other data // such as favorites, blocks, etc.    if (data.text === undefined)        return;

   // Extract just the ID and text.    var saved_data = {        'id': data.id_str,        'text': data.text    };    consumed.push(saved_data);    windowed_collected++;    // Send if we've hit our buffering limit.    if (consumed.length >= bufferMessages) {       messages_sent += consumed.length;        var responseHandler = handleProduceResponse.bind(undefined, consumed.length);        target.produce(schema, consumed, responseHandler);        consumed = [];    } }

There are a few other details we want to handle, such as handling errors if there is an problem producing the data to Kafka (handleProduceResponse in the code above). Take a look at the complete code for the full details.

We can test this without writing the consumer half of the application by using the Avro console consumer. Assuming you’ve already started Zookeeper, Kafka, Schema Registry and the REST Proxy locally on the default ports, start the consumer in one terminal so we’ll see the data as it is published to Kafka:

./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic tweets

Next, start streaming tweets. You’ll need to setup a Twitter app to get the necessary credentials. Then just pass them in, along with the name of the topic to use. (Run without arguments to see other parameters you can adjust.)

node examples/twitter/stream_tweets.js \
    --consumer-key <key> --consumer-secret <secret> \
    --access-key <key> --access-secret <secret> \
    --topic tweets --format avro

The node process should periodically report progress and the number of tweets per second it is generating. The consumer process should emit tweets as they are published. Note that this will not quite be continuous because messages are buffered before being sent to the REST Proxy.

Computing trending hashtags with a Kafka consumer

Now that we have the data in Kafka, we want to use it to compute trending hashtags. To do so, we’ll read the data out of Kafka, extract hashtags, and aggregate information about their frequency.

To consume data from Kafka we join a consumer group using a randomized name, then handle read events by extracting the tweet from each and passing it to a separate processTweet function:



var kafka = new KafkaRest({"url": api_url}); var consumer_instance;
var consumerGroup = "tweet-trending-consumer-" + Math.round(Math.random() * 100000);
kafka.consumer(consumerGroup).join({ "format" : “avro” }, function(err, ci) {
    if (err)
      return console.log("Failed to create instance in consumer group: " + err);
    consumer_instance = ci;
    var stream = consumer_instance.subscribe(topicName);
    stream.on('read', function(msgs) {
        for(var i = 0; i < msgs.length; i++) {
            var tweet = msgs[i].value;
            processTweet(tweet);
        }
    });
    stream.on('error', function(err) {
        console.log("Consumer instance reported an error: " + err);
        shutdown();
    });

   process.on('SIGINT', shutdown); })

The processTweet function handles each tweet by incrementing the weight for each hashtag found:

// Implements a simple exponentially weighted average scheme on
// lower-cased hashtags.
var hashtags = {};
function processTweet(tweet) {
    var words = tweet.text.toLowerCase().split(/\s/);

   // Filter to hash tags, increment weights    for(var i = 0; i < words.length; i++) {        var word = words[i];        if (word.length > 0 && word[0] == '#') {            if (hashtags[word] === undefined)                hashtags[word] = {'name': word, 'weight': 0};            hashtags[word].weight += 1;        }    } }

And finally, we periodically compute the top items and also multiply by a discount rate to maintain an exponentially weighted average that gives more recent terms higher weight:

// Setup period reporting, discounting of hashtag weights
var reportInterval = setInterval(function() {
    var sorted_terms = [];
    for(var hashtagKey in hashtags) {
        var hashtag = hashtags[hashtagKey];
        // Discounting won't affect sorting, so we can do this in the same pass
        hashtag.weight *= period_discount_rate;
        sorted_terms.push(hashtag);
    }
    sorted_terms.sort(function(a,b) {
        return (a.weight > b.weight ? -1 : (a.weight == b.weight ? 0 : 1));
    });

   for(var i = 0; i < Math.min(10, sorted_terms.length); i++) {        var line = "" + i + ". " + sorted_terms[i].name;        for(var s = 0; s < (50-sorted_terms[i].name.length); s++)            line += ' ';        line +=  "(" + sorted_terms[i].weight + ")";        console.log(line);    }    console.log(); }, report_period);

You can test this on the same data you’re collecting from Twitter by running the following command:

node examples/twitter/trending.js --topic tweets --format avro

If you’ve already been collecting data for awhile, add the --from-beginning flag to use all the data already in Kafka rather than only consuming newly reported tweets.

You should periodically see a list like the following, containing top hashtags and their weights, printed to the console:

0. #lohanthonyfollowme                               (54.957597456408585)
1. #raeesbhaicheabkibaar                             (32.961697578784836)
2. #fuelthelove                                      (24.456523950596782)
3. #jobs                                             (22.966354570291134)
4. #teenchoice                                       (21.51559373322098)
5. #job                                              (21.070613230503675)
6. #msgfilmdvdlaunched                               (19.114660226813516)
7. #indiegogo                                        (18.077552547592393)
8. #nowplaying                                       (14.448825623485286)
9. #1                                                (13.208667691399251)

Of course, since this is a node application, we could also easily expose this list as a simple API endpoint that applications could invoke to retrieve the top items. In just a few minutes we have created a new microservice for computing trending tweets.

Conclusion

With the REST Proxy I was able to design and create a library for Kafka in a matter of days and whip up a simple end-to-end application demonstrating it’s functionality. The library code is quite small but provides access to a lot of functionality. Accessing as much functionality in almost any language should be possible with about the same amount of code.

Even if there are existing client libraries for your language, you still might want a library for accessing the REST Proxy from your language of choice. In addition to normal client functionality, the REST Proxy provides access to cluster metadata, built-in support for common serialization formats, good integration with Confluent’s Schema Registry, and we’re planning to add admin operations in the future.

The application we built is simple, but surprisingly functional for so little code. It also has an important property because of the way we architected it around Kafka: the service providing trending hashtags is almost entirely decoupled from the service that imports tweets, in terms of performance, availability, and code. We could have implemented this in one application that performs all the steps together. But with the decoupled approach, the availability of data from the Twitter API only affects the timeliness of trending hashtags, not the ability of the service to compute and report those hashtags. If something goes wrong with the consumer process and it crashes, it is not a problem that its in-memory state is lost — it can just reconsume older data from Kafka and recover its state.

For simplicity, we had our import process filter the data down to just two fields, but in practice it would probably be better to try to preserve all the available information. That way we could have one service handle importing data from Twitter that many downstream consumers could use in different ways. This requires a bit more effort to define the full schema and means each application will use more bandwidth as it consumes the data, but also means we only need to import the data once and can use Kafka to fan out that data to many applications. And applications you may not have even thought of yet can easily use that data without any coordination with the import service.

One drawback of this particular implementation is that it does not currently take advantage of Kafka’s powerful consumer group abstraction to parallelize consumption. Because the application needs to compute a single, global aggregation of all tweets, we cannot completely parallelize the computation. One solution would be two break it down into two phases. The first phase would use multiple consumer instances in a consumer group to aggregate subsets of the data and report those statistics to another, smaller Kafka topics. Then the second phase would have one or more consumers read the partially aggregated statistics and combine them into global statistics. In the second phase, we could use multiple independent consumers to provide high availability and would only duplicate the small amount of work compiling the final global statistics.

This example was simple and a more realistic version would require much more intelligent processing of each tweet. However, the code to handle the interaction with Kafka and the REST Proxy would remain just as small. The fact that so little code is required to get up and running with the REST Proxy means you can focus on your application rather than worrying about implementing some of the complex details of regular Kafka clients.


 

[1] Recently, development of kafka-node has really picked up steam and seems to offer pretty complete producer and high-level consumer functionality. However, at the time it wasn’t as complete and up to date with recent versions of Kafka, and there were few other options for modern (i.e. Kafka 0.8+) node.js clients.

Did you like this blog post? Share it now