[Webinar] Don’t Get Left Behind: Unlock the Secrets of Shifting Left | Register Now
One of the great things about KSQL is how you can filter large volumes of data in real time to just identify events of particular importance. With these events, we often want to do something, like drive a process, take an action, or tell someone. Considering the latter of these—notifying someone that something’s happened—there are various ways of doing this. Most people have a mobile phone, and so a push notification is a common mechanism.
Slack is a messaging application, common in many workplaces. Think of it as IRC, but with animated GIFs. It’s free to get started with, has an API, and it supports push notifications—making it an ideal candidate for this prototype push-notification application that we’re going to build. I’m going to use the Slack Python client that Slack provides, in conjunction with the Confluent Kafka Python client that’s available standalone or bundled with Confluent Platform. We’ll subscribe to a Kafka topic, and based on messages received send a notification through Slack. The use-case will be notifying a user when suspicious login activities are detected in the syslog.
As we saw in part 1 of this series, the source data is from numerous servers and devices, all sending their syslog data to Kafka Connect running the kafka-connect-syslog plugin. This streams the data into a Kafka topic:
ksql> SELECT HOSTNAME, MESSAGE FROM SYSLOG;
proxmox01.moffatt.me | I ❤ logs
proxmox01.moffatt.me | I still ❤ logs
from where KSQL populates in real time a derived stream, in this case with only suspicious login attempts present:
ksql> CREATE STREAM SYSLOG_INVALID_USERS AS \
SELECT * \
FROM SYSLOG \
WHERE HOST='rpi-03' AND MESSAGE LIKE '%Invalid user%';
ksql> SELECT * FROM SYSLOG_INVALID_USERS LIMIT 1; 1520176464386 | //192.168.10.105:38254 | 1520176464000 | 4 | rpi-03 | 6 | rpi-03 sshd[24150]: Invalid user mini from 192.168.42.42 | UTF-8 | /192.168.10.105:38254 | rpi-03.moffatt.me LIMIT reached for the partition. Query terminated
To get started you need to have a Slack workspace (create one for free if necessary), and your API keys. I’m taking a lot of the boilerplate code below from this article on using Slack’s Python client, and this article on using the Confluent Kafka Python client. I make no claims to be a good or stylish Python coder, but this is a prototype, so ¯\_(ツ)_/¯ #TechnicalDebtFTW 😉
First up, let’s install the Slack library:
pip install slackclient
Now run this Python code to check that your API keys are working. You can find more details and examples of using the API from Python in the documentation.
You should see a message popup in the #general channel:
Now let’s hook this up to Kafka!
Whilst Apache Kafka ships with client libraries for Java and Scala, Confluent adds support for other languages, including Python, C/C++, .NET and Go, as part of the Confluent Platform. To install the Confluent Kafka Python client run:
pip install confluent-kafka
Let’s run some very simple Python code to check that things work:
At this point, nothing will happen—because the Python script is waiting for a message on the TEST topic. Let’s give it one! We’re going to use the kafka-console-producer utility to do this:
$ kafka-console-producer \
--broker-list localhost:9092 \
--topic TEST
Nothing will happen, as it’s now waiting for input on stdin. Enter a test message:
{"f1": "value1"}
And you should see in the output from the Python script:
{u'f1': u'value1'}
So with both pieces of the puzzle proven and tested, let’s combine them. This simple script listens on a Kafka topic called KSQL_NOTIFY, and uses the message it receives to send a Slack notification. The target channel (or user), are taken from the Kafka message itself, as is the text to send.
Test it out as before, with the kafka-console-producer:
$ kafka-console-producer \
--broker-list localhost:9092 \
--topic KSQL_NOTIFY
Enter a message on stdin:
{"TEXT": "Hi! This is a push message with ❤ from Kafka and Slack", "CHANNEL": "general"}
And you should then get a notification on Slack, on whichever device you chose, including iOS and Apple Watch 🙂
Now that we have our push notification system working, we just need to stream the desired messages to the Kafka topic. We’ve already got a derived topic which is populated in realtime with any syslog events matching our filter criteria (“Invalid user”, usually indicating an unauthorised login attempt):
ksql> DESCRIBE EXTENDED SYSLOG_INVALID_USERS;
Type : STREAM Key field : Timestamp field : Not set - using <ROWTIME> Key format : STRING Value format : AVRO Kafka output topic : SYSLOG_INVALID_USERS (partitions: 4, replication: 1)
Field | Type -------------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) DATE | BIGINT FACILITY | INTEGER HOST | VARCHAR(STRING) LEVEL | INTEGER MESSAGE | VARCHAR(STRING) CHARSET | VARCHAR(STRING) REMOTE_ADDRESS | VARCHAR(STRING) HOSTNAME | VARCHAR(STRING) --------------------------------------------
Queries that write into this STREAM ----------------------------------- id:CSAS_SYSLOG_INVALID_USERS - CREATE STREAM SYSLOG_INVALID_USERS AS SELECT * FROM SYSLOG WHERE HOST='rpi-03' AND MESSAGE LIKE '%Invalid user%';
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics ------------------------ messages-per-sec: 0 total-messages: 2700 last-message: 3/23/18 10:53:18 AM GMT failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a (Statistics of the local KSQL server interaction with the Kafka topic SYSLOG_INVALID_USERS) ksql>
Now we can further refine this stream in order to trigger our Slack push notifications, populating the topic our Python script is subscribed to, KSQL_NOTIFY
CREATE STREAM KSQL_NOTIFY WITH (VALUE_FORMAT='JSON') AS \
SELECT TIMESTAMPTOSTRING(DATE, 'yyyy-MM-dd HH:mm:ss') + ' 👽 ' + MESSAGE + ' 👽' AS TEXT, \
'general' AS CHANNEL \
FROM SYSLOG_INVALID_USERS;
You may get this error from KSQL:
io.confluent.ksql.exception.KafkaTopicException: Topic 'KSQL_NOTIFY' does not conform to the requirements Partitions:1 v 4. Replication: 1 v 1
What this means is that the KSQL_NOTIFY topic already exists (because we created it implicitly above), and doesn’t match the partition and/or replication factor that KSQL expects. To workaround this, you can specify overrides in the WITH clause. For example, to specify that the topic is to have 1 partition and 1 replica, you’d use:
ksql> CREATE STREAM KSQL_NOTIFY WITH (VALUE_FORMAT='JSON', PARTITIONS=1) AS \
[...]
Depending on how voracious the bots are that are scanning my servers, this trickle (or…stream) of notifications could turn into somewhat of a deluge, which is not really so useful. After all, the whole point of notifications is for someone to take action on someone, or at least be aware of an abnormal condition. So what we can do here is use KSQL’s aggregation capabilities to refine our alerting into a basic form of anomaly detection. Before we do this, let’s terminate the streaming query that’s populating KSQL_NOTIFY:
DROP STREAM KSQL_NOTIFY;
Taking the SYSLOG_INVALID_USERS stream that we created above as our starting point, we can look at how many attempted logins we’re getting per minute:
ksql> SELECT HOST,COUNT(*) FROM SYSLOG_INVALID_USERS WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY HOST;
rpi-03 | 1
rpi-03 | 2
rpi-03 | 4
To see which window this refers to we’ll persist the aggregate query as a table:
ksql> CREATE TABLE INVALID_USERS_LOGINS_PER_HOST AS \
SELECT HOST,COUNT(*) AS INVALID_LOGIN_COUNT \
FROM