
Learning All About Wi-Fi Data with Apache Kafka and Friends

Recently, I’ve been looking at what’s possible with streams of Wi-Fi packet capture (pcap) data. I was prompted after initially setting up my Raspberry Pi to capture pcap data and stream it to Apache Kafka®. Because I was using Confluent Cloud, it was easy enough to chuck the data at the Kafka cluster and not worry about where or how to run it. I set this running a month ago, and with a few blips in between (my Raspberry Pi is not deployed for any nines of availability!), I now have a bunch of raw pcap data that I thought would be interesting to dig into using Kafka and its surrounding ecosystem.

In a nutshell, what I end up with is a pipeline for processing the pcap data, joining it to reference data from an external system, aggregating it, and streaming it to various datastores, including Neo4j and PostgreSQL.

Wi-Fi packet data | Device reference data ➝ Kafka | ksqlDB ➝ Postgres | Elasticsearch, Kibana | Neo4j

At the heart of the data is wlan pcap data captured from tshark (the command line equivalent of Wireshark). The data is stored in JSON on a partitioned Kafka topic, and a sample message looks like this:

{
  "timestamp": "1584802202655",
  "wlan_fc_type": [ "1" ],
  "wlan_fc_type_subtype": [ "25" ],
  "wlan_radio_channel": [ "1" ],
  "wlan_radio_signal_dbm": [ "-73" ],
  "wlan_radio_duration": [ "32" ],
  "wlan_ra": [ "a4:fc:77:6c:55:0d" ],
  "wlan_ra_resolved": [ "MegaWell_6c:55:0d" ],
  "wlan_ta": [ "48:d3:43:43:cd:d1" ],
  "wlan_ta_resolved": [ "ArrisGro_43:cd:d1" ]
}
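As a rough illustration of how a consumer might read one of these messages, here is a minimal Python sketch. It assumes only what the sample shows: every field except `timestamp` is an array of strings, and `timestamp` is epoch milliseconds. The helper names `first` and `parse_packet` are hypothetical and not part of the pipeline described in this article.

```python
import json
from datetime import datetime, timedelta, timezone

# Trimmed copy of the sample message above.
SAMPLE = """{
  "timestamp": "1584802202655",
  "wlan_fc_type_subtype": [ "25" ],
  "wlan_radio_signal_dbm": [ "-73" ],
  "wlan_ra": [ "a4:fc:77:6c:55:0d" ],
  "wlan_ta": [ "48:d3:43:43:cd:d1" ]
}"""

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def first(record, field):
    """Return the first element of an array field, or None if it is absent or empty."""
    values = record.get(field) or []
    return values[0] if values else None


def parse_packet(raw):
    """Turn one JSON message into typed values (hypothetical helper)."""
    record = json.loads(raw)
    return {
        # The timestamp is a string of epoch milliseconds.
        "event_time": EPOCH + timedelta(milliseconds=int(record["timestamp"])),
        "subtype": int(first(record, "wlan_fc_type_subtype")),
        "signal_dbm": float(first(record, "wlan_radio_signal_dbm")),
        "receiver": first(record, "wlan_ra"),
        "transmitter": first(record, "wlan_ta"),
    }


packet = parse_packet(SAMPLE)
print(packet["event_time"].isoformat(), packet["subtype"], packet["signal_dbm"])
```

The sketch does not handle messages that lack a subtype or signal field; a real consumer would need to decide how to treat those.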

These messages come in at a rate of about 100,000 per hour, or c.30 per second. That’s not “big data,” but it’s not insignificant either.

It certainly gives me a bunch of data that’s more than I can grok by just poking around it. I don’t really know what I’m looking for in the data quite yet; I’m just interested in what I’ve scooped up in my virtual net. Let’s take two approaches, one visual and one numeric, to see if we can get a better handle on the data.

Understanding the overall dataset visually

First, I wanted a visual glimpse of the data to better understand usage volumes. Kibana has a nice tool as part of its machine learning feature called Data Visualizer. Let’s stream our raw Wi-Fi packet captures into Elasticsearch and check it out.

Wi-Fi packet data ➝ Kafka ➝ Kafka Connect ➝ Elasticsearch + Kibana

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-elastic-pcap-00/config \
    -d '{
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "pcap",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "_doc",
        "key.ignore": "true",
        "schema.ignore": "true"
    }'

Here, we can see characteristics in the data including:

  • Record count per day
  • The distribution of MAC addresses in senders (sa) and receivers (ra)
  • Which types of packets are most commonly found (wlan_fc_subtype)

Based on this published table, we can see the most common types of packets are:

  • 33% ACK (wlan.fc.type_subtype == 29)
  • 20% Request to Send (wlan.fc.type_subtype == 27)
  • 13% Probe Request (wlan.fc.type_subtype == 4)
  • 11% NULL Data (wlan.fc.type_subtype == 36)

What about getting these numbers from analysing the data directly from the console, without using Elasticsearch?

Understanding the overall dataset numerically

With ksqlDB, you can use SQL queries to inspect, aggregate, and process data. Let’s see it in action with the raw pcap data that we’ve got.

Wi-Fi packet data ➝ Kafka | ksqlDB ➝ `SELECT SUBTYPE, COUNT(*) FROM PCAP GROUP BY SUBTYPE`

First up, I’m going to declare a schema so that I can query attributes of the data in the topic. The full schema is pretty beefy and since I’m only interested in a couple of fields at this point (the date and the subtype ranges), I’m just going to specify a partial schema at this stage.

CREATE STREAM PCAP_RAW_00 (timestamp            BIGINT,
                           wlan_fc_type_subtype ARRAY<INT>)
    WITH (KAFKA_TOPIC='pcap',
          VALUE_FORMAT='JSON',
          TIMESTAMP='timestamp');

A couple of things to point out:

  • Notice that wlan_fc_type_subtype is written by the source application (tshark) as an array, which the schema declares as ARRAY<INT>, e.g., "wlan_fc_type_subtype": [ "25" ]
  • I tell ksqlDB to take the timestamp field from the payload as the timestamp of the message (WITH … TIMESTAMP='timestamp'). If I didn’t do this, it would default to the timestamp of the Kafka message held in the message’s metadata, which is not what I want in this case. Since we’re going to analyse the time attributes of the data, it’s important to correctly handle event time vs. ingest time here.

With the stream created, I tell ksqlDB to process the data from the beginning of the topic:

SET 'auto.offset.reset' = 'earliest';

And then query it using SQL:

SELECT wlan_fc_type_subtype[1] AS SUBTYPE,
       COUNT(*) AS PACKET_COUNT,
       TIMESTAMPTOSTRING(MIN(ROWTIME),'yyyy-MM-dd HH:mm:ss','GMT') AS MIN_TS,
       TIMESTAMPTOSTRING(MAX(ROWTIME),'yyyy-MM-dd HH:mm:ss','GMT') AS MAX_TS
FROM   PCAP_RAW_00 WHERE ROWTIME < (UNIX_TIMESTAMP() - 1800000)
GROUP BY wlan_fc_type_subtype[1]
EMIT CHANGES;

Here, we’re aggregating all messages received up until the last half hour by subtype, but you’ll notice that it’s calculating the numbers from the start of the topic and continually updating as newer messages are processed:

Aggregating messages
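To make the logic of that query concrete, here is a minimal Python sketch of the same aggregation over an in-memory list. It is an illustration only: the function name `aggregate_by_subtype` and the input shape (tuples of event time in milliseconds and subtype) are assumptions, and `now_ms` is fixed for the whole run, whereas ksqlDB evaluates UNIX_TIMESTAMP() as each record is processed.

```python
def aggregate_by_subtype(packets, now_ms, lag_ms=30 * 60 * 1000):
    """Count packets per subtype and track their earliest and latest event times.

    packets: iterable of (event_time_ms, subtype) tuples (hypothetical shape).
    Packets from the last `lag_ms` milliseconds are skipped, mirroring
    WHERE ROWTIME < (UNIX_TIMESTAMP() - 1800000).
    """
    stats = {}
    for event_time_ms, subtype in packets:
        if event_time_ms >= now_ms - lag_ms:
            continue
        entry = stats.setdefault(
            subtype, {"count": 0, "min_ts": event_time_ms, "max_ts": event_time_ms}
        )
        entry["count"] += 1
        entry["min_ts"] = min(entry["min_ts"], event_time_ms)
        entry["max_ts"] = max(entry["max_ts"], event_time_ms)
    return stats


# Made-up event times, purely for illustration.
example = [(1_000_000, 4), (2_000_000, 4), (9_000_000, 4), (3_000_000, 29)]
result = aggregate_by_subtype(example, now_ms=10_000_000)
print(result)
```

With these made-up inputs the third packet falls inside the final half hour and is left out of the counts.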

To build on this example, let’s break down the subtype count by day. You’ll notice that instead of writing the aggregate result to the screen, we’re instantiating it as a table within ksqlDB:

SET 'auto.offset.reset' = 'earliest';
CREATE TABLE PCAP_STATS WITH (VALUE_FORMAT='AVRO') AS
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd','GMT') AS WINDOW_DAY,
       WLAN_FC_TYPE_SUBTYPE[1] AS SUBTYPE,
       COUNT(*) AS PACKET_COUNT,
       TIMESTAMPTOSTRING(MIN(ROWTIME),'HH:mm:ss','GMT') AS EARLIEST_TIME,
       TIMESTAMPTOSTRING(MAX(ROWTIME),'HH:mm:ss','GMT') AS LATEST_TIME
FROM   PCAP_RAW_00
        WINDOW TUMBLING (SIZE 1 DAY)
GROUP BY WLAN_FC_TYPE_SUBTYPE[1]
EMIT CHANGES;

Now there’s an actual materialised view of this data, backed by a persisted Kafka topic:

ksql> SHOW TABLES;
 Table Name | Kafka Topic | Format | Windowed
----------------------------------------------
 PCAP_STATS | PCAP_STATS  | AVRO   | false
----------------------------------------------
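The daily tumbling window in the PCAP_STATS query can be pictured as bucketing each event by the start of the day it falls in. The Python sketch below illustrates that idea under the assumption that windows are aligned to the Unix epoch in UTC; the names `window_start` and `daily_counts` are hypothetical, and the code is a model of the behaviour rather than what ksqlDB runs internally.

```python
from collections import defaultdict
from datetime import datetime, timezone

DAY_MS = 24 * 60 * 60 * 1000


def window_start(event_time_ms, size_ms=DAY_MS):
    """Start (epoch ms) of the tumbling window that contains the event."""
    return event_time_ms - (event_time_ms % size_ms)


def daily_counts(packets):
    """Count packets per (day, subtype).

    packets: iterable of (event_time_ms, subtype) tuples (hypothetical shape).
    """
    counts = defaultdict(int)
    for event_time_ms, subtype in packets:
        start = window_start(event_time_ms)
        day = datetime.fromtimestamp(start / 1000, tz=timezone.utc).strftime("%Y-%m-%d")
        counts[(day, subtype)] += 1
    return dict(counts)


# The first timestamp is taken from the sample message; the others are derived
# from it purely for illustration.
t0 = 1584802202655
example = [(t0, 4), (t0 + 1000, 4), (t0 + DAY_MS, 4)]
print(daily_counts(example))
```

Each window is independent, so an event arriving a second after midnight starts a new count for that subtype instead of adding to the previous day's.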

We can query this table using either a push query, showing all updates as they arrive:

Push Query

Or we can query the value directly with a pull query for any of the subtypes:

ksql> SELECT WINDOW_DAY, SUBTYPE, PACKET_COUNT, EARLIEST_TIME, LATEST_TIME
        FROM PCAP_STATS
       WHERE ROWKEY = 4 ;
+-----------+--------+-------------+--------------+------------+
|WINDOW_DAY |SUBTYPE |PACKET_COUNT |EARLIEST_TIME |LATEST_TIME |
+-----------+--------+-------------+--------------+------------+
|2020-02-28 |4       |84           |22:47:06      |23:59:35    |
|2020-02-29 |4       |3934         |00:02:01      |23:58:19    |
|2020-03-01 |4       |1601         |00:00:06      |23:58:07    |
|2020-03-02 |4       |1125         |00:00:12      |23:59:13    |
Query terminated

Since it’s just a Kafka topic, you can persist this aggregate to a database, using the message key to ensure that values update in place. To do this, we’ll use Kafka Connect like we did above for Elasticsearch, but instead of calling the Kafka Connect REST API directly, here we’ll use ksqlDB as the interface for creating the connector:

CREATE SINK CONNECTOR SINK_POSTGRES_PCAP_STATS_00 WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'PCAP_STATS',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'auto.evolve'         = 'true',
    'insert.mode'         = 'upsert',
    'pk.mode'             = 'record_value',
    'pk.fields'           = 'WINDOW_DAY,SUBTYPE',
    'table.name.format'   = '${topic}'
);

Now as each message arrives in the source Kafka topic, it’s incorporated in the aggregation by ksqlDB, and the resulting change to the aggregate is pushed to Postgres, where each key (which is a composite of the SUBTYPE plus the day) is updated in place:

postgres=# SELECT * FROM "PCAP_STATS" WHERE "SUBTYPE"=4 ORDER BY "WINDOW_DAY" ;
 WINDOW_DAY | SUBTYPE | PACKET_COUNT | EARLIEST_TIME | LATEST_TIME
------------+---------+--------------+---------------+-------------
 2020-02-28 |       4 |           89 | 22:47:25      | 23:55:44
 2020-02-29 |       4 |         4148 | 00:02:01      | 23:58:19
 2020-03-01 |       4 |         1844 | 00:00:24      | 23:56:53
 2020-03-02 |       4 |          847 | 00:00:12      | 23:59:13
…
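The update-in-place behaviour comes from upserting on the composite key. As a small, self-contained illustration, the sketch below uses Python's built-in sqlite3 module as a stand-in for Postgres (an assumption made so that the example runs anywhere; it needs SQLite 3.24 or later for ON CONFLICT). The table layout and the `upsert` helper are hypothetical, and the counts are illustrative values only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE pcap_stats (
        window_day   TEXT,
        subtype      INTEGER,
        packet_count INTEGER,
        PRIMARY KEY (window_day, subtype)
    )
    """
)


def upsert(conn, window_day, subtype, packet_count):
    """Insert a row, or overwrite the count if the (day, subtype) key exists."""
    conn.execute(
        """
        INSERT INTO pcap_stats (window_day, subtype, packet_count)
        VALUES (?, ?, ?)
        ON CONFLICT (window_day, subtype)
        DO UPDATE SET packet_count = excluded.packet_count
        """,
        (window_day, subtype, packet_count),
    )


# Two successive aggregate values for the same key: the second replaces the first.
upsert(conn, "2020-03-02", 4, 847)
upsert(conn, "2020-03-02", 4, 1125)
rows = conn.execute("SELECT * FROM pcap_stats").fetchall()
print(rows)
```

Because the key is the day plus the subtype, the table holds one row per combination however many updates the aggregate emits.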

Analysing relationships in the data with Kibana

Similar to the Data Visualizer in Kibana, the graph capabilities are also helpful for exploring high-level relationships in the data.

Data graph

By looking at a few of the key entities, we can observe which devices (yellow circles) scan which access points (light blue circles), as well as how frequently (thickness of connecting lines). We can also see the clustering that occurs between devices and the access points that they scan. This pattern of analysis is evidently a fruitful avenue for investigation, and later in this article, we’ll do some dedicated graph modelling and analysis of the data by streaming the data from Kafka into Neo4j and analysing it there.

Splitting the pcap data into separate topics

Having identified the types of data within the pcap stream, let’s now use ksqlDB to split the data into separate topics for further analysis. The separate topics will make the analysis easier to target on the correct stream of data and also allow each type of data to have a different schema.

Wi-Fi packet data ➝ Kafka | ksqlDB ➝ Separate topics

To begin, I’ll declare a schema that covers fields across all types of packet:

CREATE STREAM pcap_raw (timestamp                    BIGINT,
                        wlan_fc_type_subtype         ARRAY<INT>,
                        wlan_radio_channel           ARRAY<INT>,
                        wlan_radio_signal_percentage ARRAY<VARCHAR>,
                        wlan_radio_signal_dbm        ARRAY<DOUBLE>,
                        wlan_radio_duration          ARRAY<INT>,
                        wlan_ra                      ARRAY<VARCHAR>,
                        wlan_ra_resolved             ARRAY<VARCHAR>,
                        wlan_da                      ARRAY<VARCHAR>,
                        wlan_da_resolved             ARRAY<VARCHAR>,
                        wlan_ta                      ARRAY<VARCHAR>,
                        wlan_ta_resolved             ARRAY<VARCHAR>,
                        wlan_sa                      ARRAY<VARCHAR>,
                        wlan_sa_resolved             ARRAY<VARCHAR>,
                        wlan_staa                    ARRAY<VARCHAR>,
                        wlan_staa_resolved           ARRAY<VARCHAR>,
                        wlan_tagged_all              ARRAY<VARCHAR>,
                        wlan_tag_vendor_data         ARRAY<VARCHAR>,
                        wlan_tag_vendor_oui_type     ARRAY<VARCHAR>,
                        wlan_tag_oui                 ARRAY<VARCHAR>,
                        wlan_country_info_code       ARRAY<VARCHAR>,
                        wps_device_name              ARRAY<VARCHAR>,
                        wlan_ssid                    ARRAY<VARCHAR>)
    WITH (KAFKA_TOPIC='pcap',
        VALUE_FORMAT='JSON',
        TIMESTAMP='timestamp');

Now we can pull out records of different types into new streams and take the opportunity to serialise the resulting data to Apache Avro™. Using Avro (or similar serialisation options that include strong support for schemas, like Protobuf) is helpful because we’ve already declared the schema once for the data, and serialising it to Avro means that when we—or anyone else—consumes the data from the topic, the schema is available without having to reenter it. It also provides compatibility checks on the validity of the schema when writing to the topic.

SET 'auto.offset.reset' = 'earliest';
CREATE STREAM PCAP_PROBE  WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM PCAP_RAW WHERE WLAN_FC_TYPE_SUBTYPE[1]=4;
CREATE STREAM PCAP_BEACON WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM PCAP_RAW WHERE WLAN_FC_TYPE_SUBTYPE[1]=8;
CREATE STREAM PCAP_RTS    WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM PCAP_RAW WHERE WLAN_FC_TYPE_SUBTYPE[1]=27;
CREATE STREAM PCAP_CTS    WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM PCAP_RAW WHERE WLAN_FC_TYPE_SUBTYPE[1]=28;
CREATE STREAM PCAP_ACK    WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM PCAP_RAW WHERE WLAN_FC_TYPE_SUBTYPE[1]=29;
CREATE STREAM PCAP_NULL   WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM PCAP_RAW WHERE WLAN_FC_TYPE_SUBTYPE[1]=36;

If we had a partition key strategy that we wanted to apply, we could do this by specifying PARTITION BY—but since we’re still at the early stages of analysis, we’ll leave the key unset for now (which means that messages will be distributed “round robin” evenly across all partitions). We could also opt to drop unused columns from the schema for particular message types by replacing SELECT * with a specific projection of required columns.

This creates and populates new Kafka topics:

ksql> SHOW TOPICS;
 Kafka Topic | Partitions | Partition Replicas
--------------------------------------------------------------------------
 PCAP_ACK    | 12         | 3
 PCAP_BEACON | 12         | 3
 PCAP_CTS    | 12         | 3
 PCAP_NULL   | 12         | 3
 PCAP_PROBE  | 12         | 3
 PCAP_RTS    | 12         | 3
 PCAP_STATS  | 12         | 3
…
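The routing performed by the six CREATE STREAM statements can be sketched in a few lines of Python. This is only a model of the filtering: the function name `split_by_subtype` is hypothetical, packets are assumed to be dictionaries shaped like the raw JSON messages, and the result is a plain dictionary of lists, not Kafka topics.

```python
from collections import defaultdict

# Subtype codes and target names taken from the CREATE STREAM statements above.
TOPIC_BY_SUBTYPE = {
    4: "PCAP_PROBE",
    8: "PCAP_BEACON",
    27: "PCAP_RTS",
    28: "PCAP_CTS",
    29: "PCAP_ACK",
    36: "PCAP_NULL",
}


def split_by_subtype(packets):
    """Group packets by target stream; packets matching no filter are dropped."""
    routed = defaultdict(list)
    for packet in packets:
        subtypes = packet.get("wlan_fc_type_subtype") or []
        topic = TOPIC_BY_SUBTYPE.get(int(subtypes[0])) if subtypes else None
        if topic is not None:
            routed[topic].append(packet)
    return dict(routed)


# Minimal made-up packets for illustration; only the subtype field is populated.
example = [
    {"wlan_fc_type_subtype": ["4"]},
    {"wlan_fc_type_subtype": ["29"]},
    {"wlan_fc_type_subtype": ["25"]},  # not covered by any of the six filters
    {"wlan_fc_type_subtype": ["4"]},
]
routed = split_by_subtype(example)
print({topic: len(items) for topic, items in routed.items()})
```

As with the ksqlDB filters, a packet whose subtype is not one of the six listed values ends up in none of the derived streams.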

Analysing Wi-Fi probe requests

Mobile devices send probe requests to see what access points (AP) are available, which provides a good source of data for analysis. I was curious how often my Wi-Fi network is probed by both familiar and unfamiliar devices. Kibana is a great tool here for “slicing and dicing” the data to explore that question. By adding a filter for the subtype, we can easily pick out the fields that have relevant data:

Kibana: 6,662 hits

  • wlan_sa is the raw source MAC address, whilst wlan_sa_resolved includes, in some cases, the manufacturer’s prefix
  • Most requests look for any AP, but some are for a specific wireless network (wlan_ssid)
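To separate addresses that carry a manufacturer prefix from those that do not, one option is to compare the raw and resolved values. The Python sketch below assumes the resolved form seen in the sample message (a vendor abbreviation, an underscore, then the last three octets, as in ArrisGro_43:cd:d1) and assumes that an unresolved address is simply repeated unchanged. The function name `manufacturer_prefix` is hypothetical.

```python
def manufacturer_prefix(mac, resolved):
    """Return the vendor prefix from a resolved address, or None if there is none.

    Assumes resolved names look like 'Vendor_xx:xx:xx' and that unresolved
    addresses are identical to the raw MAC address.
    """
    if not resolved or resolved.lower() == mac.lower():
        return None
    prefix, separator, _ = resolved.partition("_")
    return prefix if separator else None


print(manufacturer_prefix("48:d3:43:43:cd:d1", "ArrisGro_43:cd:d1"))
print(manufacturer_prefix("a4:fc:77:6c:55:0d", "a4:fc:77:6c:55:0d"))
```

The first call uses the transmitter address from the sample message and yields its vendor abbreviation; the second passes the same value for both arguments to show the unresolved case.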

Enriching pcap data with lookups

So in all this “digital exhaust” is a load of devices from within my house, plus others externally. Wouldn’t it be nice to be able to identify them? This is where the real power of ksqlDB comes in, because I can use it to join a stream of events (pcap data) with lookup data from elsewhere.

Wi-Fi packet data | Device reference data ➝ Kafka | ksqlDB ➝ Join stream

My Ubiquiti router uses MongoDB to store details of all the household devices that connect to it, spread across two MongoDB collections. Let’s pull that data into Kafka through ksqlDB:

CREATE SOURCE CONNECTOR SOURCE_MONGODB_UNIFI_01 WITH (
    'connector.class' = 'io.debezium.connector.mongodb.MongoDbConnector',
    'mongodb.hosts' = 'rs0/mongodb:27017',
    'mongodb.name' = 'unifi',
    'collection.whitelist' = 'ace.device, ace.user'
);

With this data in Kafka, we can use some data wrangling tricks to build two ksqlDB tables of devices (switches, APs, and routers) and users (i.e., Wi-Fi clients—mobiles, laptops, etc.). To get more background on what this ksqlDB code does and why, check out this article.
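One of those wrangling tricks is a nested CASE expression, in the ksqlDB code that follows, which builds a single label per client device. Its decision logic is easier to read as a small function, so here is a Python sketch of the same rules. The function name `device_label` is hypothetical, and it assumes `oui` is always a string.

```python
def device_label(name, hostname, oui, is_guest):
    """Build one display label per device from whichever fields are present.

    - neither name nor hostname: 'guest_' or 'nonguest_' followed by the OUI
    - hostname only:             the hostname
    - name only:                 the name
    - both:                      'name (hostname)'
    """
    if name is None:
        if hostname is None:
            return ("guest_" if is_guest else "nonguest_") + oui
        return hostname
    if hostname is None:
        return name
    return f"{name} ({hostname})"


# Made-up device details, purely for illustration.
print(device_label(None, None, "Apple", True))
print(device_label(None, "kitchen-pi", "Raspberr", False))
print(device_label("Robin's phone", "pixel", "Google", False))
```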

-- Model source topics
CREATE STREAM DEVICES_RAW WITH (KAFKA_TOPIC='unifi.ace.device', VALUE_FORMAT='AVRO');
CREATE STREAM USERS_RAW   WITH (KAFKA_TOPIC='unifi.ace.user',   VALUE_FORMAT='AVRO');

-- Extract device data fields from JSON payload
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM ALL_DEVICES WITH (PARTITIONS=12, KAFKA_TOPIC='all_devices_00') AS
SELECT 'ace.device'                       AS SOURCE,
       EXTRACTJSONFIELD(AFTER ,'$.mac')   AS MAC,
       EXTRACTJSONFIELD(AFTER ,'$.ip')    AS IP,
       EXTRACTJSONFIELD(AFTER ,'$.name')  AS NAME,
       EXTRACTJSONFIELD(AFTER ,'$.model') AS MODEL,
       EXTRACTJSONFIELD(AFTER ,'$.type')  AS TYPE,
       CAST('0' AS BOOLEAN)               AS IS_GUEST
FROM   DEVICES_RAW
-- Set the MAC address as the message key
PARTITION BY EXTRACTJSONFIELD(AFTER ,'$.mac')
EMIT CHANGES;

-- Extract user (client device) data from JSON payload with some
-- wrangling to handle null/empty fields etc.
-- Note that this is an "INSERT INTO" and thus in effect is a UNION of
-- the two source topics with some wrangling to align the schemas.
SET 'auto.offset.reset' = 'earliest';
INSERT INTO ALL_DEVICES
SELECT 'ace.user' AS SOURCE,
       EXTRACTJSONFIELD(AFTER ,'$.mac') AS MAC,
       '' AS IP,
       -- Use a CASE statement to build a single label per device
       -- based on whether we have a name and/or hostname, and
       -- whether the device is a guest or not.
       CASE WHEN EXTRACTJSONFIELD(AFTER ,'$.name') IS NULL THEN
                CASE WHEN EXTRACTJSONFIELD(AFTER ,'$.hostname') IS NULL THEN
                        CASE WHEN CAST(EXTRACTJSONFIELD(AFTER ,'$.is_guest') AS BOOLEAN) THEN
                                'guest_'
                        ELSE
                                'nonguest_'
                        END + EXTRACTJSONFIELD(AFTER ,'$.oui')
                ELSE
                        EXTRACTJSONFIELD(AFTER ,'$.hostname')
                END
            ELSE
                CASE WHEN EXTRACTJSONFIELD(AFTER ,'$.hostname') IS NULL THEN
                        EXTRACTJSONFIELD(AFTER ,'$.name')
                ELSE
                        EXTRACTJSONFIELD(AFTER ,'$.name') + ' (' + EXTRACTJSONFIELD(AFTER ,'$.hostname') + ')'
                END
       END AS NAME,
       EXTRACTJSONFIELD(AFTER ,'$.oui') AS MODEL,
       '' AS TYPE,
       CAST(EXTRACTJSONFIELD(AFTER ,'$.is_guest') AS BOOLEAN) AS IS_GUEST
FROM   USERS_RAW
-- Ignore Ubiquiti devices because these are picked up
-- from the `unifi.ace.device` data
WHERE  EXTRACTJSONFIELD(AFTER ,'$.oui')!='Ubiquiti'
-- Set the MAC address as the message key
PARTITION BY EXTRACTJSONFIELD(AFTER ,'$.mac')
EMIT CHANGES;

-- Declare a materialised ksqlDB table over the resulting combined stream
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE DEVICES AS
SELECT MAC,
       LATEST_BY_OFFSET(SOURCE) AS SOURCE,
       LATEST_BY_OFFSET(NAME) AS NAME,
       LATEST_BY_OFFSET(IS_GUEST)