Streaming ETL

Streaming Insurance Events

Apache Kafka® manages streams of events, and KSQL lets you quickly enrich those events with additional information such as geospatial lookups.

In this example, an insurance company wants to help people who have recently lost their mobile phone. We will enrich a stream of insurance claim events, find the location of the loss and recommend the closest service center.

Directions

1. Imagine we have a stream of insurance claim events for people who have lost their insured mobile phone. We know the customer name and the postal code where the loss of the mobile phone occurred. Our Kafka topic phone_event_raw of unfortunate phone losses looks like this:

{"customer_name":"Bob", "phone_model":"iPhone 6s", "event":"dropped", "post_code":"2000"}
{"customer_name":"Alice", "phone_model":"Samsung Note 9", "event":"water", "post_code":"2022"}

2. Run this KSQL statement to create a stream representing insurance claim events:

CREATE STREAM insurance_event_stream WITH (kafka_topic='phone_event_raw', value_format='avro');
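
The statement above declares no columns because, with value_format='avro', KSQL reads them from the schema registered for the topic. As a quick check, and assuming Confluent Schema Registry is running and holds a schema for phone_event_raw, a sketch like the following lists the inferred columns. The SET line is an optional addition, not part of the original recipe, that makes later queries read the topic from the beginning:

```sql
-- Optional: read topics from the earliest offset so that events
-- already in the topic are included in later queries.
SET 'auto.offset.reset' = 'earliest';

-- List the columns KSQL inferred from the registered Avro schema.
DESCRIBE insurance_event_stream;
```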

3. Our reference data in the Kafka topic POST_CODE is shown below. This topic maps each postal code (e.g., 2000) to its location name (e.g., Sydney), along with its latitude and longitude coordinates:

{"post_code":"2000", "locality":"Sydney", "state":"NSW", "long":151.25664, "lat":-33.85995}
{"post_code":"2022", "locality":"Bondi Beach", "state":"NSW", "long":151.24504, "lat":-33.89640}

4. Run this KSQL statement to create the POST_CODE_TAB reference table. This can be joined to the stream of events from insurance_event_stream to create a new stream insurance_event_with_location:

CREATE TABLE post_code_tab WITH (kafka_topic='POST_CODE', value_format='avro', key='post_code');
 
CREATE STREAM insurance_event_with_location AS \
SELECT * FROM insurance_event_stream iev \
INNER JOIN post_code_tab pc ON iev.post_code = pc.post_code;
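
Because the join selects * from two aliased sources, each column in the new stream is expected to carry its source alias as a prefix (iev_ or pc_), which is why queries against this stream use names such as iev_customer_name and pc_locality. A minimal check, assuming the stream was created as above:

```sql
-- List the columns of the joined stream; they should appear with
-- IEV_ and PC_ prefixes, for example IEV_CUSTOMER_NAME and PC_LOCALITY.
DESCRIBE insurance_event_with_location;
```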

5. We can query the stream insurance_event_with_location with a KSQL query to find the location (latitude and longitude) where the loss occurred:

SELECT  iev_customer_name, iev_phone_model, pc_post_code, pc_locality, pc_state, pc_long, pc_lat FROM insurance_event_with_location;
 
Bob | iPhone 6s | 2000 | Sydney | NSW | 151.25 | -33.85
Alice | Samsung Note 9 | 2022 | Bondi Beach | NSW | 151.24 | -33.89
Tim | Oppo R7 | 3000 | Melbourne | VIC | 144.95 | -37.81

6. We can do even better and direct customers to their closest repair center. For this we need the physical location of the repair shop in each state or county, so we load the repair shop locations into another KSQL table. The data for the Kafka topic REPAIR_center looks like this:

{"repair_state":"NSW", "long":151.1169, "lat":-33.8635}
{"repair_state":"VIC", "long":145.1549, "lat":-37.9389}

7. Run this KSQL command to create the repair center table repair_center_tab. This can be used with the stream insurance_event_with_location and the geospatial function geo_distance to find the straight-line distance, in kilometers, from the location of the loss to the repair shop:

CREATE TABLE repair_center_tab WITH (kafka_topic='REPAIR_center', value_format='avro', key='repair_state');
 
CREATE STREAM insurance_event_with_repairer AS \
SELECT *, geo_distance(iel.pc_lat, iel.pc_long, rct.lat, rct.long, 'km') AS dist_to_repairer_km \
FROM insurance_event_with_location iel \
INNER JOIN repair_center_tab rct ON iel.pc_state = rct.repair_state;
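
As a quick sanity check on the new stream, a query along these lines shows only the computed distance next to the customer and locality. The 20 km threshold is an arbitrary value chosen for illustration and is not part of the original recipe:

```sql
-- Show loss events that are more than 20 km (illustrative threshold)
-- from the repair center in the same state.
SELECT iel_iev_customer_name, iel_pc_locality, dist_to_repairer_km \
FROM insurance_event_with_repairer \
WHERE dist_to_repairer_km > 20;
```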

8. Run a KSQL query like this to generate a detailed list of loss events, with a recommendation for each customer’s closest service center. Hopefully they won’t need to go too far to get their phone repaired:

SELECT TIMESTAMPTOSTRING(rowtime, 'dd/MM HH:mm'), \
iel_iev_customer_name || ' lost ' ||  iel_iev_phone_model || ' due to ' || iel_iev_event  || ' in ' ||  iel_pc_locality \
|| ' (' || iel_pc_state || '), and is ' ||  CAST(round(dist_to_repairer_km) AS VARCHAR) || ' km from a service center' \
FROM insurance_event_with_repairer;

17/02 13:27 | Bob lost iPhone 6s due to dropped in Sydney (NSW), and is 13 km from a service center
17/02 13:32 | Alice lost Samsung Note 9 due to water in Bondi Beach (NSW), and is 12 km from a service center
17/02 13:44 | Tim lost Oppo R7 due to theft in Melbourne (VIC), and is 22 km from a service center