
Building a Microservices Ecosystem with Kafka Streams and KSQL

Ben Stopford

Today we invariably operate in ecosystems: groups of applications and services which together work towards some higher level business goal. When we make these systems event-driven they come with a number of advantages. The first is the idea that we can rethink our services not simply as a mesh of remote requests and responses—where services call each other for information or tell each other what to do—but as a cascade of notifications, decoupling each event source from its consequences.

The second comes from the realisation that these events are themselves facts: a narrative that not only describes the evolution of your business over time, it also represents a dataset in its own right—your orders, your payments, your customers, or whatever they may be. We can store these facts in the very infrastructure we use to broadcast them, linking applications and services together with a central data-plane that holds shared datasets and keeps services in sync.

These insights alone can do much to improve speed and agility, but the real benefits of streaming platforms come when we embrace not only the messaging backbone but the stream processing API itself. Such streaming services do not hesitate, they do not stop. They focus on the now—reshaping, redirecting, and reforming it; branching substreams, recasting tables, rekeying to redistribute and joining streams back together again. So this is a model that embraces parallelism not through brute force, but instead by sensing the natural flow of the system and morphing it to its whim.

Life is a series of natural and spontaneous changes. Don’t resist them – that only creates sorrow. Let reality be reality. Let things flow naturally forward.

Lao-Tzu

So we can use the Kafka Streams API to piece together complex business systems as a collection of asynchronously executing, event-driven services. The differentiator here is the API itself, which is far richer than, say, the Kafka Producer or Consumer. It makes code more readable, provides reusable implementations of common patterns like joins, aggregates, and filters and wraps the whole ecosystem with a transparent level of correctness.

Systems built in this way, in the real world, come in a variety of guises. They can be fine grained and fast executing, completing in the context of an HTTP request, or complex and long-running, manipulating the stream of events that map a whole company’s business flow. This post focuses on the former, building up a real-world example of a simple order management system that executes within the context of an HTTP request, and is entirely built with Kafka Streams. Each service is a small function, with well-defined inputs and outputs. As we build this ecosystem up, we will encounter problems such as blending streams and tables, reading our own writes, and managing consistency in a distributed and autonomous environment. Here is a picture of where we will end up (just to whet your appetite):

Your System has State: So Let’s Deal with It

Building stateless services is widely considered a good idea. They can be scaled out, cookie-cutter-style, freed from the burdensome weight of loading data on startup. Webservers are a good example of this: to increase the capacity for generating dynamic content a web tier can be scaled horizontally, simply by adding new servers. So why would we want anything else? The rub is that most applications need state of some form, and this needs to live somewhere, so the system ends up bottlenecking on the data layer—often a database—sat at the other end of a network connection.

In distributed architectures like microservices, this problem is often more pronounced as data is spread throughout the entire estate. Each service becomes dependent on the worst case performance and liveness of all the services it connects to. Caching provides a respite from this, but caching has issues of its own: invalidation, consistency, not knowing what data isn’t cached, etc.

Streaming platforms come at this problem from a slightly different angle. Services can be stateless or stateful as they choose, but it’s the ability of the platform to manage statefulness—which means loading data into services—that really differentiates the approach. So why would you want to push data into your services? The answer is that it makes it easier to perform more data intensive operations efficiently (the approach was invented to solve ultra-high-throughput streaming problems after all).

To exemplify this point, imagine we have a user interface that allows users to browse Order, Payment, and Customer information in a scrollable grid. As the user scrolls through the items displayed, the response time for each row needs to be snappy.

In a traditional, stateless model each row on the screen would require a call to all three services. This would be sluggish in practice, so caching would likely be added, along with some hand-crafted polling mechanism to keep the cache up to date.

But in the streaming approach, we can materialise the data locally via the API. We define a query for the data in our grid: “select * from orders, payments, customers where…” and the API executes it, stores it locally, keeps it up to date, and ensures it’s highly available should your service fail (this approach is discussed in more detail here).

To combat the challenges of being stateful, Kafka ships with a range of features to make the storage, movement, and retention of state practical: notably standby replicas and disk checkpoints to mitigate the need for complete rebuilds, and compacted topics to reduce the size of datasets that need to be moved.

So instead of pushing the data problem down a layer, stream processors are proudly stateful. They make data available wherever it is needed, throughout the ecosystem. This increases performance. It also increases autonomy. No remote calls needed!

Of course being stateful is always optional, and you’ll find that many services you build don’t require state. In the ecosystem we develop as part of this post, two services are stateless and two are stateful, but the important point is that whether your services are stateless or stateful, a streaming platform provides for both.

Coordination-Free by Design

One important implication of pushing data into many different services is we can’t manage consistency in the same way. We will have many copies of the same data embedded in different services which, if they were writable, could lead to collisions and inconsistency.

As this notion of ‘eventual consistency’ is often undesirable in business applications, the solution is to isolate consistency concerns (i.e. write operations) via the single writer principle. For example, the Orders Service would own how an Order evolves in time. Each downstream service then subscribes to the strongly ordered stream of events produced by this service, which they observe from their own temporal viewpoint.

This adds an important degree of ‘slack’ into the system, decoupling services from one another in time and making it easier for them to scale and evolve independently. We will walk through an example of how this works in practice later in the post (the Inventory Service), but first we need to look at the mechanics and tooling used to sew these ecosystems together.

Using Kafka Streams & KSQL to Build a Simple Email Service

Kafka Streams is the core API for stream processing on the JVM: Java, Scala, Clojure, etc. It is based on a DSL (Domain Specific Language) that provides a declaratively-styled interface where streams can be joined, filtered, grouped or aggregated using the DSL itself. It also provides functionally-styled mechanisms—map, flatMap, transform, peek, etc.—for adding bespoke processing of messages one at a time. Importantly you can blend these two approaches together in the services you build, with the declarative interface providing a high level abstraction for SQL-like operations and the more functional methods adding the freedom to branch out into any arbitrary code you may wish to write.

But what if you’re not running on the JVM? In this case you’d use KSQL. KSQL provides a simple, interactive SQL interface for stream processing and can be run standalone, for example via the Sidecar Pattern. KSQL utilises the Kafka Streams API under the hood, meaning we can use it to do the same kind of declarative slicing and dicing we might do in JVM code using the Streams API. Then a native Kafka client, in whatever language our service is built in, can process the manipulated streams one message at a time. Whichever approach we take, these tools let us model business operations in an asynchronous, non-blocking, and coordination-free manner.

Let’s consider something concrete. Imagine we have a service that sends emails to platinum-level clients (this fits into the ecosystem at the top of the above system diagram). We can break this problem into two parts: firstly we prepare by joining a stream of Orders to a table of Customers and filtering for the ‘Platinum’ clients. Secondly we need code to construct and send the email itself. We would do the former in the DSL and the latter with a per-message function:
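
Something like the following sketch shows the shape of this service. It is a minimal sketch rather than the example project’s actual code: the Order, Customer and EmailTuple types, the serdes, the Emailer, and the assumption that the orders stream is keyed by customer ID are all illustrative.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();

// A stream of Orders and a table of Customers, both keyed by customer ID here.
KStream<String, Order> orders = builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));
KTable<String, Customer> customers = builder.table("customers", Consumed.with(Serdes.String(), customerSerde));

orders
    // Declarative part: join each order to the matching customer record...
    .join(customers, EmailTuple::new, Joined.with(Serdes.String(), orderSerde, customerSerde))
    // ...and keep only orders placed by platinum-level clients.
    .filter((customerId, tuple) -> "PLATINUM".equals(tuple.customer.getLevel()))
    // Per-message part: construct and send the email itself.
    .foreach((customerId, tuple) -> emailer.sendMail(tuple));

new KafkaStreams(builder.build(), streamsConfig).start();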

A more fully-fledged Email service can be found in the microservice code examples. We also extend this Transform/Process pattern later in the Inventory Service example.

This style of operation can be performed, off the JVM, using KSQL. The pattern is the same: the event stream is dissected with a declarative statement, then processed one record at a time. Here we implement the emailer in Node.js with KSQL running via the Sidecar Pattern:  

-- Execute these statements in a KSQL sidecar
ksql> CREATE STREAM orders (ORDERID string, ORDERTIME bigint...) WITH (kafka_topic='orders', value_format='JSON');
ksql> CREATE STREAM platinum_emails AS SELECT * FROM orders, customers WHERE client_level = 'PLATINUM' AND state = 'CONFIRMED';

The Tools of the Trade: Windows, Tables & State Stores

Before we develop more complex services let’s take a look at the stateful elements of the Kafka Streams API. Kafka Streams needs its own local storage for a few different reasons. The most obvious is for buffering, as unlike in a traditional database—where the data is laid out historically and can be ordered—in a streaming context events occur in real time, so there is less control over when, and in what order, they may turn up.

Let’s use a few variants on the email example. Imagine you want to send an email that confirms payment of a new order. We know that an Order and its corresponding Payment will turn up at around the same time, but we don’t know for sure which will come first or exactly how far apart they may be. We can put an upper limit on this though—let’s say an hour to be safe.

To avoid doing all of this buffering in memory, Kafka Streams implements disk-backed State Stores to overflow the buffered streams to disk (think of this as a disk-resident hashtable). So each stream is buffered in this State Store, keyed by its message key. Thus, regardless of which event turns up later, the corresponding event can be quickly retrieved.
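
As a rough sketch (the Order, Payment and OrderWithPayment types, the serdes, and the emailer are illustrative assumptions), the windowed stream-stream join that does this buffering looks something like:

import java.time.Duration;
import org.apache.kafka.streams.kstream.*;

KStream<String, Order> orders = builder.stream("orders");
KStream<String, Payment> payments = builder.stream("payments");

// Join Orders to Payments by key (the order ID), tolerating up to an hour between
// the two events. Both sides are buffered in local, disk-backed state stores.
orders.join(payments,
        (order, payment) -> new OrderWithPayment(order, payment),
        JoinWindows.of(Duration.ofHours(1)))
    .foreach((orderId, orderWithPayment) -> emailer.sendPaymentConfirmation(orderWithPayment));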

Kafka Streams takes this same concept a step further to manage whole tables. Tables are a local manifestation of a complete topic—usually compacted—held in a state store by key. (You can also think of them as a stream with infinite retention.) In a services context such tables are often used for enrichment. Say we decide to include Customer information in our Email logic. We can’t easily use a stream-stream join as there is no specific correlation between a user creating an Order and a user updating their Customer Information—that’s to say that there is no logical upper limit on how far apart these events may be. So this style of operation requires a table: the whole stream of Customers, from offset 0, is replayed into the State Store inside the Kafka Streams API.

The nice thing about using a KTable is it behaves like a table in a database. So when we join a stream of Orders to a KTable of Customers, there is no need to worry about retention periods, windows or any other such complexity. If the customer record exists, the join will just work.

There are actually two types of table in Kafka Streams: KTables and Global KTables. With just one instance of a service running, these effectively behave the same. However if we scaled our service out—so it had four instances running in parallel—we’d see slightly different behaviours. This is because Global KTables are cloned: each service instance gets a complete copy of the entire table. Regular KTables are sharded: the dataset is spread over all service instances. So in short, Global KTables are easier to use, but they have scalability limits as they are cloned across machines, so use them for lookup tables (typically up to several gigabytes) that will fit easily on a machine’s local disk. Use KTables, and scale your services out, when the dataset is larger.
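
In code the difference is simply which builder method you call. A quick sketch, assuming a Customer type:

// Sharded: each service instance holds only its share of the customers topic,
// so it scales to large datasets, but joins require co-partitioned input.
KTable<String, Customer> customersSharded = builder.table("customers");

// Cloned: every service instance holds a full copy of the customers topic,
// so it must fit on local disk, but no co-partitioning is needed.
GlobalKTable<String, Customer> customersCloned = builder.globalTable("customers");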

The final use of the State Store is to save information, just like we might write data to a regular database. This means we can save any information we wish and read it back again later, say after a restart. So we might expose an Admin interface to our Email Service which provides stats on emails that have been sent. We could store these stats in a state store and they’ll be saved locally, as well as being backed up to Kafka, inheriting all its durability guarantees.
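
For example (a sketch only: the sentEmails stream, the store name, and the admin lookup are illustrative, and the store-lookup API shown is the one in recent Kafka Streams releases), we might count the emails sent per customer in a named, persistent store and read it back later:

// Count emails sent per customer; the counts live in a local, disk-backed store
// and are also backed up to a changelog topic in Kafka.
KTable<String, Long> emailStats = sentEmails
    .groupByKey()
    .count(Materialized.as("email-stats-store"));

// Later, say behind an admin endpoint, read the store back (it survives restarts):
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("email-stats-store", QueryableStoreTypes.keyValueStore()));
Long sentToCustomer = store.get("customer-123");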

Managing Streams, State and Consistency in the Inventory Service

Now we have a basic understanding of the tools used in stream processing, let’s look at how we can sew them together to solve a more complex and realistic use case—one which requires that reads and writes are performed consistently.

In the system design diagram, there is an Inventory Service. When a user makes a purchase—let’s say it’s an iPad—the Inventory Service makes sure there are enough iPads in stock for the order to be fulfilled. To do this a few things need to happen. The service needs to check how many iPads there are in the warehouse. It then needs to reserve one of those iPads until such time as the user completes their payment and the order is processed and shipped. This requires three actions be performed as a single, atomic transaction inside each service instance:

  1. Validate whether there is enough stock available (items in warehouse minus items reserved).
  2. Update the table of “reserved items” to reserve the iPad so no one else can take it.
  3. Send out a message that Validates the order.

You can find the code for this service here.
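
At its heart is a read-modify-write step over a local state store of reserved stock, keyed by product, which emits a validation result downstream. A much-simplified sketch of that step follows; the types, the store and topic names, and the warehouseStockFor helper are illustrative rather than the project’s actual code:

// The orders stream has been rekeyed by product ID, so all orders for the same
// product land on the same instance and are processed in order by a single thread.
ordersByProduct.transform(() -> new Transformer<String, Order, KeyValue<String, OrderValidation>>() {
    private KeyValueStore<String, Long> reserved;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        reserved = (KeyValueStore<String, Long>) context.getStateStore("reserved-stock-store");
    }

    @Override
    public KeyValue<String, OrderValidation> transform(String productId, Order order) {
        Long current = reserved.get(productId);
        long alreadyReserved = current == null ? 0L : current;
        long inWarehouse = warehouseStockFor(productId);  // illustrative: comes from the joined stock table

        if (inWarehouse - alreadyReserved >= order.getQuantity()) {            // 1. validate available stock
            reserved.put(productId, alreadyReserved + order.getQuantity());    // 2. reserve the items
            return KeyValue.pair(order.getId(), OrderValidation.pass(order));  // 3. emit a PASS validation
        }
        return KeyValue.pair(order.getId(), OrderValidation.fail(order));
    }

    @Override
    public void close() {}
}, "reserved-stock-store")
.to("order-validations");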

What is neat about this approach is its ability to scale. There is no remote locking, there are no remote reads, and we can scale out the inventory service linearly. What’s more, all operations are wrapped by Kafka’s transactional guarantees, meaning they either commit atomically or not at all, regardless of failures.

Bridging the Synchronous and Asynchronous Worlds

Finally, we can put all these ideas together in a more comprehensive ecosystem that validates and processes orders in response to an HTTP request, mapping the synchronous world of a standard REST interface to the asynchronous world of events, and back again. You can download and play with the code for this little ecosystem.

 

Looking at the Orders Service first, a REST interface provides methods to POST and GET Orders. Posting an Order creates an event in Kafka. This is picked up by three different validation engines (Fraud Check, Inventory Check, Order Details Check) which validate the order in parallel, emitting a PASS or FAIL based on whether each validation succeeds. The result of each validation is pushed through a separate topic, Order Validations, so that we retain the ‘single writer’ status of the Orders Service → Orders Topic. The results of the various validation checks are aggregated back in the Order Service (Validation Aggregator) which then moves the order to a Validated or Failed state, based on the combined result.

To allow users to GET any order, the Orders Service creates a queryable materialized view (‘Orders View’ in the figure), using a state store in each instance of the service, so any Order can be requested historically. Note also that the Orders Service is partitioned over three nodes, so GET requests must be routed to the correct node to get a certain key. This is handled automatically using the Interactive Queries functionality in Kafka Streams. (We could also implement this view with an external database, via Kafka Connect.)
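
Under the hood the routing looks roughly like this. It is a sketch against the interactive-query API in recent Kafka Streams releases; the store name, thisHost, and the httpGet helper are illustrative:

// Find out which instance hosts the state-store partition for this order key.
KeyQueryMetadata metadata = streams.queryMetadataForKey(
    "orders-store", orderId, Serdes.String().serializer());

if (thisHost.equals(metadata.activeHost())) {
    // The order is in this instance's local materialised view, so serve it directly.
    ReadOnlyKeyValueStore<String, Order> ordersStore = streams.store(
        StoreQueryParameters.fromNameAndType("orders-store", QueryableStoreTypes.keyValueStore()));
    return ordersStore.get(orderId);
} else {
    // Otherwise proxy the GET to the instance that owns this key.
    return httpGet(metadata.activeHost(), "/orders/" + orderId);
}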

The linked code example also includes a blocking HTTP GET so that clients can read their own writes. In this way we bridge the synchronous, blocking paradigm of a RESTful interface with the asynchronous, non-blocking processing performed server-side:

# Submit an order. Immediately retrieving it will block until validation completes.
$ curl -X POST ... --data '{"id":"1"...}' http://server:8081/orders/
$ curl -X GET http://server:8081/orders/validated/1?timeout=500

Finally, there are two validation services other than the Inventory Validation Service we discussed above. The Fraud Service tracks the total value of orders for each customer in a one-hour window, alerting if they go over the configured limit. This is implemented entirely using the Kafka Streams DSL, although it could also be implemented in custom code via a Transformer. The Order Details Service validates the basic elements of the order itself. This is implemented with a producer/consumer pair, but could equally be implemented using Kafka Streams.
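
A sketch of the fraud check in the DSL might look like the following; the Order type, the serdes, FRAUD_LIMIT, and the OrderValidation helper are illustrative assumptions:

// Sum order values per customer over a one-hour window and emit a FAIL
// validation for any customer whose total breaches the limit.
orders
    .groupBy((orderId, order) -> order.getCustomerId(), Grouped.with(Serdes.String(), orderSerde))
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .aggregate(() -> 0d,
        (customerId, order, total) -> total + order.getValue(),
        Materialized.with(Serdes.String(), Serdes.Double()))
    .toStream()
    .filter((windowedCustomerId, total) -> total > FRAUD_LIMIT)
    .map((windowedCustomerId, total) -> KeyValue.pair(
        windowedCustomerId.key(), OrderValidation.fraudFail(windowedCustomerId.key())))
    .to("order-validations");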

From the Micro to the Macro and Beyond

This example is about as fine-grained as streaming services get, but it is useful for demonstrating how a synchronous request-response paradigm can be bridged into an asynchronous, non-blocking one, and back again, via event collaboration. (We discussed the merits of event collaboration in an earlier post.)

But any distributed system comes with a baseline cost, both in complexity and in latency, so to really see the benefits of this type of system we need to grow and evolve the ecosystem further, adding services that do repricing, inventory management, fulfilment, shipping, billing etc., as well as increasing the functionality of the services we have. In these larger ecosystems the pluggability and extensibility that comes with an event-driven, brokered model increasingly pay dividends.

So the beauty of implementing services on a Streaming Platform lies in its ability to model both the micro and the macro with a single, ubiquitous workflow. Here, fine grained use cases merge into larger architectures that span departments, companies, and geographies.

Summing Up

When we build services using a Streaming Platform, some will be stateless: simple functions that take an input, perform a business operation and produce an output. Some will be stateful, but read only, as when views need to be created so we can serve remote queries. Others will need to both read and write state, either entirely inside the Kafka ecosystem (and hence wrapped in Kafka’s transactional guarantees), or by calling out to other services or databases. Having all approaches available makes Kafka’s Streams API a powerful tool for building event-driven services.

But there are of course drawbacks to this approach. Whilst standby replicas, checkpoints, and compacted topics all mitigate the risks of pushing data to code, there is always a worst-case scenario where service-resident datasets must be rebuilt, and this should be considered as part of any system design. There is also a mindset shift that comes with the streaming model, one that is inherently asynchronous and adopts a more functional style, when compared to the more procedural style of service interfaces. But this is—in the opinion of this author—an investment worth making.

So hopefully the example described in this post helps to tackle some of these issues, particularly the bridge between synchronous and asynchronous worlds. We looked at how a small ecosystem can be built through the propagation of business events that describe the order management workflow. We saw how such services can be built on the JVM with Kafka’s Streams API, as well as off the JVM via KSQL. We also looked more closely at how to tackle trickier issues like consistency with writable state stores and change logs. Finally, we saw how simple functions, which are side-effect-free, can be composed into service ecosystems that operate as one. In the next post in this series, Bobby Calderwood will be taking this idea a step further as he makes a case for a more functional approach to microservices through some of the work done at Capital One.

Posts in this Series:

Part 1: The Data Dichotomy: Rethinking the Way We Treat Data and Services
Part 2: Build Services on a Backbone of Events
Part 3: Using Apache Kafka as a Scalable, Event-Driven Backbone for Service Architectures
Part 4: Chain Services with Exactly Once Guarantees
Part 5: Messaging as the Single Source of Truth
Part 6: Leveraging the Power of a Database Unbundled

Find Out More:

Getting started with KSQL
Kafka Streams Resources

Confluent Resources:

Apache Kafka® for Microservices: A Confluent Online Talk Series
Apache Kafka® for Microservices: Online Panel
Microservices for Apache Kafka white paper
Kafka Streams API Landing Page




Comments

  1. Good stuff! One question about the HTTP load balancer. If we have a load-balancer cluster with several nodes, an LB node forwards the request to a Kafka topic and listens on another topic. How can we guarantee that the same LB node will receive the message from the Kafka topic? The LB node that got the HTTP request should be the one to return the HTTP response to the client.

    1. Hi Aleksandr

      Thanks for the question. It’s a good one. The answer is you can’t, but this is handled by the code using the interactive queries feature that ships with Kafka.

      So your GET might be sent to node A, but the data you need is in a materialised view on node C. Kafka includes a discovery feature which allows the REST interface on node A to know that it needs to proxy the request to node C.

      The crucial method is getKeyLocationOrBlock(), which is used to work out whether a request is local or remote: https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/main/java/io/confluent/examples/streams/microservices/OrdersService.java#L211

      Hope that helps

      B

      1. OK. But say I have one HTTP request to load-balancer node1 -> LB node1 sends it to Kafka -> some service handles it and writes the result to a Kafka topic -> LB node1 consumes the result and sends it back as the HTTP response.

        I guess Kafka Streams is not responsible for that. I think LB node1 should include in the Kafka message the partition it reads from, so the microservice can send the result to that particular partition.

        1. Yes – so what you say here is what would happen:
          The POST is routed to OrderServiceA: Order(k1, V1-CREATED) -> OrdersTopic(P1) [assume key K1 maps to partition P1].
          The order is validated: Order(k1, V2-VALIDATED) is sent to partition P1 and hence back to OrderServiceA.
          The GET is, say, routed to OrderServiceB, and is then rerouted to OrderServiceA as that is the location of P1.

  2. Awesome article! Another great installment in the series!

    Reading through this series and going through the sample code, I can’t help but think about how one would go about generalizing (read: removing the “plumbing” code for) the lookup of data in the distributed Materialized Views (such as the Orders View in this post).
    I was thinking that, to this end, it would be helpful if a piece of code could continuously read the metadata of a Kafka Streams application without actually participating in the processing of the stream. Such a capability would allow, for example, the creation of a sidecar that could look up the materialized data no matter which node it’s present on.
    My question is whether such a capability (essentially reading the stream of metadata of a Kafka Streams application) exists.

    Thanks

    1. Hi George

      Certainly you can create a consumer that reads the stream, or even just reads the headers, but I may not fully follow what you are suggesting. What metadata did you have in mind?

      B

  3. Hi Ben,
    thanks for the great article and example code, it is very enlightening!
    One question though: Your inventory service has the “orders” topic as its input. That means when you scale it out, each instance will handle a partition of order IDs (which, in the end, is more or less arbitrary).
    But orders from different partitions might refer to the same products; how do you maintain consistency and avoid lost updates on the “reserved stocks” store when two simultaneous orders place reservations on the same product? Is this logic somehow included in the join between Order and Product?
    Reading Martin Kleppmann’s “Designing Data-Intensive Applications”, I was under the impression that in such a case consistency can only be achieved with a total ordering guarantee, by means of a separate “Product Claim” topic which is partitioned by product ID.

    1. That’s a great question Till, thanks for asking it.

      To ensure consistency we need to repartition the orders topic so that it is partitioned by the productId, not the orderId. That ensures that orders for the same product are always sent to the same node. Then we know the validation / reservation of the stock will be performed in order by a single thread.

      This happens in the selectKey step here: https://github.com/confluentinc/kafka-streams-examples/blob/1dcebedcc4a2e142160bb9a533db204390bb416f/src/main/java/io/confluent/examples/streams/microservices/InventoryService.java#L76

      Hope that helps, and again, great question.

      B
