
Streaming ETL and Analytics on Confluent with Maritime AIS Data


One of the canonical examples of streaming data is tracking location data over time. Whether it’s ride-sharing vehicles, the position of trains on the rail network, or tracking airplanes waking up your cat, handling the stream of data in real time enables functionality for businesses and their customers in a way that is just not possible in the batch world. Here I’m going to explore another source of streaming data, but away from road and rail—out at sea, with data from ships.

Data visualization flow

Automatic identification system (AIS) data is broadcast by most ships and can be consumed passively by anyone with a receiver.

Data consumption flow

By streaming a feed of AIS data into Apache Kafka®, it’s possible to use it for various purposes, each of which I’m going to explore in more detail.

Ship data

  • Part 1: Analytics and streaming ETL drive both real-time and ad hoc data exploration with a streaming pipeline to cleanse and transform the data.
  • Part 2: Stream processing to identify patterns in the data that could suggest certain behaviour.

Heatmap

AIS data

The AIS data comes from a public feed distributed by the Norwegian Coastal Administration and published under the Norwegian Licence for Open Government Data (NLOD). It covers:

AIS data from all vessels within a coverage area that includes the Norwegian economic zone and the protection zones off Svalbard and Jan Mayen, but with the exception of fishing vessels under 15 meters and recreational vessels under 45 meters

AIS data streams can contain messages of different types. You can check out this great resource on AIS payload interpretation, which explains all the different types and fields associated with each. For example, message type 1 is a Position Report, but it doesn’t include details about the vessel. For that, you need message type 5 (Static and Voyage Related Data).

Often an AIS source is provided as a feed on a TCP/IP port (as in the case of the one used here). As a raw feed, it’s not much to look at:

$ nc 153.44.253.27 5631
\s:2573485,c:1614772291*0C\!BSVDM,1,1,,A,13maq;7000151TNWKWIA3r<v00SI,0*01
\s:2573250,c:1614772291*02\!BSVDO,1,1,,B,402M3hQvDickN0PTuPRwwH7000S:,0*37
!BSVDM,1,1,,A,13o;a20P@K0LIqRSilCa?W4t0<2<,0*19
\s:2573450,c:1614772291*04\!BSVDM,1,1,,B,13m<?c00000tBT`VuBT1anRt00Rs,0*0D
\s:2573145,c:1614772291*05\!BSVDM,1,1,,B,13m91<001IPPnJlQ9HVJppo00<0;,0*33

Fortunately, GPSd provides gpsdecode, which makes a lot more sense of it:

$ nc 153.44.253.27 5631|gpsdecode |jq --unbuffered '.'
{
  "class": "AIS",
  "device": "stdin",
  "type": 1,
  "repeat": 0,
  "mmsi": 259094000,
  "scaled": true,
  "status": 0,
  "status_text": "Under way using engine",
  "turn": 0,
  "speed": 11.4,
  "accuracy": false,
  "lon": 7.085755,
  "lat": 62.656673,
  "course": 179.1,
  "heading": 186,
  "second": 9,
  "maneuver": 0,
  "raim": false,
  "radio": 98618
}

Analytics

Let’s take a look at the kind of analytics we can easily create from this data, before then taking a step back and walking through how to build it. I’m using Kibana on top of the data held in Elasticsearch with OpenSeaMap tiles added.

Each ship periodically reports information about itself (AIS message type 5), and we can use that to look at the types of ships:

Types of ships
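
The charts in this section are built in Kibana, but the same kind of breakdown could be sketched in ksqlDB too. This is purely illustrative, using a hypothetical AIS_MSG_TYPE_5 stream holding the type 5 fields (not a stream defined in this post):

-- Illustrative sketch only: count ships by reported type from AIS type 5 data.
-- AIS_MSG_TYPE_5 and its columns (MMSI, SHIPTYPE_TEXT) are assumed names.
SELECT SHIPTYPE_TEXT,
       COUNT_DISTINCT(MMSI) AS SHIP_COUNT
  FROM AIS_MSG_TYPE_5
 GROUP BY SHIPTYPE_TEXT
  EMIT CHANGES;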

If we filter this just for passenger ships, we can see—as would be expected—fewer reporting in towards the end of the day:

Passenger ship reporting times

We can also look at other properties of the ships, such as their square area. This is calculated from the AIS data in which the ship’s dimensions are reported:

Square area of ships
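
The area isn't reported directly; the type 5 message carries the distances from the ship's reference point to the bow, stern, port, and starboard sides, from which length, beam, and area fall out. A sketch of the arithmetic in ksqlDB terms, again using the assumed AIS_MSG_TYPE_5 stream and field names:

-- Sketch: derive ship dimensions from the AIS type 5 reference-point distances.
-- Length = to_bow + to_stern, beam = to_port + to_starboard, area = length * beam.
SELECT SHIPNAME,
       TO_BOW + TO_STERN                              AS LENGTH_M,
       TO_PORT + TO_STARBOARD                         AS BEAM_M,
       (TO_BOW + TO_STERN) * (TO_PORT + TO_STARBOARD) AS AREA_SQ_M
  FROM AIS_MSG_TYPE_5
  EMIT CHANGES;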

Using Kibana’s filtering, we can drill down into large ships (>5000 ㎡), which unsurprisingly are mostly cargo and tankers:

Large ships (>5000 ㎡)

This is pretty interesting, but it only looks at the static data that ships report. What about the continuous stream of data that we get from AIS? This tells us where the ships are and also what they’re reporting as doing. If we filter for ships that report as fishing vessels, we shan’t be too surprised to see that around a third of them are Engaged in fishing:

Ship status over time

Part of the AIS payload for a status update is the latitude and longitude reported by the ship, and we can use this to plot the data on a map. Using Kibana’s heatmap option, we can easily see where fishing vessels are most heavily concentrated:

Kibana heatmap

One of the things that I was really interested to see in the version 7.11 release of Kibana was Tracks support in the Map visualisation. By breaking down the data by ship name and callsign, it’s possible to plot the path of each ship:

Paths of individual ships

The plot here is just for fishing vessels (as that’s what we’d filtered on previously), but if we open it up to all ships and vary the track colour based on the size of the ship, we can see patterns starting to form around shipping routes and the different ships using them:

Track color based on ship size

Heatmap and ship paths

Using the map filtering option, you can draw a region on which to filter the data and examine aggregate information about the ships within it. Here’s everything that’s happening within ~15 km of the city of Bergen, including the associated ship types, activities, and sizes.

All ship activity near Bergen
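
A similar radius filter can also be expressed upstream in ksqlDB using its GEO_DISTANCE function. A sketch, assuming a hypothetical AIS_MSG_TYPE_1 stream of position reports and Bergen's approximate coordinates (60.39°N, 5.32°E):

-- Sketch: position reports within roughly 15 km of Bergen.
-- AIS_MSG_TYPE_1 and its columns are assumed names; GEO_DISTANCE(lat1, lon1, lat2, lon2, unit) is a ksqlDB built-in.
SELECT MMSI, LAT, LON, SPEED
  FROM AIS_MSG_TYPE_1
 WHERE GEO_DISTANCE(LAT, LON, 60.39, 5.32, 'KM') <= 15
  EMIT CHANGES;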

As well as looking at the data in aggregate, you can drill all the way down. I found it fascinating to browse the overall shipping activity and then home in on a particular vessel. Starting with the map view, you may spot a track that you’re interested in. Here I’ve spotted a larger ship and want to know more about it, so I click on the track and then the filter button next to the ship’s name.

Viewing an individual vessel's activity

From that, we can now see what it was doing over time:

Individual ship activity over time

And view individual status reports:

Individual status reports

So that’s what we can do; now let’s take a look at exactly how. As streaming ETL pipelines go, this is a relatively simple one, but it needs some interesting tricks along the way…

Streaming ETL – Extract

I built all this on Confluent Cloud, so first off, I provisioned myself a cluster:

Provisioning a cluster

With API keys in hand, I created a target topic into which to stream the source AIS data:

$ ccloud kafka topic create ais
Created topic "ais".

As mentioned above, the raw AIS data can be parsed by gpsdecode to put it into a structured form. From here, I used kafkacat to write it to my Kafka topic. I wrapped this in a Docker container (piggybacking on the existing kafkacat image) to make it self-contained and deployable in the cloud.

$ docker run --rm -t --entrypoint /bin/sh edenhill/kafkacat:1.6.0 -c '
  # Install stuff
  apk add gpsd gpsd-clients

  nc 153.44.253.27 5631 | \
    gpsdecode | \
    kafkacat \
      -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
      -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
      -b BROKER.gcp.confluent.cloud:9092 \
      -X sasl.username="API_USER" \
      -X sasl.password="API_PASSWORD" \
      -t ais -P '
Note: For the purposes of a proof of concept, this was good enough. If you needed a more resilient ingest pipeline, you’d probably build a dedicated service to handle the ingest and parsing of AIS data, using the producer API to stream it to Kafka.


This gave me a stream of data into the ais topic at a rate of around 8 KB/sec. (not really touching the sides of the 100 MB/sec. limit on the lowest-level Confluent Cloud cluster spec).

Stream of data into the ais topic at a rate of around 8 KB/sec

The gpsdecode tool writes the messages out as JSON, which can be inspected with the topic viewer:

Topic viewer

Streaming ETL – Transform

With the data streaming in, next up is taking this single stream of events and transforming it into something easily usable. The tool I used for transforming the stream of data was ksqlDB. This lets me use SQL to describe the stream processing that I want to apply to the data.

The raw stream

The first step in ksqlDB was to dump a sample of the topic just to check what we were working with:

ksql> PRINT ais LIMIT 5;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/02/25 10:50:06.934 Z, key: , value: {"class":"AIS","device":"stdin","type":3,"repeat":0,"mmsi":257124880,"scaled":true,"status":15,"status_text":"Not defined","turn":0,"accuracy":false,"lon":11.257358,"lat":64.902517,"course":85.0,"heading":225,"second":2,"maneuver":0,"raim":false,"radio":25283}, partition: 0
rowtime: 2021/02/25 10:50:06.934 Z, key: , value: {"class":"AIS","device":"stdin","type":1,"repeat":0,"mmsi":257045680,"scaled":true,"status":0,"status_text":"Under way using engine","speed":0.3,"accuracy":true,"lon":16.725387,"lat":68.939000,"course":65.7,"heading":511,"second":5,"maneuver":0,"raim":true,"radio":52}, partition: 0
rowtime: 2021/02/25 10:50:06.934 Z, key: , value: {"class":"AIS","device":"stdin","type":5,"repeat":0,"mmsi":259421000,"scaled":true,"imo":9175030,"ais_version":0,"callsign":"LIPZ","shipname":"ROALDNES","shiptype":30,"shiptype_text":"Fishing","to_bow":10,"to_stern":24,"to_port":5,"to_starboard":5,"epfd":1,"epfd_text":"GPS","eta":"01-16T14:00Z","draught":6.3,"dte":0}, partition: 0
rowtime: 2021/02/25 10:50:06.934 Z, key: , value: {"class":"AIS","device":"stdin","type":3,"repeat":0,"mmsi":257039700,"scaled":true,"status":5,"status_text":"Moored","turn":0,"accuracy":false,"lon":12.273450,"lat":65.998892,"course":188.6,"heading":36,"second":5,"maneuver":0,"raim":false,"radio":0}, partition: 0
rowtime: 2021/02/25 10:50:06.934 Z, key: , value: {"class":"AIS","device":"stdin","type":5,"repeat":0,"mmsi":257956500,"scaled":true,"imo":0,"ais_version":2,"callsign":"LG9456","shipname":"FROY MULTI","shiptype":0,"shiptype_text":"Not available","to_bow":3,"to_stern":12,"to_port":7,"to_starboard":5,"epfd":1,"epfd_text":"GPS","eta":"00-00T24:60Z","draught":0.0,"dte":0}, partition: 0
Topic printing ceased

AIS data is broadcast as a single stream of messages of different types. Each message type has its own set of fields, along with some common ones.

AIS data broadcast

I used a little bit of command line magic to do a quick inspection on a sample of the data to see how many messages of different types I had. Around 75% were position reports, 15% ship information, and the remainder was a mix of other messages.

ksqlDB can be used to split streams of data based on characteristics of the data, and that’s what we needed to do here so that we’d end up with a dedicated stream of messages for each logical type or group of AIS messages. To do any processing with ksqlDB, you need a schema declared on the data (the source data is just lumps of JSON strings without explicit schema). Because there’s a mix of message types (and thus schemas) in the single stream, it’s hard to declare the schema in its entirety upfront, so we use a little trick here to map the first ksqlDB stream. By specifying the serialisation type as KAFKA, we can delay having to declare the schema but still access fields in the data when we need to for the predicate in splitting the stream:

CREATE STREAM AIS_RAW (MSG VARCHAR) WITH (KAFKA_TOPIC='ais', FORMAT='KAFKA');

This declares a stream on the existing ais topic with a single field that we’ve arbitrarily called MSG. The trick is that we’re using the KAFKA format, which treats the whole message value as a single string. If we declared it as JSON (as one might reasonably expect, it being JSON data), ksqlDB would look for a root-level field called MSG to populate the column from, and no such field exists in the payload.

With a stream declared, we can query it and check that it’s working. The result is pretty much the same as dumping the data with PRINT, but we’re validating now that ksqlDB is happy reading the data:

ksql> SELECT * FROM AIS_RAW EMIT CHANGES LIMIT 5;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|MSG                                                                                                                                                                                       |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"class":"AIS","device":"stdin","type":3,"repeat":0,"mmsi":259589000,"scaled":true,"status":0,"status_text":"Under way using engine","turn":0,"speed":11.6,"accuracy":false,"lon":11.60895|
|{"class":"AIS","device":"stdin","type":3,"repeat":0,"mmsi":257499000,"scaled":true,"status":5,"status_text":"Moored","turn":0,"speed":0.0,"accuracy":false,"lon":6.447663,"lat":62.593768,|
|{"class":"AIS","device":"stdin","type":1,"repeat":0,"mmsi":259625000,"scaled":true,"status":0,"status_text":"Under way using engine","turn":"nan","speed":0.0,"accuracy":true,"lon":16.542|
|{"class":"AIS","device":"stdin","type":3,"repeat":0,"mmsi":257334400,"scaled":true,"status":5,"status_text":"Moored","turn":0,"speed":0.0,"accuracy":false,"lon":7.732775,"lat":63.113140,|
|{"class":"AIS","device":"stdin","type":5,"repeat":0,"mmsi":257628580,"scaled":true,"imo":0,"ais_version":2,"callsign":"LJ8162","shipname":"MORVIL","shiptype":37,"shiptype_text":"Pleasure|
Limit Reached
Query terminated

Now comes the schema bit. MSG holds the full JSON payload, and we can use EXTRACTJSONFIELD to, as the name suggests, extract JSON fields:

ksql> SELECT EXTRACTJSONFIELD(msg,'$.type') AS MSG_TYPE FROM AIS_RAW EMIT CHANGES LIMIT 5;
+--------------+
|MSG_TYPE      |
+--------------+
|1             |
|1             |
|3             |
|5             |
|1             |
Limit Reached
Query terminated
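
As an aside, the rough breakdown of message types mentioned earlier (around 75% position reports, and so on) could be computed in ksqlDB itself rather than on the command line, by aggregating over the extracted type field:

-- Sketch: count messages by AIS message type, grouped on the extracted field.
SELECT CAST(EXTRACTJSONFIELD(msg,'$.type') AS INT) AS MSG_TYPE,
       COUNT(*)                                    AS MSG_COUNT
  FROM AIS_RAW
 GROUP BY CAST(EXTRACTJSONFIELD(msg,'$.type') AS INT)
  EMIT CHANGES;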

As shown above, we can name the fields that we create (using AS). We can also CAST data types, use other functions such as TIMESTAMPTOSTRING, and use the extracted type field as a predicate:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS TS,
             CAST(EXTRACTJSONFIELD(msg,'$.type') AS INT)                      AS MSG_TYPE,
             CAST(EXTRACTJSONFIELD(msg,'$.status_text') AS VARCHAR)           AS STATUS_TEXT
        FROM AIS_RAW
       WHERE EXTRACTJSONFIELD(msg,'$.type') = '1'
       EMIT CHANGES;
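
From here, the natural next step (as described above) is to persist each logical group of messages as its own stream with CREATE STREAM … AS SELECT. Here's a sketch for the position reports; the stream and topic names, the column list, and the choice of Avro for the output serialisation are illustrative assumptions rather than the exact statements used in this pipeline:

-- Sketch: split out AIS position reports (message type 1) into their own stream.
-- Names and the AVRO value format are assumptions for illustration.
CREATE STREAM AIS_MSG_TYPE_1 WITH (KAFKA_TOPIC='ais_type_1', VALUE_FORMAT='AVRO') AS
  SELECT CAST(EXTRACTJSONFIELD(msg,'$.mmsi')        AS BIGINT)  AS MMSI,
         CAST(EXTRACTJSONFIELD(msg,'$.lat')         AS DOUBLE)  AS LAT,
         CAST(EXTRACTJSONFIELD(msg,'$.lon')         AS DOUBLE)  AS LON,
         CAST(EXTRACTJSONFIELD(msg,'$.speed')       AS DOUBLE)  AS SPEED,
         CAST(EXTRACTJSONFIELD(msg,'$.course')      AS DOUBLE)  AS COURSE,
         CAST(EXTRACTJSONFIELD(msg,'$.status_text') AS VARCHAR) AS STATUS_TEXT
    FROM AIS_RAW
   WHERE EXTRACTJSONFIELD(msg,'$.type') = '1';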