
Processing Syslog Data: Pattern Detection and Alerting

KSQL can enrich and filter syslog data to reveal particular conditions or events. syslog is a standard protocol that applications and network devices use to send log messages to local files or remote servers. With KSQL, you can filter and react to events in real time rather than performing historical analysis of syslog data from cold storage.

Because data flows in from numerous servers and network devices, the volume of messages can be large. In this example, KSQL is used to detect and alert on patterns in the data.

Environment

Confluent Platform 4.1 or higher

Directions

Confluent Platform must be installed before using this recipe. KSQL is a component of Confluent Platform, and the KSQL binaries are part of the Confluent Platform bundle.

In this example, the source event stream is named syslog.

1. In KSQL, register the SYSLOG stream:

ksql> CREATE STREAM SYSLOG WITH \
      (KAFKA_TOPIC='syslog',VALUE_FORMAT='AVRO');


 Message
----------------
 Stream created
----------------

2. Inspect the available columns:

ksql> DESCRIBE SYSLOG;



 Field          | Type
--------------------------------------------
 ROWTIME        | BIGINT           (system)
 ROWKEY         | VARCHAR(STRING)  (system)
 DATE           | BIGINT
 FACILITY       | INTEGER
 HOST           | VARCHAR(STRING)
 LEVEL          | INTEGER
 MESSAGE        | VARCHAR(STRING)
 CHARSET        | VARCHAR(STRING)
 REMOTE_ADDRESS | VARCHAR(STRING)
 HOSTNAME       | VARCHAR(STRING)
--------------------------------------------


3. The LEVEL column defines the severity of the syslog event message, ranging from 7 (debug) down to 0 (emergency). The following statement detects events that have all of these characteristics:

  • Error status or more serious (WHERE LEVEL<=3)
  • More than 10 events of this type (HAVING COUNT(*)>10)
  • Occurring within a one-minute time window (WINDOW TUMBLING (SIZE 1 MINUTE))
  • On a given host (GROUP BY HOST)

ksql> SELECT HOST, COUNT(*) AS ERROR_COUNT \
      FROM SYSLOG WINDOW TUMBLING (SIZE 1 MINUTE) \
      WHERE LEVEL<=3 \
      GROUP BY HOST \
      HAVING COUNT(*)>10;

asgard02 | 14
asgard02 | 12
proxmox01 | 42

Note that the time of the aggregation window is not shown in this output. It becomes available through the ROWTIME system column once the query is persisted as a table in the next step.
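For intuition, the windowed aggregation above can be sketched as a batch computation in Python. The record layout (host, level, epoch-millis timestamp) and the function name are illustrative assumptions, not part of the recipe; KSQL computes the same result continuously over the stream:

```python
from collections import defaultdict

WINDOW_MS = 60_000  # one-minute tumbling windows (SIZE 1 MINUTE)

def breached_hosts(events, max_level=3, threshold=10):
    """In-memory sketch of the KSQL query: count error-or-worse events
    per (host, window) and keep the groups that exceed the threshold.
    Each event is an illustrative (host, level, timestamp_ms) tuple."""
    counts = defaultdict(int)
    for host, level, ts_ms in events:
        if level <= max_level:                        # WHERE LEVEL<=3
            window_start = ts_ms - ts_ms % WINDOW_MS  # tumbling-window bucket
            counts[(host, window_start)] += 1         # GROUP BY HOST per window
    return {key: n for key, n in counts.items() if n > threshold}  # HAVING COUNT(*)>10
```

A tumbling window simply floors each timestamp to the window size, so every event falls into exactly one non-overlapping bucket.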

4. Persist the continuous results of the streaming query as a new Kafka topic:

ksql> CREATE TABLE hosts_with_error_sla_breach AS \
      SELECT HOST, COUNT(*) AS ERROR_COUNT \
      FROM SYSLOG WINDOW TUMBLING (SIZE 1 MINUTE) \
      WHERE LEVEL<=3 \
      GROUP BY HOST \
      HAVING COUNT(*)>10;

Note that this is declared as a TABLE because it holds the result of an aggregation (COUNT), the output of which is always a table.
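The stream/table distinction can be made concrete with a small sketch: a table holds the latest value per key, so replaying the aggregate's changelog means later updates for the same (host, window) key overwrite earlier ones. The key and value shapes here are illustrative:

```python
def materialize(changelog):
    """Fold a changelog of (key, value) updates into a table:
    an upsert per key, where the last write wins."""
    table = {}
    for key, error_count in changelog:
        table[key] = error_count
    return table

# Illustrative updates: the 13:52 window's count grows from 11 to 12
# as more events arrive; the materialized table keeps only the final value.
updates = [(("asgard02", "13:52"), 11),
           (("asgard02", "13:52"), 12),
           (("proxmox01", "06:25"), 14)]
```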

5. Inspect the resulting Kafka topic through KSQL, and see the time window of each aggregation using the ROWTIME system column from the table:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), HOST, ERROR_COUNT \
      FROM hosts_with_error_sla_breach;

2018-03-04 13:52:00 | asgard02 | 12
2018-03-04 13:53:00 | asgard02 | 12
2018-04-27 06:25:00 | proxmox01 | 14
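TIMESTAMPTOSTRING formats the epoch-milliseconds ROWTIME of each window. As a cross-check, the equivalent conversion in Python (assuming UTC; the KSQL server may apply its own time zone) is:

```python
from datetime import datetime, timezone

def rowtime_to_string(rowtime_ms):
    # ROWTIME is epoch milliseconds; the pattern mirrors 'yyyy-MM-dd HH:mm:ss'
    return (datetime.fromtimestamp(rowtime_ms / 1000, tz=timezone.utc)
            .strftime("%Y-%m-%d %H:%M:%S"))
```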


You can use the Kafka topic (hosts_with_error_sla_breach) that is populated by this KSQL application to drive direct alerts through a service that subscribes to the topic. You can also use it to power operational dashboards by landing the data to a store such as Elasticsearch using Kafka Connect.
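As a sketch of such an alerting service, the per-record logic might look like the following. The JSON value parsing and the alert text are simplifying assumptions for illustration (the table in this recipe is actually written as Avro); only the HOST and ERROR_COUNT field names come from the table's schema. In production, records would arrive through a Kafka consumer loop:

```python
import json

def alert_from_record(raw_value):
    """Turn one record from the hosts_with_error_sla_breach topic into an
    alert string. HOST and ERROR_COUNT match the table's column names;
    treating the value as JSON is a simplifying assumption for this sketch."""
    rec = json.loads(raw_value)
    return f"ALERT: {rec['HOST']} logged {rec['ERROR_COUNT']} errors in one minute"

# In a real service (not run here), records would come from a consumer loop:
#   for msg in consumer:
#       send_to_pager(alert_from_record(msg.value))
```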

