Streaming ETL

Processing Syslog Data: Filtering

KSQL can enrich and filter syslog data to surface particular conditions or events. Syslog is a standard protocol that applications and devices use to send log messages to local files or remote servers. With KSQL, you can filter and react to events in real time instead of running historical analysis on syslog data held in cold storage.

Because data flows in from numerous servers and network devices, the volume of messages can be large, and often only a particular condition or event is of interest. In this example, KSQL is used to detect SSH login attempts by invalid users, which may indicate a malicious attack.

Environment: 4.1 or higher

Directions

The source events arrive on a Kafka topic named syslog. A sample event looks like this:

{
  "date": {
    "long": 1525950906000
  },
  "facility": {
    "int": 0
  },
  "host": {
    "string": "proxmox01"
  },
  "level": {
    "int": 5
  },
  "message": {
    "string": "proxmox01 kernel: [36025328.043913] audit: type=1400 audit(1525950906.295:7747987): apparmor=\"DENIED\" operation=\"ptrace\" profile=\"docker-default\" pid=26854 comm=\"node\" requested_mask=\"trace\" denied_mask=\"trace\" peer=\"docker-default\""
  },
  "charset": {
    "string": "UTF-8"
  },
  "remote_address": {
    "string": "/192.168.10.250:45767"
  },
  "hostname": {
    "string": "proxmox01.moffatt.me"
  }
}
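
The sample above appears to use Avro's JSON encoding, in which each nullable field's value is wrapped in a single-key object that names its type (for example {"string": "proxmox01"}). As a rough illustration of how those wrappers map onto the flat columns KSQL exposes, here is a minimal Python sketch. The function name flatten_avro_json is hypothetical, and the sketch assumes every single-key object is a type wrapper, which would not hold for records that contain real nested objects with one key.

```python
import json


def flatten_avro_json(record):
    """Unwrap Avro JSON-encoded union values such as {"string": "x"} into plain values."""
    flat = {}
    for field, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            # Assumption: a single-key object is a type wrapper, so the key is
            # the Avro type name and the value is the actual payload.
            (flat[field],) = value.values()
        else:
            # Plain values (including null) pass through unchanged.
            flat[field] = value
    return flat


# A trimmed copy of the sample event shown above.
sample = json.loads(
    '{"host": {"string": "proxmox01"}, "level": {"int": 5}, "facility": {"int": 0}}'
)
print(flatten_avro_json(sample))
```

Each unwrapped key corresponds to a column (HOST, LEVEL, FACILITY, and so on) in the query results shown in the steps below.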

1. In KSQL, register the SYSLOG stream:

ksql> CREATE STREAM SYSLOG WITH (KAFKA_TOPIC='syslog',VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------

2. Test and refine a filter predicate that finds invalid login attempts by applying a WHERE clause to the MESSAGE column:

ksql> SELECT * FROM SYSLOG WHERE MESSAGE LIKE '%Invalid user%';
1520176464386 | /*/192.168.10.105:38254 | 1520176464000 | 4 | rpi-03 | 6 | rpi-03 sshd[24150]: Invalid user mini from 114.130.4.16 | UTF-8 | /192.168.10.105:38254 | rpi-03.moffatt.me
1520176909996 | /*/192.168.10.105:38254 | 1520176909000 | 4 | rpi-03 | 6 | rpi-03 sshd[24200]: Invalid user admin from 103.99.0.209 | UTF-8 | /192.168.10.105:38254 | rpi-03.moffatt.me
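
When refining the predicate, it can help to try it against a few captured messages offline. The Python sketch below approximates MESSAGE LIKE '%Invalid user%' as a substring test; the helper name like_contains is hypothetical, and the sketch assumes the match is case-sensitive and that a missing message never matches.

```python
def like_contains(message, needle="Invalid user"):
    """Approximate LIKE '%<needle>%' as a case-sensitive substring test."""
    # Assumption: a null (None) message does not satisfy the predicate.
    return message is not None and needle in message


messages = [
    "rpi-03 sshd[24150]: Invalid user mini from 114.130.4.16",
    "rpi-03 sshd[24200]: Invalid user admin from 103.99.0.209",
    # Truncated copy of the kernel audit message from the sample event.
    "proxmox01 kernel: [36025328.043913] audit: type=1400",
]

for message in messages:
    print(like_contains(message), "|", message)
```

Only the two sshd lines satisfy the predicate; the kernel audit line is filtered out.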

3. Persist the continuous results of the streaming query as a new Kafka topic:

CREATE STREAM FILTERED_SYSLOG WITH (KAFKA_TOPIC='syslog_invalid_ssh_user', VALUE_FORMAT='JSON') AS \
SELECT * FROM SYSLOG WHERE MESSAGE LIKE '%Invalid user%';

The WITH clause is optional. If you omit it, KAFKA_TOPIC defaults to the name of the new stream and VALUE_FORMAT defaults to the format of the input stream.

4. The syslog_invalid_ssh_user topic now contains the SSH login attempts by invalid users. You can inspect the resulting stream using KSQL:

ksql> SELECT * FROM FILTERED_SYSLOG LIMIT 1;
1520176464386 | /*/192.168.10.105:38254 | 1520176464000 | 4 | rpi-03 | 6 | rpi-03 sshd[24150]: Invalid user mini from 114.130.4.16 | UTF-8 | /192.168.10.105:38254 | rpi-03.moffatt.me
LIMIT reached for the partition.
Query terminated

You can also inspect the topic with Kafka client tools such as kafkacat:

$ kafkacat -C -b localhost:9092 -t syslog_invalid_ssh_user | jq '.'
{
  "REMOTE_ADDRESS": "/192.168.10.105:38254",
  "DATE": 1521819619000,
  "MESSAGE": "rpi-03 sshd[11637]: Invalid user test9 from 203.154.117.52",
  "CHARSET": "UTF-8",
  "HOSTNAME": "rpi-03.moffatt.me",
  "FACILITY": 4,
  "HOST": "rpi-03",
  "LEVEL": 6
}
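
Because the filtered topic holds plain JSON, a downstream program can react to each event with very little code. The following Python sketch is one possible approach, not part of the recipe itself: it parses JSON records like the one above, extracts the attempted user name and source address from MESSAGE, and counts attempts per source. The names parse_invalid_login and count_by_source are hypothetical, the regular expression assumes the sshd message layout seen in this example, and the sample records are trimmed to the HOST and MESSAGE fields.

```python
import json
import re
from collections import Counter

# Assumption: messages follow the "Invalid user <name> from <address>" layout
# seen in this example's sshd output.
INVALID_USER = re.compile(r"Invalid user (\S+) from (\S+)")


def parse_invalid_login(line):
    """Return (host, user, source) for one JSON record, or None if it does not match."""
    record = json.loads(line)
    match = INVALID_USER.search(record.get("MESSAGE") or "")
    if match is None:
        return None
    return record.get("HOST"), match.group(1), match.group(2)


def count_by_source(lines):
    """Count invalid-login records per source address, skipping blank lines."""
    counts = Counter()
    for line in lines:
        if not line.strip():
            continue
        parsed = parse_invalid_login(line)
        if parsed is not None:
            counts[parsed[2]] += 1
    return counts


# One JSON record per line, using messages that appear in this example.
sample_lines = [
    '{"HOST": "rpi-03", "MESSAGE": "rpi-03 sshd[11637]: Invalid user test9 from 203.154.117.52"}',
    '{"HOST": "rpi-03", "MESSAGE": "rpi-03 sshd[24150]: Invalid user mini from 114.130.4.16"}',
    '{"HOST": "rpi-03", "MESSAGE": "rpi-03 sshd[24200]: Invalid user admin from 103.99.0.209"}',
]

for line in sample_lines:
    print(parse_invalid_login(line))
print(dict(count_by_source(sample_lines)))
```

Since count_by_source accepts any iterable of lines, the same function could be fed from standard input when kafkacat output is piped in without the jq pretty-printing step, so that each record stays on a single line.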