
Confluent’s Customer Zero: Building a Real-Time Alerting System With Confluent Cloud and Slack


We talk a lot about how customers can use Confluent as the data backbone for event streaming applications and enable a new class of event-driven microservices by completely decoupling services from one another. With Confluent, organizations can rapidly build and deploy business applications with greater flexibility, support larger scale, and be more responsive to customer demands. But we don’t just talk about it; we do it ourselves as Confluent’s “Customer Zero”!

One of the ways we leverage event-driven microservices within Confluent is to alert our internal Field Operations teams with timely, actionable insights and alerts.

Challenges with translating events into outcomes

Turning events into outcomes at scale is not easy! It starts with knowing what events are actually meaningful to your business or customer’s journey and capturing them. 

At Confluent, we have a good sense of what these critical events or moments are. They include events like creating a Confluent Cloud cluster for the first time, starting a new account or cloud organization, new releases, aggregated usage patterns and activity, customer health snapshots, and more transactional events such as a new opportunity or approaching the end of an existing agreement. This is just a small sample of events we know are important to Confluent’s business, and there are many more.

Once important events are identified, the next big challenge is to locate where those events originate and ensure we capture them. For the events above that we action, the system of origin ranges from Confluent’s product backend and data warehouse to customer management systems, marketing tools, and more. We also want to enrich these events with more context that might not be available in the system of origin. For example, if a customer has just started a new Confluent Cloud cluster, it might be important to know whether they have previously used clusters or are new users of Confluent Cloud. Unfortunately, in most cases, the critical data elements that would be used to enrich the event exist in other tools or systems.

It’s critical that all of these events are up to date, which can prove challenging when many different systems are involved. We can all likely recall a time that we got an email or phone call about something that was awkwardly too late, or reached out to an Account Executive about an issue when they didn't even know anything had happened. For example, perhaps a customer has a connector fail on provisioning due to a bad configuration. If the account team is quickly notified in real time and has a working relationship with the customer already, they can reach out and perhaps help address the customer’s challenge before they get frustrated and potentially even give up.

In some cases, these actions are completely automated, such as an email with resources that goes directly to the customer. Confluent’s product delivers many real-time suggestions via email, in-product notifications, and other methods, but there are additional events that are best actioned by internal teams. 

The business challenge is often not in identifying what events are important or where they reside (in fact, that’s often the easy part) but rather in taking that event or intelligence and using it to accelerate business objectives with some action. For many of these internal events at Confluent, the actions are centered around enabling our customers to be successful. In our case, they are typically a collection of recommended resources, shareable content, or activities that we believe will lead to the desired outcome.

Identifying which events to alert internal teams about and what action to recommend could be a lengthy discussion and exploration in itself, but ultimately it is unique to every business and its desired outcomes.

Building a real-time alert service with Confluent Cloud

Commonly, these insights might be delivered as part of a series of batch data warehouse jobs, perhaps even several times per day. But a challenge remains: no matter how often those batches are run, if you’re not alerting on events in real time, you’re subject to actioning stale or no longer relevant data. Not to mention the technical debt associated with managing all those tightly coupled jobs and dependencies.

At Confluent, our data streaming platform is the core of our data ecosystem, which makes it easy to alert in real time and drive timely actions, and allows us to completely decouple all of these components. With Confluent Cloud, we source events from several different systems using Confluent’s managed connectors (or Cluster Linking if they already reside in Apache Kafka® topics). We leverage stream processing tools like Kafka Streams to enrich these source events with contextual data, as well as to filter and format the alerts to be sent. Then, with a straightforward consumer microservice, we deliver the actions via a custom Slack application, where most of our account teams collaborate and communicate. With Confluent Cloud, we were able to get our alerts service into production in less than six months while allocating only one part-time resource.

An example reference architecture for this use case is shown below, which we’ll dig into further in the next sections.

Capturing events in real time with Cluster Linking, connectors, and producers

The events and contextual data we need reside in several different systems, so we use several different ways to capture them. Luckily, Confluent makes this easy with Cluster Linking, over 120 pre-built connectors available in the Confluent Connector Hub, and the producer courses available at developer.confluent.io. 

Some of Confluent’s data is already available and exposed as part of Kafka topics, so we can easily capture these events using Cluster Linking. Cluster Linking on Confluent Cloud is a fully managed service for replicating data from one Confluent cluster to another. This allows us to easily tap into streams of data that have already been integrated to Confluent Cloud by other teams within Confluent. 

Confluent source connectors extract events from systems like Salesforce and publish each to Kafka topics. Connectors are no-code solutions that allow an organization to begin moving to a data-in-motion paradigm without changing an existing source application on day one. Over time, the strangler pattern can be used to migrate legacy source systems to a more modern, easily refactored architecture. Check out Confluent Docs for an example of configuring the SFDC Source Connector to see how easy using connectors can be. 

For systems where a native connector doesn’t exist, we generally build simple producers to send events to Confluent Cloud. A producer is a client application that publishes (writes) events to a Kafka cluster. In the following snippet, we show how to produce an event that corresponds to the start of onboarding for a customer. This event is one we create based on conditions within our data warehouse (Google BigQuery in this example). 

protected void produceOnSchedule() {
    log.info("Producing Onboarding Started events");
    try {
        TableResult results = BigQueryService.getBqResult(
                //CREATING EVENTS FROM BIG QUERY DATASETS – TRUNCATED FOR LENGTH
        );
        //for each record, produce an event to Confluent Cloud, keyed by SFDC account ID
        for (FieldValueList row : results.iterateAll()) {
            OnboardingEvent record = new OnboardingEvent("onboarding-started", row);
            produceRecord(record.getSfdcAcctId(), record);
        }
    } catch (Exception e) {
        //log the failure with its stack trace instead of printing to stderr
        log.error("Failed to produce Onboarding Started events", e);
    }
}

Note that several aspects of the full producer code are not included here, like defining our Kafka producer or sending the message. For more complete examples of producers, check out Confluent Developer.
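To give a rough idea of the omitted producer definition, here is a minimal sketch of the client properties a producer might use to connect to Confluent Cloud. It is an illustration under stated assumptions rather than our production configuration: the class name `OnboardingProducerConfig`, the `build` method, and its parameters are hypothetical, and the choice of string serializers is an assumption (the real event type may well use Avro or another schema-based format). The property keys themselves are standard Kafka client settings.

```java
import java.util.Properties;

// Hypothetical helper: builds client properties for a producer that connects
// to Confluent Cloud with an API key and secret over SASL_SSL.
final class OnboardingProducerConfig {

    static Properties build(String bootstrapServers, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        // Assumption: string keys (the SFDC account ID) and string values.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas to acknowledge each write.
        props.put("acks", "all");
        return props;
    }
}
```

The resulting `Properties` object can be passed to the `KafkaProducer` constructor from the Apache Kafka Java client. A method like `produceRecord` would then wrap a call such as `producer.send(new ProducerRecord<>(topic, key, value))`.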

Enriching events with contextual data

Once the events are published to Kafka, we can represent each topic as a Kafka Streams abstraction (a stream or a table) and enrich the events by joining them with other streams and tables, writing the result to a new topic. Alternatively, this part of the solution could use Confluent's fully managed Apache Flink offering to perform the joins and filtering. Given we’ve been running this solution for a few years, we currently leverage Kafka Streams, but intend to move some pipelines to Flink as appropriate.

Let’s take a look at one specific example that uses the Streams DSL to enrich the event emitted when a connector fails during provisioning. When this event occurs, an alert is sent to a member of the account team with some context and suggested activities.

Note: Throughout the following example, we have truncated sections, such as SerDe and variable definitions. It is purely illustrative and is not “runnable” code.

First, we’ll define our KStream and KTables that we’ll use. All connector failure events are produced to the "data-notifications-pipeline" topic, which is the primary/base KStream, notificationsPipelineEventKStream in our streaming application. We then have a GlobalKTable, csdrAssignmentKTable, that contains our account team mapping which is stored in the “data-csdr-assignment” topic. Last, we have another GlobalKTable, slackUsersTable, which contains the User ID information for various Slack users. 

//Define streams & tables to be used
KStream<String,NotificationsPipelineEvent> notificationsPipelineEventKStream = builder.stream("data-notifications-pipeline", Consumed.with(Serdes.String(),notificationsPipelineEventSerde));

GlobalKTable<String, CSDRAssignment> csdrAssignmentKTable = builder.globalTable("data-csdr-assignment",Consumed.with(Serdes.String(),csdrAssignmentSerde));

GlobalKTable<String, SlackUser> slackUsersTable = builder.globalTable("data-slack-users",Consumed.with(Serdes.String(),slackUserSerde));

Now that we’ve defined our KStream and KTables, we can enrich the source events by filtering out events that are not provisioning failures, adding the account team member to alert, their Slack User ID, and the message formatted for use with the Slack messaging API. Let’s break this down into smaller chunks: 

First, we’ll filter the source events (which contain all connector failure change events) to keep only provisioning failure events. Because each message already represents a connector failure, we look inside the message value, check for an old status of “Provisioning”, and keep just these messages.

notificationsPipelineEventKStream
       .filter((key,value) -> {
           JSONObject dataJson = new JSONObject(value.getData());
           //optJSONObject/optString tolerate a missing "labels" object or status field instead of throwing
           JSONObject labels = dataJson.optJSONObject("labels");
           return labels != null && "Provisioning".equals(labels.optString("connector_old_status"));
       })

Next, we’ll join to the assignments table to determine who the right person is to notify about the provisioning failure (if anyone). This join uses the key in the source event (leftKey below) to join with the key on the GlobalKTable. In this case, the values are joining on Org ID, which is the key of the assignments topic. 

.join(csdrAssignmentKTable,
               (leftKey,leftValue) -> leftKey,
               (leftValue,rightValue) -> new ConnectorFailureEvent(leftValue,rightValue))

Now that we know who to alert from the first join, we need to know where to send the message in Slack, which requires the user’s Slack ID. We join again, this time using a new join key: the email address of the account team member, which we obtained from the previous join. Because this is a left join, the event is kept even when no matching Slack user is found.

.leftJoin(slackUsersTable,
               (leftKey,leftValue) -> leftValue.getCsdrEmail(),
               (leftValue,rightValue) -> new ConnectorFailureEvent(leftValue,rightValue))
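The difference between the two joins determines which alerts survive the pipeline. The sketch below models that behavior in plain Java, with `Map` lookups standing in for the two GlobalKTables. It does not use Kafka Streams, and the class, field names, and sample data are all hypothetical. An inner `join` drops events whose key has no entry in the table, while a `leftJoin` keeps the event and passes `null` for the missing right-hand value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two lookups; Maps stand in for the GlobalKTables.
final class JoinSemanticsSketch {

    // Hypothetical enriched alert: org ID, account team email, Slack user ID (may be null).
    static final class Alert {
        final String orgId;
        final String csdrEmail;
        final String slackUserId;

        Alert(String orgId, String csdrEmail, String slackUserId) {
            this.orgId = orgId;
            this.csdrEmail = csdrEmail;
            this.slackUserId = slackUserId;
        }
    }

    static List<Alert> enrich(List<String> failedOrgIds,
                              Map<String, String> assignmentsByOrgId,
                              Map<String, String> slackIdsByEmail) {
        List<Alert> alerts = new ArrayList<>();
        for (String orgId : failedOrgIds) {
            // Inner join: no assignment for this org means no alert at all.
            String email = assignmentsByOrgId.get(orgId);
            if (email == null) {
                continue;
            }
            // Left join: keep the alert even if the Slack user is unknown (null).
            String slackUserId = slackIdsByEmail.get(email);
            alerts.add(new Alert(orgId, email, slackUserId));
        }
        return alerts;
    }

    public static void main(String[] args) {
        Map<String, String> assignments = new HashMap<>();
        assignments.put("org-1", "ana@example.com");
        assignments.put("org-2", "raj@example.com");
        Map<String, String> slackIds = new HashMap<>();
        slackIds.put("ana@example.com", "U111");

        for (Alert a : enrich(List.of("org-1", "org-2", "org-3"), assignments, slackIds)) {
            System.out.println(a.orgId + " -> " + a.csdrEmail + " / " + a.slackUserId);
        }
    }
}
```

With this sample data, `org-3` produces no alert because it has no assignment, and `org-2` produces an alert with a `null` Slack user ID. In a real topology, that second case means the `ValueJoiner` passed to `leftJoin` and any downstream formatting step need to tolerate a missing Slack user.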

Now that we know who to alert and where to send the alert, we need to build the contents of the message. To reduce the work required of our consumer microservice, which calls Slack’s API, we produce messages that are already formatted as Slack blocks.

.mapValues(
               value -> {
                   //VARIABLE DEFINITIONS REMOVED FOR EXAMPLE
                   String alertId = orgId + "-" + envId + "-" + connId + "-connector-failure";
                   final SlackMessageAvro slackMessage = new SlackMessageAvro();
                   slackMessage.put("alertId", alertId);
                   slackMessage.put("channelId",csdrOwnerId);
                   slackMessage.put("text","Connector Failure for Org " + orgId + "!");
                   slackMessage.put("blocks",
//SLACK FORMATTED BLOCKS TRUNCATED FOR LENGTH
);
                   return slackMessage;
               }
       )

And finally, we send the formatted alert to a final topic called “slack-messages” where it will be consumed by a microservice and converted into a Slack message for the alert recipient. 

       .to("slack-messages", Produced.with(Serdes.String(), slackMessageAvroSerde));

So, putting all of those together, we get the following:

//Define streams & tables to be used
KStream<String,NotificationsPipelineEvent> notificationsPipelineEventKStream = builder.stream("data-notifications-pipeline", Consumed.with(Serdes.String(),notificationsPipelineEventSerde));

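GlobalKTable<String, CSDRAssignment> csdrAssignmentKTable = builder.globalTable("data-csdr-assignment",Consumed.with(Serdes.String(), csdrAssignmentSerde));

GlobalKTable<String, SlackUser> slackUsersTable = builder.globalTable("data-slack-users",Consumed.with(Serdes.String(),slackUserSerde));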

//Filter for provisioning events, join for account team and slack user info, then format for Slack
notificationsPipelineEventKStream
       .filter((key,value) -> {
           JSONObject dataJson = new JSONObject(value.getData());
           //optJSONObject/optString tolerate a missing "labels" object or status field instead of throwing
           JSONObject labels = dataJson.optJSONObject("labels");
           return labels != null && "Provisioning".equals(labels.optString("connector_old_status"));
       })
       .join(csdrAssignmentKTable,
               (leftKey,leftValue) -> leftKey,
               (leftValue,rightValue) -> new ConnectorFailureEvent(leftValue,rightValue))
       .leftJoin(slackUsersTable,
               (leftKey,leftValue) -> leftValue.getCsdrEmail(),
               (leftValue,rightValue) -> new ConnectorFailureEvent(leftValue,rightValue))
       .mapValues(
               value -> {
                   //VARIABLE DEFINITIONS REMOVED FOR EXAMPLE
                   String alertId = orgId + "-" + envId + "-" + connId + "-connector-failure";
                   final SlackMessageAvro slackMessage = new SlackMessageAvro();
                   slackMessage.put("alertId", alertId);
                   slackMessage.put("channelId",csdrOwnerId);
                   slackMessage.put("text","Connector Failure for Org " + orgId + "!");
                   slackMessage.put("blocks",
//SLACK FORMATTED BLOCKS TRUNCATED FOR LENGTH
);
                   return slackMessage;
               }
       )
       .to("slack-messages", Produced.with(Serdes.String(), slackMessageAvroSerde));

This is just one small example of how we enrich our real-time events and alerts within Confluent to notify the field. Other event pipelines follow a similar pattern within Kafka Streams, including the following: 

  1. Defining SerDes to be used 

  2. Defining KStreams and KTables from topics

  3. Performing stateless and stateful transformations

  4. Writing back to a topic

Notifying account teams with a microservice consumer

Having captured the critical events and enriched them, it is time to alert the account team so they can take action. To do this, we created a simple microservice that consumes from a Confluent Cloud topic called slack-messages and makes API calls to Slack to send the message. 

In the example below, we use a Spring Kafka consumer to consume events from slack-messages and post them to Slack’s API. When a response or event could be useful to us in the future, we produce Slack’s response back to Kafka.

@KafkaListener(topics = "slack-messages")
   public void listenWithHeaders(
           @Payload SlackMessageAvro message,
           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
           @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) throws Exception {

       Slack slack = Slack.getInstance();
       String token = configuration.getSlackToken();

       ChatPostMessageResponse response = slack.methods(token).chatPostMessage(req -> req
                       .channel(message.getChannelId().toString())
                       .blocksAsString(message.getBlocks().toString())
                       .text(message.getText().toString())
       );

       DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern("[EEE, dd MMM yyyy HH:mm:ss zzz]");
       LocalDateTime dateTime = LocalDateTime.parse(response.getHttpResponseHeaders().get("date").get(0), inputFormat);

       if (response.isOk()){
           //HANDLE RESPONSE - TRUNCATED FOR LENGTH
       }
       else {
           System.out.println(response);
       }
   }
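The listener above parses the `date` header of Slack's HTTP response with a hand-written pattern. As an alternative sketch (not what our service does), the JDK's built-in `DateTimeFormatter.RFC_1123_DATE_TIME` can parse the same header format and keeps the time zone, which a `LocalDateTime` discards. The class and method names are hypothetical, and the sample header value is the example from the JDK documentation rather than real Slack output.

```java
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

final class HttpDateHeaderSketch {

    // Parses an RFC 1123 date, such as an HTTP "date" header, into an Instant.
    static Instant parseHttpDate(String headerValue) {
        return ZonedDateTime.parse(headerValue, DateTimeFormatter.RFC_1123_DATE_TIME).toInstant();
    }

    public static void main(String[] args) {
        // prints 2008-06-03T11:05:30Z
        System.out.println(parseHttpDate("Tue, 3 Jun 2008 11:05:30 GMT"));
    }
}
```

Working with an `Instant` avoids ambiguity if the timestamp is later compared with Kafka record timestamps, which are also expressed relative to the UTC epoch.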

What to do next?

It’s easy to imagine how quickly additional use cases and value can be created after an initial streaming data pipeline is created. We can quickly iterate and repeat the formula of identifying critical events, getting them into Confluent Cloud (with Cluster Linking, fully managed connectors, and/or producers), enriching them with contextual data (using Kafka Streams), and notifying team members (with a custom microservice). With every new use case, we further develop and enhance a scalable go-to-market model that equips our Field Operations team members with the tools they need to be successful.

If you'd like to try out Confluent Cloud, you can get started with a free trial to get $400 of free usage for your first 30 days.

  • Matt Mangia is a Staff Solutions Engineer on the Partner Innovation & Ecosystem team. Before focusing on scaling Partner and Field Operations programs, he served as a Solutions Architect at Confluent helping design and implement data in motion use cases with customers across North America. Matt lives in Columbus, Ohio with his wife, Kate, and two children, Tony and Joanna. He enjoys DIY projects, college football, and swimming.
