
Uniting the Machine Learning and Data Streaming Ecosystems - Part 2


In part one of this series, we explored the business opportunities of uniting the machine learning (ML) and data streaming markets and examined the socio-technical incentives that keep them apart. In this post, we will explore the opportunities and pitfalls of SQL as an ecosystem bridge, look at solutions for interfacing the JVM with the Python interpreter, and finally, dive into code examples that show you how to build your own streaming machine learning solution today.

SQL – A lingua franca?

Part one of this series demonstrated how the language barrier between the Java and Python ecosystems holds back machine learning for data in motion. But both these communities do share a common tongue: SQL.

SQL is a common language across all programming communities and is versatile enough for both transactional and analytic queries. Processing data in motion requires different semantics than processing data at rest, which is why streaming SQL engines such as ksqlDB and Flink SQL have been built.
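To make the difference in semantics concrete, here is a small sketch in plain Python (not ksqlDB or Flink SQL) of what a `COUNT(*) ... GROUP BY` query means in each setting. Over data at rest, the query scans a bounded table once and returns a final answer. Over data in motion, the input never ends, so the result is never final, and the engine instead emits an updated row each time an event changes it. The function names and click events are hypothetical.

```python
from collections import Counter
from typing import Iterable, Iterator


def batch_count(events: list[str]) -> dict[str, int]:
    """Data at rest: scan the bounded input once and return the final result."""
    return dict(Counter(events))


def streaming_count(events: Iterable[str]) -> Iterator[tuple[str, int]]:
    """Data in motion: the input is unbounded, so emit an updated
    (key, count) row every time a new event changes the result."""
    counts: Counter[str] = Counter()
    for key in events:
        counts[key] += 1
        yield key, counts[key]  # an update to a continuously changing result


clicks = ["home", "cart", "home", "checkout", "home"]
print(batch_count(clicks))           # {'home': 3, 'cart': 1, 'checkout': 1}
print(list(streaming_count(clicks)))
# [('home', 1), ('cart', 1), ('home', 2), ('checkout', 1), ('home', 3)]
```

For each key, the latest streaming update matches the batch answer. The difference is that the streaming version never stops and keeps revising earlier results. That is why streaming SQL needs semantics of its own.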

SQL's syntax is excellent for building a directed acyclic graph (DAG), which the underlying SQL engine can optimize for execution. One of the fundamental principles of Modern Data Flow is to be declarative, and to "build data transformations by describing what you need to do and let the infrastructure figure out the right way to do it." Building DAGs enables declarative programming.

We need to move from specialists to generalists. Imperative to declarative.

— Zhamak Dehghani, CEO and founder, Nextdata

But you can build DAGs without SQL. Declarative programming does not require SQL or configuration languages. Lee Briggs from Pulumi gave a great talk on this misconception at Current 2022, explaining how Pulumi provides an imperative authoring experience via TypeScript, Python, Go, C#, or Java while still creating a declarative DAG. As Briggs explains in his blog post, "What the imperative authoring experience gives you is the ability to manipulate how the DAG is built. You're still building a graph. You're not checking if the operation was successful or unsuccessful because that would mean the tool is imperative. Imperative languages just give you the flexibility you need to manipulate the way the graph is built."
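The distinction Briggs draws fits in a few lines of Python. In this sketch, ordinary imperative code (a loop and a conditional) decides which nodes and edges exist, but no data is processed while the graph is being built. A separate toy "engine" evaluates the finished DAG afterwards. The `Node` class, `build_graph`, and `run` are hypothetical stand-ins for a real framework such as Pulumi or a stream processing engine.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Node:
    """One step in the graph: a name, a function, and its incoming edge."""
    name: str
    fn: Callable[[float], float]
    parent: Optional["Node"] = None


def build_graph(factors: list[float], clip: bool) -> list[Node]:
    """Imperative authoring: a loop and an if decide the graph's shape,
    but nothing is computed here. We only create nodes and edges."""
    source = Node("source", lambda x: x)
    outputs = []
    for f in factors:  # one branch per factor, all sharing the source node
        node = Node(f"scale_{f}", lambda x, f=f: x * f, source)
        if clip:  # conditionally add an extra step to the branch
            node = Node(f"clip_{f}", lambda x: min(x, 10.0), node)
        outputs.append(node)
    return outputs


def run(node: Node, value: float) -> float:
    """A toy engine that evaluates a finished branch, upstream first."""
    if node.parent is not None:
        value = run(node.parent, value)
    return node.fn(value)


graph = build_graph([2.0, 5.0], clip=True)  # describes the DAG, computes nothing
print([n.name for n in graph])              # ['clip_2.0', 'clip_5.0']
print([run(n, 3.0) for n in graph])         # [6.0, 10.0]
```

The engine receives the whole graph before any data flows through it. That is what would let a real engine optimize or distribute the work.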

Building a DAG is precisely what Kafka Streams does for Java and Faust for Python, which is why they're so powerful compared to the standard Apache Kafka® Clients. As outlined in our Guide to Kafka Streams: "When you define a Kafka Streams operation on an event stream or streams, what you're really defining is a processor topology, a directed acyclic graph (DAG) with processing nodes and edges that represent the flow of the stream."

Additionally, all stream processing solutions provide a means of tying in imperative code. Kafka Streams and Faust are libraries that easily connect with any other Java or Python code, and the SQL-based solutions provide UDFs. It's unfeasible to build full stream processing apps using only streaming SQL, just as you can't make a full back-end microservice in only standard SQL.
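As a rough illustration of the UDF escape hatch, the sketch below uses Python's built-in `sqlite3` module as a stand-in for a streaming SQL engine. ksqlDB and Flink SQL have their own UDF APIs, which differ from this one. Imperative Python logic is registered as a SQL function and then called from an otherwise declarative query. The `readings` table, the `bucket` function, and its thresholds are hypothetical.

```python
import sqlite3


def bucket(temperature: float) -> str:
    """Imperative logic that would be clumsy to express in plain SQL."""
    if temperature >= 30:
        return "hot"
    if temperature >= 15:
        return "mild"
    return "cold"


conn = sqlite3.connect(":memory:")
# Register the Python function as a SQL function: name, number of args, callable.
conn.create_function("bucket", 1, bucket)

conn.execute("CREATE TABLE readings (sensor TEXT, temperature REAL)")
conn.executemany(
    "INSERT INTO readings VALUES (?, ?)",
    [("a", 12.0), ("b", 21.5), ("c", 33.0)],
)

# The query stays declarative; the UDF supplies the imperative piece.
rows = conn.execute(
    "SELECT sensor, bucket(temperature) FROM readings ORDER BY sensor"
).fetchall()
print(rows)  # [('a', 'cold'), ('b', 'mild'), ('c', 'hot')]
```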

Feature pipelines can be complex. We need a language that allows building small pieces of logic independently and stitching them together, while making the pieces reusable.

— Chip Huyen, Co-Founder, Claypot AI

The other major challenge of SQL is its lack of composability, which is essential for complex analytic workloads. This is part of why libraries such as pandas are so popular: data can be pulled into a DataFrame via SQL, and Python logic can then be reused with procedural, functional, and even object-oriented patterns.
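The "pull with SQL, then compose in Python" pattern is sketched below. pandas is the usual tool for this. To keep the example dependency-free, it uses `sqlite3` and plain dictionaries instead, and the pattern is the same. Each feature step is a small, parameterized, independently testable function, and `compose` stitches steps into a pipeline. The table, column names, and helper functions are hypothetical.

```python
import sqlite3
from functools import reduce
from typing import Callable

Features = dict[str, float]
Step = Callable[[Features], Features]


def ratio(num: str, den: str, out: str) -> Step:
    """Reusable step factory: adds num / den as a new column (0.0 if den is 0)."""
    def step(row: Features) -> Features:
        return {**row, out: row[num] / row[den] if row[den] else 0.0}
    return step


def flag_above(col: str, threshold: float, out: str) -> Step:
    """Reusable step factory: adds 1.0 if col exceeds threshold, else 0.0."""
    def step(row: Features) -> Features:
        return {**row, out: float(row[col] > threshold)}
    return step


def compose(*steps: Step) -> Step:
    """Stitch independent steps into one pipeline, applied left to right."""
    return lambda row: reduce(lambda acc, step: step(acc), steps, row)


# Pull raw data with SQL...
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows behave like mappings, so dict(row) works
conn.execute("CREATE TABLE sessions (clicks REAL, purchases REAL)")
conn.executemany("INSERT INTO sessions VALUES (?, ?)", [(10, 2), (4, 0), (8, 6)])
rows = [dict(r) for r in conn.execute(
    "SELECT clicks, purchases FROM sessions ORDER BY rowid")]

# ...then reuse small Python pieces to build features.
pipeline = compose(
    ratio("purchases", "clicks", "conversion"),
    flag_above("conversion", 0.5, "high_conversion"),
)
features = [pipeline(r) for r in rows]
print([f["high_conversion"] for f in features])  # [0.0, 0.0, 1.0]
```

Each step can be unit-tested alone and reused in other pipelines with different column names. That kind of intra-query reuse is hard to achieve with CTEs alone.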

Standard SQL and Flink SQL enable the reuse of whole queries via common table expressions (CTEs) but lack intra-query composability. Also, they do not support splitting queries across multiple files. Large SQL analysis becomes like a large imperative script with the same advice for maintainability: indent your code, use CTEs (fixed functions), and add comments. This is fine for small transactional queries, but we have moved well beyond these approaches for more complex systems.

The real benefit of SQL is having an expressive data-oriented language shared across all developer communities, which is familiar to data analysts without software engineering experience. This makes the power of streaming available to more personas.

Streaming SQL has an important role, and Confluent continues to invest heavily in its future. But it complements general-purpose programming languages rather than replacing them. So despite its benefits, SQL is only part of the solution to the ML streaming problem.

Breaking down language barriers

To truly bring together the data streaming community and the machine learning community, we have a few choices:

  1. Rewrite the Python ML ecosystem for the JVM and convince all Python engineers to retool to use a JVM-based language.

  2. Rewrite the whole data streaming ecosystem in a bridge language like C and write Java and Python bindings.

  3. Rewrite libraries in Python or a bridge language and write bridge libraries (bridged via C or IPC) for JVM platforms.

The first two options are unfeasible (but often suggested when this topic comes up), so we'll dig into option number 3.

Several tactics have been used to build Python libraries. The Confluent Clients team provides C/C++, C#/.NET, Python, and Go support for Kafka by reimplementing the Kafka client in C (librdkafka) and building higher-level language bindings on top of it. Faust (more on this later) has taken the approach of a language-native rewrite from top to bottom. Confluent is currently experimenting with a third approach for Parallel Consumer: writing Python bindings on top of the Java library.
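The "C library plus language bindings" approach is sketched below using Python's standard `ctypes` module to call `strlen` from the C standard library. This is only an illustration of the binding idea. confluent-kafka-python wraps librdkafka with a compiled C extension module rather than `ctypes`, and `byte_length` is a hypothetical wrapper. The shape is the same, though: C does the work, and a thin Python layer converts types at the boundary. The sketch assumes a Unix-like system where `find_library("c")` locates libc.

```python
import ctypes
import ctypes.util

# Load the C standard library (e.g. "libc.so.6" on Linux).
libc = ctypes.CDLL(ctypes.util.find_library("c"))

# Declare the C signature: size_t strlen(const char *s);
libc.strlen.argtypes = [ctypes.c_char_p]
libc.strlen.restype = ctypes.c_size_t


def byte_length(text: str) -> int:
    """Pythonic binding: convert Python types at the boundary, then call C."""
    return libc.strlen(text.encode("utf-8"))


print(byte_length("kafka"))  # 5
print(byte_length("café"))   # 5 (é takes two bytes in UTF-8)
```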

Apache Flink®, Apache Beam®, and Apache Spark™ are all JVM-based platforms with multi-language support (including Python), and all are used widely in production. As a result, architectural patterns for breaking down language barriers are increasingly robust and battle-tested.

PyFlink is the missing piece for an ML-powered data streaming infrastructure, as almost every data engineer uses Python.

— Kai Waehner, Field CTO, Confluent

PyFlink and PySpark use the Py4J library to communicate with a JVM server over sockets to coordinate the building of a DAG. This keeps Python as a client-side interface for coordinating an optimized server-side JVM-based platform.

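To execute Python as part of the DAG, Apache Beam runs Python code in separate SDK worker environments (typically Docker containers), and Apache Flink's process mode runs Python in separate worker processes. In both cases the JVM and Python communicate via gRPC. Flink additionally supports a thread mode, which uses the PemJa library to run the Python interpreter inside the JVM process, with the two communicating via C.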

Spark's Pandas UDFs reduce the cost of crossing into Python. Spark writes batches of rows to an in-memory buffer in the Apache Arrow format, which pandas can read efficiently. The UDF can then operate on whole columns using pandas' vectorized routines, which are implemented largely in C and Cython, instead of converting and processing one Python object per row.

These approaches could also be used to make additional libraries and platforms, such as Kafka Streams and Kafka Connect, accessible from Python.

Real-time experiment management

ML experiment management platforms are core to MLOps best practices. These platforms ingest your data, train your weights, and export your production-ready model.

There are currently no managed direct Kafka integrations for any ML experiment management platform. DataRobot comes the closest, having an SDK that leverages Kafka, but it still requires you to run your own monitoring agent. Amazon SageMaker Feature Store's streaming ingestion demo suggests sinking to an AWS Lambda function hosting your own code that leverages their feature store client API. Pachyderm supports spout pipelines for ingesting streaming data and has an example that integrates via a Kafka Golang client library. Arize AI recommends integration by running a service utilizing the Python Kafka Client and the Arize Pandas SDK. This blog post showcases using Airflow to pull daily data from Kafka to update an ML model, with data logged in MLflow.

This gap is unfortunate, as Kafka and experiment management platforms complement each other beautifully. Use cases include sinking data to these platforms to monitor distribution shifts and sourcing data to trigger MLOps actions such as automated model deployments. Luckily, there is a thriving open source Kafka community, and the Kafka Connect API provides an easy way to create fault-tolerant integrations, so you may find one in the wild or build one yourself.

Popular ML experiment management platforms include: 

Build a streaming ML solution

Python client

You can build a real-time ML-powered microservice using the Confluent Python client together with all of Python's ML ecosystem. To get experience with this library, check out the Apache Kafka for Python Developers course.

Centrica Hive uses this for an ML-powered production use case, interlacing data processing pipelines built with Java and Kafka Streams with ML-powered Python apps using a Kafka Python client.

Example code:

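from confluent_kafka import Consumer, Producer
from my_app import ml_model  # hypothetical: takes a str, returns a str

def delivery_report(err, msg):
    # Called from p.poll()/p.flush() once the broker acknowledges or rejects a message
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
c = Consumer({
    'bootstrap.servers': 'mybroker1,mybroker2',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        value = msg.value().decode('utf-8')
        print(f'Received message: {value}')
        result = ml_model(value).encode('utf-8')
        p.poll(0)  # serve delivery callbacks for earlier produce() calls
        # Write predictions to a separate topic so they are not consumed again as input
        p.produce('mytopic-predictions', result, callback=delivery_report)
        print(f'Produced result: {result}')
except KeyboardInterrupt:
    pass
finally:
    c.close()  # commit final offsets and leave the consumer group
    p.flush()  # wait for outstanding messages to be delivered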

Faust

Kafka Streams is a Java library that enables building powerful stream processing apps. It maintains state computed from the stream, which can be queried directly or piped into external systems. Rather than querying Kafka directly, a stream processing application keeps a derived, queryable, materialized view of the data in Kafka and runs low-latency queries against it. The Kafka cluster stores the log, and the stream processing application stores the materialized view and processes queries against it.
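The idea of a derived, queryable view can be sketched without Kafka at all. Below, a list of events stands in for the log, and in-memory dictionaries stand in for the local state store that Kafka Streams (or a Faust table) would maintain. This toy omits everything that makes the real thing fault-tolerant, such as backing state up to Kafka. The `MaterializedView` class and event fields are hypothetical.

```python
from collections import defaultdict


class MaterializedView:
    """A derived, queryable view (latest reading and count per key),
    updated incrementally as events arrive from the log."""

    def __init__(self) -> None:
        self.latest: dict[str, float] = {}
        self.counts: defaultdict[str, int] = defaultdict(int)

    def apply(self, key: str, value: float) -> None:
        # Update state from a single event; the log itself is never queried.
        self.latest[key] = value
        self.counts[key] += 1

    def query(self, key: str) -> tuple[float, int]:
        # Low-latency lookup against local state.
        return self.latest[key], self.counts[key]


events = [("s1", 20.5), ("s2", 18.0), ("s1", 21.0)]  # stand-in for a Kafka topic
view = MaterializedView()
for key, value in events:
    view.apply(key, value)

print(view.query("s1"))  # (21.0, 2)
```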

Faust is a community-maintained, open source project that brings Kafka Streams-style functionality to Python, enabling Python developers to build powerful stream processing apps.

Alex Morley from Babylon gave a great talk at Current 2022 about their experience building stream processing apps to drive ML-powered health risk prediction and real-time early detection alerts. For example, on receiving a new blood pressure reading, their event-driven architecture updates all the patient’s risks, including cardiovascular risk, for the next 10 days. This solution requires complex logic, Python libraries, API requests, and internal state. Morley outlined how SQL-based solutions, existing FaaS solutions, and Flink did not meet their needs. They used Faust to build a Function as a Service API with excellent results.

We massively increased the number of developers using the data streams available at Babylon and reduced the time to deployment for a new stream processing app by an order of magnitude.

— Alex Morley, Engineering Manager, Babylon 

Example code:

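from faust import App, Record
from datetime import datetime
from my_app import ml_model  # hypothetical: returns a serializable result

class InputFeatures(Record, isodates=True):
    session_id: str
    value: int
    date_published: datetime

app = App('example', broker='kafka://localhost:9092')
in_topic = app.topic('input', value_type=InputFeatures)
out_topic = app.topic('output')

@app.agent(in_topic, sink=[out_topic])
async def infer(stream):
    # group_by repartitions the stream by session_id, so all events
    # for one session are handled by the same worker
    async for features in stream.group_by(InputFeatures.session_id):
        result = ml_model(features)
        yield result  # each yielded value is forwarded to out_topic

if __name__ == '__main__':
    app.main()  # start a worker with: python <this_file>.py worker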

Flink

Flink supports Python UDFs with access to the complete Python ecosystem. Flink stands apart from the other solutions by supporting both batch and stream processing. It has sophisticated state management and is designed for high availability.

Example code:

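import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Predict(ScalarFunction):
    def open(self, function_context):
        # Load the model once when the function is initialized, not once per row.
        # Assumes resources.zip was shipped with the job (e.g. via add_python_archive)
        # and contains a pickled scikit-learn-style model.
        import pickle
        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, *features):
        # As a pandas UDF, each argument is a pandas.Series holding a batch of rows
        X = pd.concat(features, axis=1).values
        return pd.Series(self.model.predict(X))

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")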

Java

This blog post showcases two approaches: using Kafka Streams to preprocess data before calling a model hosted in a separate model-server process via gRPC, and embedding the Java-native version of TensorFlow directly in a Kafka Streams app or ksqlDB UDF.

This blog post showcases using Kafka Streams for online machine learning to predict flight arrivals using a Java-native ML model.

Online machine learning

River lets you take your ML modeling to the next level, fully capitalizing on the streaming paradigm. It is an online learning Python library, enabling you to train models incrementally on the stream rather than in batch. At Current 2022, Zander Matheson from Bytewax showcased using River in a stream processing app to detect fires in real time using air quality data.
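To show what training "incrementally on the stream" means, here is a dependency-free sketch of online linear regression trained with stochastic gradient descent. It borrows the `learn_one`/`predict_one` naming that River uses, but it is a hand-written toy, not River's implementation. Each event updates the weights once and is then discarded, so past data is never revisited. The learning rate and the synthetic stream (`y = 3 * temp + 1` plus noise) are assumptions chosen for illustration.

```python
import random


class OnlineLinearRegression:
    """Toy online model: one SGD step per event, no stored history."""

    def __init__(self, lr: float = 0.01) -> None:
        self.lr = lr
        self.weights: dict[str, float] = {}
        self.bias = 0.0

    def predict_one(self, x: dict[str, float]) -> float:
        return self.bias + sum(self.weights.get(k, 0.0) * v for k, v in x.items())

    def learn_one(self, x: dict[str, float], y: float) -> None:
        # Gradient step on the squared error 0.5 * (y_hat - y) ** 2 for this event.
        error = self.predict_one(x) - y
        self.bias -= self.lr * error
        for k, v in x.items():
            self.weights[k] = self.weights.get(k, 0.0) - self.lr * error * v


random.seed(0)
model = OnlineLinearRegression(lr=0.05)
for _ in range(5000):  # simulated event stream, one event at a time
    x = {"temp": random.uniform(-1, 1)}
    y = 3 * x["temp"] + 1 + random.gauss(0, 0.1)
    model.learn_one(x, y)

print(model.weights["temp"], model.bias)  # close to 3.0 and 1.0
```

With a constant learning rate, every new event keeps nudging the weights. This is also what allows a model like this to follow a relationship that drifts slowly over time.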

You can also try online learning with SymetryML, a streaming machine learning toolkit which connects to Kafka. Or check out Kafka-ML, a platform that uses Kafka and Kubernetes to train, deploy, and monitor deep learning models on data streams, which supports distributed and incrementally trained models.

River’s README highlights when to use online ML:

“An online approach might fit the bill if:

  • You want a model that can learn from new data without having to revisit past data.

  • You want a model which is robust to concept drift.

  • You want to develop your model in a way that is closer to what occurs in a production context, which is usually event-based.”

Further reading

Apache Kafka and R: Real-Time Prediction and Model (Re)training – using R, Kotlin, ksqlDB and MongoDB

Kafka in Machine Learning for Real-time Predictions – using Python client, Confluent Cloud, and Scikit-Learn

Conclusion

Neither the machine learning market nor the data streaming market is showing signs of slowing down; both will continue to accelerate, innovate, and soon integrate tightly. We know the solutions, and the community has shown it's ready to execute. Are you building machine learning solutions with Confluent? Come tell us about it on the Community Forum and Slack! Share your Python solutions/questions on the Slack #python channel.

  • Robbie Palmer is a machine learning engineer who is passionate about overcoming the socio-technical challenges of the ML market. He has led and bridged several data science and software engineering teams, addressing recurring socio-technical patterns. He has built geospatial-derived computer vision solutions, from data collection to full-stack app development, powered by deep learning models and distributed computing architectures.
