Directions
The source event stream is named syslog and looks like this:

    {
      "date": { "long": 1525950906000 },
      "facility": { "int": 0 },
      "host": { "string": "proxmox01" },
      "level": { "int": 5 },
      "message": { "string": "proxmox01 kernel: [36025328.043913] audit: type=1400 audit(1525950906.295:7747987): apparmor=\"DENIED\" operation=\"ptrace\" profile=\"docker-default\" pid=26854 comm=\"node\" requested_mask=\"trace\" denied_mask=\"trace\" peer=\"docker-default\"" },
      "charset": { "string": "UTF-8" },
      "remote_address": { "string": "/192.168.10.250:45767" },
      "hostname": { "string": "proxmox01.moffatt.me" }
    }
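Each field in the sample record is wrapped in a single-key object naming its type (for example { "long": ... }), which looks like Avro's JSON encoding of nullable fields. The following is a minimal Python sketch, not part of the recipe, that flattens such a record for inspection. The helper name unwrap_avro_json is hypothetical, the message field is left out for brevity, and the code assumes every wrapper has exactly one key.

```python
import json

# Abbreviated copy of the sample record (the long "message" field is omitted).
raw = (
    '{ "date": { "long": 1525950906000 }, "facility": { "int": 0 }, '
    '"host": { "string": "proxmox01" }, "level": { "int": 5 }, '
    '"charset": { "string": "UTF-8" }, '
    '"remote_address": { "string": "/192.168.10.250:45767" }, '
    '"hostname": { "string": "proxmox01.moffatt.me" } }'
)


def unwrap_avro_json(record):
    """Replace each single-key type wrapper, e.g. {"int": 5}, with its value."""
    flat = {}
    for name, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            # Assumption: a one-key dict is a type wrapper, so keep only the value.
            (flat[name],) = value.values()
        else:
            flat[name] = value
    return flat


flat = unwrap_avro_json(json.loads(raw))
print(flat["host"], flat["level"], flat["date"])  # proxmox01 5 1525950906000
```

KSQL does this unwrapping itself when the stream is registered with VALUE_FORMAT='AVRO'. The sketch only helps when reading the raw sample by hand.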
1. In KSQL, register the SYSLOG stream:

    ksql> CREATE STREAM SYSLOG WITH (KAFKA_TOPIC='syslog', VALUE_FORMAT='AVRO');

     Message
    ----------------
     Stream created
    ----------------
2. Test and refine the filter predicate looking for invalid login attempts, using the WHERE clause on the message column:

    ksql> SELECT * FROM SYSLOG WHERE MESSAGE LIKE '%Invalid user%';
    1520176464386 | /*/192.168.10.105:38254 | 1520176464000 | 4 | rpi-03 | 6 | rpi-03 sshd[24150]: Invalid user mini from 114.130.4.16 | UTF-8 | /192.168.10.105:38254 | rpi-03.moffatt.me
    1520176909996 | /*/192.168.10.105:38254 | 1520176909000 | 4 | rpi-03 | 6 | rpi-03 sshd[24200]: Invalid user admin from 103.99.0.209 | UTF-8 | /192.168.10.105:38254 | rpi-03.moffatt.me
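With a % wildcard on both sides, the LIKE pattern in this step behaves as a substring test on the message text. The following Python sketch mimics that predicate on messages taken from this page, so you can reason about what the filter keeps before running it in KSQL. The function name like_contains is hypothetical, the kernel message is truncated, and the sketch assumes the match is case-sensitive.

```python
# Sample messages copied from the examples on this page (kernel message truncated).
messages = [
    "rpi-03 sshd[24150]: Invalid user mini from 114.130.4.16",
    'proxmox01 kernel: [36025328.043913] audit: type=1400 apparmor="DENIED" operation="ptrace"',
    "rpi-03 sshd[24200]: Invalid user admin from 103.99.0.209",
]


def like_contains(message, needle="Invalid user"):
    """Emulate MESSAGE LIKE '%Invalid user%' as a plain substring test."""
    return needle in message


matches = [m for m in messages if like_contains(m)]
for m in matches:
    print(m)
```

Only the two sshd lines pass the filter. The AppArmor audit message is dropped because it does not contain the text "Invalid user".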
3. Persist the continuous results of the streaming query as a new Kafka topic:

    CREATE STREAM FILTERED_SYSLOG WITH (KAFKA_TOPIC='syslog_invalid_ssh_user', VALUE_FORMAT='JSON') AS \
    SELECT * FROM SYSLOG WHERE MESSAGE LIKE '%Invalid user%';

The WITH clause is optional. If you omit it, KAFKA_TOPIC defaults to the name of the new stream and VALUE_FORMAT defaults to the format of the input stream.
4. The syslog_invalid_ssh_user topic contains the failed SSH login attempts. You can inspect the resulting Kafka topic using KSQL:

    ksql> SELECT * FROM FILTERED_SYSLOG LIMIT 1;
    1520176464386 | /*/192.168.10.105:38254 | 1520176464000 | 4 | rpi-03 | 6 | rpi-03 sshd[24150]: Invalid user mini from 114.130.4.16 | UTF-8 | /192.168.10.105:38254 | rpi-03.moffatt.me
    LIMIT reached for the partition.
    Query terminated

You can also inspect the topic using Kafka client tools, such as kafkacat:

    $ kafkacat -C -b localhost:9092 -t syslog_invalid_ssh_user | jq '.'
    {
      "REMOTE_ADDRESS": "/192.168.10.105:38254",
      "DATE": 1521819619000,
      "MESSAGE": "rpi-03 sshd[11637]: Invalid user test9 from 203.154.117.52",
      "CHARSET": "UTF-8",
      "HOSTNAME": "rpi-03.moffatt.me",
      "FACILITY": 4,
      "HOST": "rpi-03",
      "LEVEL": 6
    }
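Because the filtered topic is plain JSON, any downstream program can consume it. As an illustration only, the Python sketch below takes one record in the shape shown above and pulls the attempted username and source address out of the MESSAGE field. The function name parse_attempt and the regular expression are assumptions based on the sample messages on this page, not part of the recipe, and other sshd message formats may not match.

```python
import json
import re

# One record from the syslog_invalid_ssh_user topic, as shown in the kafkacat output.
line = (
    '{"REMOTE_ADDRESS": "/192.168.10.105:38254", "DATE": 1521819619000, '
    '"MESSAGE": "rpi-03 sshd[11637]: Invalid user test9 from 203.154.117.52", '
    '"CHARSET": "UTF-8", "HOSTNAME": "rpi-03.moffatt.me", '
    '"FACILITY": 4, "HOST": "rpi-03", "LEVEL": 6}'
)

# Assumed message shape: "... Invalid user <name> from <address>".
PATTERN = re.compile(r"Invalid user (\S+) from (\S+)")


def parse_attempt(json_line):
    """Return host, user and source address for one record, or None if no match."""
    record = json.loads(json_line)
    match = PATTERN.search(record["MESSAGE"])
    if match is None:
        return None
    user, source_ip = match.groups()
    return {"host": record["HOST"], "user": user, "source_ip": source_ip}


attempt = parse_attempt(line)
print(attempt)  # {'host': 'rpi-03', 'user': 'test9', 'source_ip': '203.154.117.52'}
```

To process live data, the same function could be applied to each line that kafkacat writes to standard output, assuming one JSON document per line.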