Networks

Detecting and Analyzing Suspicious Network Activity

When malware attacks a network, detecting it early is essential to limiting its spread and the damage it causes. By processing network device logs in real time with KSQL, you can detect the kind of unusual activity that may indicate a malware infection.

Directions

The source event stream is from a router and shows traffic source and destination details.

{
  "event_ts": "1526078852000",
  "source_ip": "214.85.228.173",
  "dest_ip": "34.22.126.18",
  "source_prt": 44423,
  "dest_prt": 23
}

1. In KSQL, register the FIREWALL_EVENTS stream.

ksql> CREATE STREAM FIREWALL_EVENTS WITH (KAFKA_TOPIC='FW_SRC5', VALUE_FORMAT='AVRO', TIMESTAMP='event_ts');

 Message
----------------
 Stream created
----------------

Note that the stream definition uses the event time (event_ts), so time-based processing is based on when each event occurred, not when it was written to the system. This also lets KSQL place late-arriving events in the window in which they occurred.
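The CREATE STREAM statement above lists no columns; with VALUE_FORMAT='AVRO', KSQL reads them from the schema registered for the topic. As a quick check, assuming a schema for FW_SRC5 is available in the Schema Registry, the following should list the columns and the configured timestamp field:

```sql
-- Show the inferred columns plus runtime details, including the timestamp field
DESCRIBE EXTENDED FIREWALL_EVENTS;
```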

2. Inspect the event data by counting the number of events per destination network port in each one-minute window.

ksql> SELECT DEST_PRT, COUNT(*) FROM FIREWALL_EVENTS WINDOW TUMBLING (SIZE 1 MINUTES) GROUP BY DEST_PRT;
443 | 58271
80  | 7163
22  | 10
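By default, a KSQL query reads only events that arrive after it starts. If the topic already holds the events of interest, one option is to set the offset property for the session before running the query, so that it reads from the start of the topic. A minimal sketch:

```sql
-- Queries started in this session read each topic from its earliest retained offset
SET 'auto.offset.reset' = 'earliest';

SELECT DEST_PRT, COUNT(*)
FROM FIREWALL_EVENTS
WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY DEST_PRT;
```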

3. Write to a Kafka topic whenever any port other than the standard HTTP and HTTPS ports (80 and 443) receives more traffic within one minute than would be expected (e.g., more than 100 events):

ksql> CREATE TABLE SUSPICIOUS_NET_ACTIVITY AS 
      SELECT DEST_PRT, COUNT(*) AS EVENT_COUNT 
      FROM FIREWALL_EVENTS 
      WINDOW TUMBLING (SIZE 1 MINUTES) 
      WHERE DEST_PRT != 80
        AND DEST_PRT != 443
      GROUP BY DEST_PRT 
      HAVING COUNT(*)>100;

 Message
---------------------------
 Table created and running
---------------------------

The resulting Kafka topic (SUSPICIOUS_NET_ACTIVITY) can be used to trigger alerts directly, stream into a real-time dashboard view or serve as the basis for further analysis.
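To see what a downstream consumer would receive, the topic can be read directly from the KSQL prompt. A minimal sketch, assuming the output topic kept its default name (the same as the table):

```sql
-- Dump the raw records of the output topic, starting from the oldest retained one
PRINT 'SUSPICIOUS_NET_ACTIVITY' FROM BEGINNING;
```

PRINT keeps running until it is interrupted (Ctrl-C).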

Here’s an example where 298 connections were made to port 339 within a period of one minute:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), DEST_PRT, EVENT_COUNT FROM SUSPICIOUS_NET_ACTIVITY;
2018-05-11 09:30:00 | 339 | 298
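A tumbling window counts each event in exactly one minute-long bucket, so a burst that straddles a window boundary may be split into two counts that each stay under the threshold. One possible variation, not part of the original recipe, is a hopping window, where overlapping windows make it more likely that a short burst falls entirely within at least one of them. The table name and the 30-second advance below are illustrative assumptions:

```sql
-- Hypothetical variant: one-minute windows that start every 30 seconds
CREATE TABLE SUSPICIOUS_NET_ACTIVITY_HOPPING AS
  SELECT DEST_PRT, COUNT(*) AS EVENT_COUNT
  FROM FIREWALL_EVENTS
  WINDOW HOPPING (SIZE 1 MINUTES, ADVANCE BY 30 SECONDS)
  WHERE DEST_PRT != 80
    AND DEST_PRT != 443
  GROUP BY DEST_PRT
  HAVING COUNT(*) > 100;
```

With this setting each event is counted in two overlapping windows, so the same burst may appear in more than one row of the output.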

4. Using the same criteria as above (over 100 connections to a non-standard port within a one-minute window), create a second output topic that also captures the source IP of the traffic, to help identify the compromised machine(s):

ksql> CREATE TABLE SUSPICIOUS_HOST_ACTIVITY AS 
  SELECT SOURCE_IP, DEST_PRT, COUNT(*) AS EVENT_COUNT 
  FROM FIREWALL_EVENTS 
  WINDOW TUMBLING (SIZE 1 MINUTES) 
  WHERE DEST_PRT != 80
    AND DEST_PRT != 443
  GROUP BY SOURCE_IP, DEST_PRT 
  HAVING COUNT(*)>100;

 Message
---------------------------
 Table created and running
---------------------------

In this example, three hosts are identified, each making numerous connections to port 339 on target machines:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), SOURCE_IP, DEST_PRT, EVENT_COUNT FROM SUSPICIOUS_HOST_ACTIVITY;
2018-05-11 09:30:00 | 49.2.224.203 | 339 | 298
2018-05-11 09:31:00 | 241.212.158.23 | 339 | 143
2018-05-11 09:32:00 | 8.115.29.34 | 339 | 201

5. Because Apache Kafka® persists data, events that have already been received can still be analyzed.

Taking the example of the suspicious host activity above, we can use predicates in KSQL to examine both past events and new events as they arrive that match the specified conditions:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), 
      SOURCE_IP, DEST_IP, SOURCE_PRT, DEST_PRT 
      FROM FIREWALL_EVENTS 
      WHERE DEST_PRT=339 
        AND SOURCE_IP='49.2.224.203';

2018-05-11 09:30:00 | 49.2.224.203 | 62.215.118.49 | 1255 | 339
2018-05-11 09:30:01 | 49.2.224.203 | 103.34.25.18 | 769 | 339
2018-05-11 09:30:01 | 49.2.224.203 | 228.1.115.206 | 9 | 339
2018-05-11 09:30:01 | 49.2.224.203 | 35.95.206.140 | 471 | 339
2018-05-11 09:30:05 | 49.2.224.203 | 212.251.81.193 | 872 | 339
2018-05-11 09:30:05 | 49.2.224.203 | 27.50.117.85 | 551 | 339
2018-05-11 09:30:06 | 49.2.224.203 | 187.138.232.240 | 1884 | 339
2018-05-11 09:30:08 | 49.2.224.203 | 13.120.9.221 | 1160 | 339
2018-05-11 09:30:09 | 49.2.224.203 | 80.142.1.209 | 893 | 339
2018-05-11 09:30:08 | 49.2.224.203 | 207.34.18.160 | 439 | 339

KSQL is a streaming query language, and thus the above query will run continually, showing all new events that match the condition as they arrive, until the query is cancelled. You can also use the LIMIT clause to only show a specified number of messages.
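For instance, the same query can be capped at five rows (an arbitrary number chosen for illustration), after which it terminates on its own:

```sql
-- Stop after the first five matching events instead of running continually
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'),
       SOURCE_IP, DEST_IP, SOURCE_PRT, DEST_PRT
FROM FIREWALL_EVENTS
WHERE DEST_PRT = 339
  AND SOURCE_IP = '49.2.224.203'
LIMIT 5;
```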
