Fraud detection matters in many industries, including banking and finance, insurance, government and law enforcement. Fraud attempts have increased drastically in recent years, making fraud detection more important than ever. Despite the efforts of the affected institutions, hundreds of millions of dollars are lost to fraud every year. Because only a small fraction of cases in a large population are fraudulent, finding them can be tricky.
In banking, fraud can involve stolen credit cards, forged checks, misleading accounting practices and more. In insurance, 25% of claims contain some form of fraud, accounting for approximately 10% of insurance payout dollars. Fraud can range from exaggerated losses to deliberately causing an accident for the payout. With so many different methods of fraud, finding it becomes harder still.
Detecting fraudulent transactions is one of the classic use cases for real-time data. The business value is clear:
Traditionally, fraud detection has been done using stream processing applications written in languages such as Java. Maybe it’s been done using Kafka Streams or Spark Streaming, too, but regardless of the framework, it has always required a strong understanding of the core language first. This reduces the base of developers who can access stream processing. KSQL changes this, because anyone who can write SQL can now write stream processing applications.
KSQL is a continuous query language. Whilst you can use it for interactive exploration of data—as we will see in a moment—its core purpose is building stream processing applications. Because KSQL is built on Kafka Streams, which is built on Apache Kafka®, it benefits from the horizontal scalability characteristics for both resilience and throughput.
We’re going to see how we can take a stream of inbound ATM transactions and easily set up an application to detect transactions that look fraudulent. We’ll use three key factors to determine if we think a transaction is fraudulent:
As part of the processing, we’ll also calculate the distance (as the crow flies) between the two suspect transactions, the time between them and, thus, how fast someone would have had to travel to plausibly make the transaction. This could feed into a second iteration of processing to refine the detection further.
We’re going to use Elasticsearch to store the transactions and enriched data, and Kibana to provide visualization and further data inspection capabilities.
KSQL has supported joining between two streams of events since Confluent Platform 5.0. This means you can take two Kafka topics and join between them on a common key. This could be something like a stream of order events and a stream of shipment events—with a stream-stream join, you can match up orders being placed to those being shipped.
A stream-stream join can also make sense when done against the same stream of data, and that’s how we’re going to use it here. To start with, we’re going to take a simplified stream of ATM transactions and use it to understand what a stream-stream join is doing, and the semantics around it. Once we’ve done that, we’ll dive into a full-blown example with a data generator and some realistic test data.
I’m using Docker Compose to easily provision a test environment—if you want to follow along, you can find all the example files on GitHub.
Below are three transactions on three different accounts (ac_01, ac_02, ac_03). They have a transaction amount and location (both name and coordinates).
{"account_id": "ac_01", "atm": "ID_2276369282", "amount": 20, "location": {"lat": "38.6956033", "lon": "-121.5922283"}, "transaction_id": "01"}
{"account_id": "ac_02", "atm": "Flying Pig Bistro", "amount": 400, "location": {"lat": "37.766319", "lon": "-122.417422"}, "transaction_id": "02"}
{"account_id": "ac_03", "atm": "Wells Fargo", "amount": 50, "location": {"lat": "37.5522855", "lon": "-121.9797997"}, "transaction_id": "04"}
Let’s pull this into KSQL and see what we can do…
ksql> SET 'auto.offset.reset'='earliest';
ksql> CREATE STREAM ATM_TXNS (\
        account_id VARCHAR, \
        atm VARCHAR, \
        location STRUCT<lon DOUBLE, \
                        lat DOUBLE>, \
        amount INT, \
        transaction_id VARCHAR) \
      WITH (\
        KAFKA_TOPIC='atm_txns', \
        VALUE_FORMAT='JSON');
ksql> SELECT ACCOUNT_ID, TRANSACTION_ID, AMOUNT, ATM FROM ATM_TXNS;
ac_01 | 01 | 20 | ID_2276369282
ac_02 | 02 | 400 | Flying Pig Bistro
ac_03 | 04 | 50 | Wells Fargo
So far, so good. No naughty transactions—just three regular ones from three different accounts. Now, let’s fire in two more. The first is a second transaction at the same location for the same account (someone forgot to pick up the drinks bill!); the second one is our nefarious fraudster, with a clone of the card for account ac_03 drawing cash from an ATM elsewhere:
{"account_id": "ac_02", "atm": "Flying Pig Bistro", "amount": 40, "location": {"lat": "37.766319", "lon": "-122.417422"}, "transaction_id": "03"}
{"account_id": "ac_03", "atm": "Barclays", "amount": 500, "location": {"lat": "33.5522855", "lon": "-120.9797997"}, "transaction_id": "X05"}
You’ll see these two new transactions appear straightaway in the KSQL output:
ksql> SELECT ACCOUNT_ID, TRANSACTION_ID, AMOUNT, ATM FROM ATM_TXNS;
ac_01 | 01 | 20 | ID_2276369282
ac_02 | 02 | 400 | Flying Pig Bistro
ac_03 | 04 | 50 | Wells Fargo
ac_02 | 03 | 40 | Flying Pig Bistro
ac_03 | X05 | 500 | Barclays
ac_03 is the one we’re interested in. The transaction ID has an X prefix to help us spot it in the output, and the key thing is that it’s the same account in a different location (Wells Fargo and then Barclays). What about the time window? I’m glad you asked!
All Kafka messages have a timestamp (as well as a key and value). This can be set by the producing application, or it will otherwise take the value of the time at which the message arrived at the broker. KSQL exposes the message’s timestamp through the system column ROWTIME:
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), ACCOUNT_ID, TRANSACTION_ID, AMOUNT, ATM FROM ATM_TXNS;
2018-10-05 16:56:08 | ac_01 | 01 | 20 | ID_2276369282
2018-10-05 16:56:08 | ac_02 | 02 | 400 | Flying Pig Bistro
2018-10-05 16:56:08 | ac_03 | 04 | 50 | Wells Fargo
2018-10-05 17:01:59 | ac_02 | 03 | 40 | Flying Pig Bistro
2018-10-05 17:01:59 | ac_03 | X05 | 500 | Barclays
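If it helps to see what that conversion is doing, here is a rough Python equivalent. It assumes that ROWTIME is a count of milliseconds since the Unix epoch and that the output is rendered in UTC; the function name `format_rowtime` and the sample value are mine, chosen to line up with the first row above, and are not taken from the demo environment.

```python
from datetime import datetime, timezone


def format_rowtime(rowtime_ms: int) -> str:
    """Render an epoch-millisecond timestamp as 'yyyy-MM-dd HH:mm:ss' (UTC assumed)."""
    as_datetime = datetime.fromtimestamp(rowtime_ms / 1000, tz=timezone.utc)
    return as_datetime.strftime("%Y-%m-%d %H:%M:%S")


# Hypothetical ROWTIME value, picked to match the first transaction shown above.
example_rowtime_ms = 1538758568000
print(format_rowtime(example_rowtime_ms))
```

If your KSQL server runs in a different time zone, the rendered string will differ even though the underlying ROWTIME value is the same.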
The two transactions for account ac_03 occurred within the span of just over five minutes.
By eyeballing this data, we can spot that something doesn’t look right; but how do we express this in stream processing terms? Enter stream-stream joins.
Just as rows from two database tables can be joined, the same is possible with streams. The only difference is that since streams are unbounded (semantically—and often in practice—there is no defined end to the events arriving on the stream), the join syntax needs to account for this. Whilst a join between two tables in a database will consider all rows as candidates for matching on the join key, a stream-stream join will typically expect a window in which to look for events—which are, in effect, “rows”—to join. Otherwise, the stream processing engine would have to cache all events indefinitely.
In KSQL, the time window is expressed using the WITHIN clause of the join. You can specify either a single window value which applies to either side of the event, or two window values, giving the range before and after the event in which the join is to be evaluated. This time window is a convenient vehicle for our fraud criteria—any two transactions within a 10-minute window are to be captured.
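To make the point about caching concrete, here is a toy Python sketch of why a window keeps the join state bounded. It is a simplification under my own assumptions (events arrive in timestamp order, and the window only looks backwards), and it is not how Kafka Streams implements joins. The class name `WindowedBuffer` is hypothetical.

```python
from collections import deque


class WindowedBuffer:
    """Holds only the events that could still join with a new arrival."""

    def __init__(self, window_ms: int):
        self.window_ms = window_ms
        self.events = deque()  # (ts_ms, key, value), assumed to arrive in time order

    def add(self, ts_ms: int, key: str, value: str) -> list:
        # Evict events that are too old to match this event or any later one.
        while self.events and self.events[0][0] < ts_ms - self.window_ms:
            self.events.popleft()
        # Everything left is inside the window; match on the join key.
        matches = [v for (_, k, v) in self.events if k == key]
        self.events.append((ts_ms, key, value))
        return matches


buffer = WindowedBuffer(window_ms=10 * 60 * 1000)
print(buffer.add(0, "ac_03", "04"))          # nothing to join to yet
print(buffer.add(351_473, "ac_03", "X05"))   # joins to the earlier transaction
print(buffer.add(1_000_000, "ac_03", "06"))  # earlier events have been evicted
```

Without the eviction step, the buffer would grow for as long as the stream runs, which is exactly the problem the window solves.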
We’ll begin by creating a second KSQL stream for the transaction events:
CREATE STREAM ATM_TXNS_02 WITH (PARTITIONS=1) AS SELECT * FROM ATM_TXNS;
Now we have two identical streams, one of which is actually a clone of the first:
ksql> SHOW STREAMS;
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), ACCOUNT_ID, TRANSACTION_ID, AMOUNT, ATM FROM ATM_TXNS_02;
2018-10-05 16:56:08 | ac_03 | 04 | 50 | Wells Fargo
2018-10-05 16:56:08 | ac_01 | 01 | 20 | ID_2276369282
2018-10-05 17:01:59 | ac_03 | X05 | 500 | Barclays
2018-10-05 16:56:08 | ac_02 | 02 | 400 | Flying Pig Bistro
2018-10-05 17:01:59 | ac_02 | 03 | 40 | Flying Pig Bistro
Note: Cloning the stream is a necessary step for now, although in principle KSQL could handle joining a stream to itself. This is tracked in GitHub issue 2030.
ksql> SELECT TIMESTAMPTOSTRING(T1.ROWTIME, 'yyyy-MM-dd HH:mm:ss'), TIMESTAMPTOSTRING(T2.ROWTIME, 'yyyy-MM-dd HH:mm:ss'), \
        T1.ACCOUNT_ID, T2.ACCOUNT_ID, \
        T1.TRANSACTION_ID, T2.TRANSACTION_ID, \
        T1.LOCATION, T2.LOCATION \
      FROM ATM_TXNS T1 \
        INNER JOIN ATM_TXNS_02 T2 \
        WITHIN 10 MINUTES \
        ON T1.ACCOUNT_ID = T2.ACCOUNT_ID ;
2018-10-05 16:56:08 | 2018-10-05 16:56:08 | ac_01 | ac_01 | 01 | 01 | {LON=-121.5922283, LAT=38.6956033} | {LON=-121.5922283, LAT=38.6956033}
2018-10-05 16:56:08 | 2018-10-05 16:56:08 | ac_02 | ac_02 | 02 | 02 | {LON=-122.417422, LAT=37.766319} | {LON=-122.417422, LAT=37.766319}
2018-10-05 16:56:08 | 2018-10-05 17:01:59 | ac_02 | ac_02 | 02 | 03 | {LON=-122.417422, LAT=37.766319} | {LON=-122.417422, LAT=37.766319}
2018-10-05 16:56:08 | 2018-10-05 16:56:08 | ac_03 | ac_03 | 04 | 04 | {LON=-121.9797997, LAT=37.5522855} | {LON=-121.9797997, LAT=37.5522855}
2018-10-05 16:56:08 | 2018-10-05 17:01:59 | ac_03 | ac_03 | 04 | X05 | {LON=-121.9797997, LAT=37.5522855} | {LON=-120.9797997, LAT=33.5522855}
2018-10-05 17:01:59 | 2018-10-05 16:56:08 | ac_02 | ac_02 | 03 | 02 | {LON=-122.417422, LAT=37.766319} | {LON=-122.417422, LAT=37.766319}
2018-10-05 17:01:59 | 2018-10-05 17:01:59 | ac_02 | ac_02 | 03 | 03 | {LON=-122.417422, LAT=37.766319} | {LON=-122.417422, LAT=37.766319}
2018-10-05 17:01:59 | 2018-10-05 16:56:08 | ac_03 | ac_03 | X05 | 04 | {LON=-120.9797997, LAT=33.5522855} | {LON=-121.9797997, LAT=37.5522855}
2018-10-05 17:01:59 | 2018-10-05 17:01:59 | ac_03 | ac_03 | X05 | X05 | {LON=-120.9797997, LAT=33.5522855} | {LON=-120.9797997, LAT=33.5522855}
Looking at the output, we see a lot more than just the fraudulent transaction we’re expecting to identify. We can explain these additional matches as follows:
Commentary | T1 timestamp | T2 timestamp | T1 account | T2 account | T1 TXN ID | T2 TXN ID | T1 Location | T2 Location |
self-join | 2018-10-05 16:56:08 | 2018-10-05 16:56:08 | ac_01 | ac_01 | 01 | 01 | {LON=-121.5922283, LAT=38.6956033} | {LON=-121.5922283, LAT=38.6956033} |
self-join | 2018-10-05 16:56:08 | 2018-10-05 16:56:08 | ac_02 | ac_02 | 02 | 02 | {LON=-122.417422, LAT=37.766319} | {LON=-122.417422, LAT=37.766319} |
valid (same location) | 2018-10-05 16:56:08 | 2018-10-05 17:01:59 | ac_02 | ac_02 | 02 | 03 | {LON=-122.417422, LAT=37.766319} | {LON=-122.417422, LAT=37.766319} |
self-join | 2018-10-05 16:56:08 | 2018-10-05 16:56:08 | ac_03 | ac_03 | 04 | 04 | {LON=-121.9797997, LAT=37.5522855} | {LON=-121.9797997, LAT=37.5522855} |
!FRAUD! | 2018-10-05 16:56:08 | 2018-10-05 17:01:59 | ac_03 | ac_03 | 04 | X05 | {LON=-121.9797997, LAT=37.5522855} | {LON=-120.9797997, LAT=33.5522855} |
valid (same location, not shown) | 2018-10-05 17:01:59 | 2018-10-05 16:56:08 | ac_02 | ac_02 | 03 | 02 | {LON=-122.417422, LAT=37.766319} | {LON=-122.417422, LAT=37.766319} |
self-join | 2018-10-05 17:01:59 | 2018-10-05 17:01:59 | ac_02 | ac_02 | 03 | 03 | {LON=-122.417422, LAT=37.766319} | {LON=-122.417422, LAT=37.766319} |
!FRAUD! (duplicate) | 2018-10-05 17:01:59 | 2018-10-05 16:56:08 | ac_03 | ac_03 | X05 | 04 | {LON=-120.9797997, LAT=33.5522855} | {LON=-121.9797997, LAT=37.5522855} |
self-join | 2018-10-05 17:01:59 | 2018-10-05 17:01:59 | ac_03 | ac_03 | X05 | X05 | {LON=-120.9797997, LAT=33.5522855} | {LON=-120.9797997, LAT=33.5522855} |
The first thing to do is weed out the join results where it’s just the same event joining to itself (that is, the transaction ID is the same):
ksql> SELECT […]
      WHERE T1.TRANSACTION_ID != T2.TRANSACTION_ID ;
2018-10-05 17:01:59 | 2018-10-05 16:56:08 | ac_02 | ac_02 | 03 | 02 | {LON=-122.417422, LAT=37.766319} | {LON=-122.417422, LAT=37.766319}
2018-10-05 17:01:59 | 2018-10-05 16:56:08 | ac_03 | ac_03 | X05 | 04 | {LON=-120.9797997, LAT=33.5522855} | {LON=-121.9797997, LAT=37.5522855}
2018-10-05 16:56:08 | 2018-10-05 17:01:59 | ac_02 | ac_02 | 02 | 03 | {LON=-122.417422, LAT=37.766319} | {LON=-122.417422, LAT=37.766319}
2018-10-05 16:56:08 | 2018-10-05 17:01:59 | ac_03 | ac_03 | 04 | X05 | {LON=-121.9797997, LAT=37.5522855} | {LON=-120.9797997, LAT=33.5522855}
Much better. Now, we just need to eliminate the transactions on the same account that took place at the same location—in this model, our fraud criteria determine those as unsuspicious.
ksql> SELECT […]
      WHERE […]
        (T1.location->lat != T2.location->lat OR \
         T1.location->lon != T2.location->lon);
2018-10-05 16:56:08 | 2018-10-05 17:01:59 | ac_03 | ac_03 | 04 | X05 | {LON=-121.9797997, LAT=37.5522855} | {LON=-120.9797997, LAT=33.5522855}
2018-10-05 17:01:59 | 2018-10-05 16:56:08 | ac_03 | ac_03 | X05 | 04 | {LON=-120.9797997, LAT=33.5522855} | {LON=-121.9797997, LAT=37.5522855}
The only two results are those on the account ac_03, one being genuine (transaction ID 04) and the other fraudulent (X05). We’re getting both returned because each is an event on the left-hand stream (the driving one) that joins to the other within the time window specified (10 minutes before or after the driving event). All we need to do is change our join window to only return events that happen after the one we’re using to drive the join. To do this, simply specify a zero BEFORE threshold in the WITHIN criteria:
ksql> SELECT […]
        WITHIN (0 MINUTES, 10 MINUTES) \
      […]
2018-10-05 16:56:08 | 2018-10-05 17:01:59 | ac_03 | ac_03 | 04 | X05 | {LON=-121.9797997, LAT=37.5522855} | {LON=-120.9797997, LAT=33.5522855}
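If you want to convince yourself of the counts without a Kafka cluster to hand, the whole sequence of steps can be mimicked in a few lines of Python. This is only a batch simulation of the join semantics, not a stream processor. The timestamps are illustrative millisecond offsets (the first three transactions share one, the later two arrive 351,473 ms afterwards), the window bounds are treated as inclusive, and the names `Txn`, `self_join` and `suspicious` are my own.

```python
from itertools import product
from typing import NamedTuple


class Txn(NamedTuple):
    account_id: str
    transaction_id: str
    lat: float
    lon: float
    ts_ms: int  # illustrative offset in milliseconds, not a real ROWTIME


TXNS = [
    Txn("ac_01", "01", 38.6956033, -121.5922283, 0),
    Txn("ac_02", "02", 37.766319, -122.417422, 0),
    Txn("ac_03", "04", 37.5522855, -121.9797997, 0),
    Txn("ac_02", "03", 37.766319, -122.417422, 351_473),
    Txn("ac_03", "X05", 33.5522855, -120.9797997, 351_473),
]

TEN_MINUTES_MS = 10 * 60 * 1000


def self_join(txns, before_ms, after_ms):
    """Pair each event T1 with same-account events T2 inside T1's time window."""
    return [
        (t1, t2)
        for t1, t2 in product(txns, repeat=2)
        if t1.account_id == t2.account_id
        and t1.ts_ms - before_ms <= t2.ts_ms <= t1.ts_ms + after_ms
    ]


def suspicious(pairs):
    """Drop events joined to themselves and pairs at the same location."""
    return [
        (t1, t2)
        for t1, t2 in pairs
        if t1.transaction_id != t2.transaction_id
        and (t1.lat != t2.lat or t1.lon != t2.lon)
    ]


both_ways = self_join(TXNS, TEN_MINUTES_MS, TEN_MINUTES_MS)
after_only = suspicious(self_join(TXNS, 0, TEN_MINUTES_MS))

print(len(both_ways))              # every match, self-joins included
print(len(suspicious(both_ways)))  # the fraud pair, reported in both directions
print([(t1.transaction_id, t2.transaction_id) for t1, t2 in after_only])
```

Note that a zero BEFORE threshold on its own doesn't remove the self-joins, since an event still falls inside its own window. That is why the transaction ID filter stays in place.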
With the core logic of the statement built, let’s add in a few more bells and whistles. Using the built-in GEO_DISTANCE function, we can include a column in the output showing the distance between the two transactions:
ksql> SELECT […]
        T1.location, \
        T2.location, \
        GEO_DISTANCE(T1.location->lat, T1.location->lon, T2.location->lat, T2.location->lon, 'KM') AS DISTANCE_BETWEEN_TXN_KM, \
      […]
{LON=-121.9797997, LAT=37.5522855} | {LON=-120.9797997, LAT=33.5522855} | 453.87740037465375
We see that transaction 04 took place over 450 kilometers from X05, as the crow flies. What was the time duration between them? We can answer this pretty easily based on the timestamp, but it’s more sensible just to include it in the query:
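For a sanity check on that figure, here is a minimal Python sketch of a great-circle distance using the haversine formula. Whether GEO_DISTANCE uses exactly this formula and Earth radius is an assumption on my part; the function name `haversine_km` and the 6,371 km radius are choices made for this sketch.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption of this sketch


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle ("as the crow flies") distance between two points, in km."""
    phi1, phi2 = radians(lat1), radians(lat2)
    delta_phi = radians(lat2 - lat1)
    delta_lambda = radians(lon2 - lon1)
    a = sin(delta_phi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(delta_lambda / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))


# The two ac_03 transactions: Wells Fargo (04) and Barclays (X05).
distance_km = haversine_km(37.5522855, -121.9797997, 33.5522855, -120.9797997)
print(round(distance_km, 1))
```

For these two points it lands at roughly 454 km, in line with the value KSQL reports.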
ksql> SELECT […]
        (T2.ROWTIME - T1.ROWTIME) AS MILLISECONDS_DIFFERENCE, \
        (CAST(T2.ROWTIME AS DOUBLE) - CAST(T1.ROWTIME AS DOUBLE)) / 1000 / 60 AS MINUTES_DIFFERENCE, \
        (CAST(T2.ROWTIME AS DOUBLE) - CAST(T1.ROWTIME AS DOUBLE)) / 1000 / 60 / 60 AS HOURS_DIFFERENCE, \
        GEO_DISTANCE(T1.location->lat, T1.location->lon, T2.location->lat, T2.location->lon, 'KM') / ((CAST(T2.ROWTIME AS DOUBLE) - CAST(T1.ROWTIME AS DOUBLE)) / 1000 / 60 / 60) AS KMH_REQUIRED,
      […]
351473 | 5.8578833333333336 | 0.09763138888888889 | 4648.888083433872
We’ve also combined the distance and time calculations to derive the speed at which someone would have to move between the two events. At 4,648 kph, it’s almost four times the speed of the fastest supersonic car—we can be pretty sure it’s fraudulent!
One remaining point to make about the above query is that the message’s timestamp (ROWTIME) is cast from its BIGINT data type to DOUBLE so that the subsequent division arithmetic will work.
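The same arithmetic in Python shows why the cast matters. This is a sketch using the numbers from the output above; the function name `required_speed` is mine, and the integer-division line assumes that BIGINT division truncates the way integer division usually does.

```python
def required_speed(distance_km: float, elapsed_ms: int):
    """Return (minutes, hours, km/h) needed to cover distance_km in elapsed_ms."""
    minutes = elapsed_ms / 1000 / 60  # floating-point division keeps the fraction
    hours = minutes / 60
    return minutes, hours, distance_km / hours


# Values taken from the query output above.
elapsed_ms = 351473
distance_km = 453.87740037465375

minutes, hours, kmh = required_speed(distance_km, elapsed_ms)
print(minutes, hours, kmh)

# With truncating integer division, the elapsed time collapses to zero hours,
# which would make the speed calculation divide by zero.
truncated_hours = elapsed_ms // 1000 // 60 // 60
print(truncated_hours)
```

Just under six minutes is less than one whole hour, so the truncated value is useless as a divisor; working in DOUBLE keeps the fractional hours.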
To see what our query looks like against a continuous stream of transactions, let’s fire up our data generator. I’m using an open source tool called gess, which I’ve forked and tweaked to suit this demo.
It works by taking a list of ATMs, generating transactions against them and emitting these to UDP. UDP is a networking protocol in the same way that TCP is, but unlike TCP, UDP doesn’t require any kind of acknowledgement of delivery. Instead, it just fires bytes out into the ether, and if someone picks them up, that’s great, and if not, that’s fine too. It makes for a useful test rig where we can start up the data generator and simply tap into the event stream as and when we want to.
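To show how little ceremony UDP involves, here is a small Python sketch that sends one JSON event as a datagram and picks it up on the loopback interface. It is not the gess generator itself; the function names `send_event` and `open_listener` and the sample event are hypothetical, and the listener binds to an arbitrary free port.

```python
import json
import socket


def send_event(event: dict, host: str, port: int) -> int:
    """Fire one JSON-encoded event at host:port without waiting for any reply."""
    payload = json.dumps(event).encode("utf-8")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(payload, (host, port))  # number of bytes handed to the OS


def open_listener(host: str = "127.0.0.1", port: int = 0, timeout_s: float = 2.0) -> socket.socket:
    """Bind a UDP socket; port 0 asks the OS for any free port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((host, port))
    sock.settimeout(timeout_s)
    return sock


if __name__ == "__main__":
    listener = open_listener()
    listen_host, listen_port = listener.getsockname()
    send_event({"account_id": "ac_03", "amount": 500}, listen_host, listen_port)
    data, _sender = listener.recvfrom(4096)
    print(json.loads(data))
    listener.close()
```

The sender never learns whether anyone was listening, which is what makes it easy to start the generator first and attach a consumer later.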
To route the events to Kafka from UDP, I’m using two grea