Data Masking with KSQL

KSQL streaming queries run continuously. You can persist the output of a streaming query to a Kafka topic with the KSQL CREATE STREAM AS SELECT syntax. KSQL takes a real-time feed of events from one Kafka topic, transforms them, and continuously writes the results to another topic.
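
The read, transform, write pattern described above can be pictured with a small Python sketch. This is only an illustration of the idea, not how KSQL is implemented: the in-memory lists stand in for Kafka topics, and the names run_streaming_query, inbound_topic and outbound_topic are hypothetical.

```python
from typing import Callable, Iterable, Iterator


def run_streaming_query(
    source: Iterable[dict], transform: Callable[[dict], dict]
) -> Iterator[dict]:
    """Lazily apply `transform` to each event as it is read from `source`."""
    for event in source:
        yield transform(event)


# In-memory stand-ins for the inbound and outbound topics (illustrative data).
inbound_topic = [
    {"order_id": 1, "town": "Portland"},
    {"order_id": 3, "town": "Honolulu"},
]
outbound_topic = []

# Each transformed event is appended to the output as soon as it is produced.
for result in run_streaming_query(inbound_topic, lambda e: {"order_id": e["order_id"]}):
    outbound_topic.append(result)

print(outbound_topic)
```

A real streaming query never reaches the end of its input; the finite list here only keeps the sketch runnable.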

This example shows how to mask streaming data from an inbound topic that contains personally identifiable information (PII) and persist the output to a Kafka topic.

Directions

Confluent Platform must be installed before using this recipe. KSQL is a component of Confluent Platform and the KSQL binaries are a part of the Confluent Platform bundle.

This example uses a source event stream named purchases. Each event looks like this:

{
  "order_id": 1,
  "customer_name": "Maryanna Andryszczak",
  "date_of_birth": "1922-06-06T02:21:59Z",
  "product": "Nut - Walnut, Pieces",
  "order_total_usd": "1.65",
  "town": "Portland",
  "country": "United States"
}

01. In KSQL, register the purchases stream:

ksql> CREATE STREAM purchases \
      (order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, \
      product VARCHAR, order_total_usd VARCHAR, \
      town VARCHAR, country VARCHAR) \
      WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------


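02. Create a derived stream, backed by a new Kafka topic, that excludes all personally identifiable information (PII) by selecting only the non-PII columns (customer_name and date_of_birth are left out):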

ksql> CREATE STREAM PURCHASES_PII_MASKED AS \
      SELECT ORDER_ID, PRODUCT, ORDER_TOTAL_USD, TOWN, COUNTRY \
      FROM PURCHASES;

 Message
----------------------------
 Stream created and running
----------------------------
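
To make the effect of the projection in step 02 concrete, here is a minimal Python sketch of what happens to each record: only the five selected columns are kept, so customer_name and date_of_birth never reach the output. It models the per-record behaviour only and is not KSQL's implementation; the names KEPT_FIELDS and mask_purchase are hypothetical.

```python
import json

# Columns selected in step 02; every other column (the PII) is dropped.
KEPT_FIELDS = ("order_id", "product", "order_total_usd", "town", "country")


def mask_purchase(event: dict) -> dict:
    """Return a copy of `event` that contains only the non-PII columns."""
    return {name: event[name] for name in KEPT_FIELDS if name in event}


# The sample purchases event shown earlier in this recipe.
sample = json.loads("""
{
  "order_id": 1,
  "customer_name": "Maryanna Andryszczak",
  "date_of_birth": "1922-06-06T02:21:59Z",
  "product": "Nut - Walnut, Pieces",
  "order_total_usd": "1.65",
  "town": "Portland",
  "country": "United States"
}
""")

masked = mask_purchase(sample)
print(json.dumps(masked, indent=2))
```

Listing the columns to keep, rather than the columns to drop, means that a PII field added to the source later is excluded by default.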

03. Inspect the derived stream and its Kafka topic to confirm that they contain no PII:

ksql> DESCRIBE PURCHASES_PII_MASKED;

 Field           | Type
---------------------------------------------
 ROWTIME         | BIGINT           (system)
 ROWKEY          | VARCHAR(STRING)  (system)
 ORDER_ID        | INTEGER
 PRODUCT         | VARCHAR(STRING)
 ORDER_TOTAL_USD | VARCHAR(STRING)
 TOWN            | VARCHAR(STRING)
 COUNTRY         | VARCHAR(STRING)
---------------------------------------------



ksql> PRINT 'PURCHASES_PII_MASKED';
Format:JSON
{"ROWTIME":1525960235832,"ROWKEY":"null","ORDER_ID":1,"COUNTRY":"United States","TOWN":"Portland","PRODUCT":"Nut - Walnut, Pieces","ORDER_TOTAL_USD":"1.65"}
{"ROWTIME":1525960258302,"ROWKEY":"null","ORDER_ID":3,"COUNTRY":"United States","TOWN":"Honolulu","PRODUCT":"Veal - Chops, Split, Frenched","ORDER_TOTAL_USD":"1.59"}
[...]
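
If you want to check the output programmatically rather than by eye, a short Python sketch along the following lines could scan the printed records for PII field names. It assumes the PRINT output has been captured as one JSON document per line; the names PII_FIELDS and pii_fields_present are hypothetical, and the two records are the ones shown above.

```python
import json

# Field names treated as PII in this recipe, upper-cased as KSQL prints them.
PII_FIELDS = {"CUSTOMER_NAME", "DATE_OF_BIRTH"}


def pii_fields_present(line: str) -> set:
    """Return the PII field names found in one JSON-encoded record."""
    record = json.loads(line)
    return {key for key in record if key.upper() in PII_FIELDS}


lines = [
    '{"ROWTIME":1525960235832,"ROWKEY":"null","ORDER_ID":1,"COUNTRY":"United States","TOWN":"Portland","PRODUCT":"Nut - Walnut, Pieces","ORDER_TOTAL_USD":"1.65"}',
    '{"ROWTIME":1525960258302,"ROWKEY":"null","ORDER_ID":3,"COUNTRY":"United States","TOWN":"Honolulu","PRODUCT":"Veal - Chops, Split, Frenched","ORDER_TOTAL_USD":"1.59"}',
]

leaks = [pii_fields_present(line) for line in lines]
if any(leaks):
    print("PII found:", leaks)
else:
    print("No PII fields found in", len(lines), "records")
```

This checks field names only; it would not detect PII embedded inside the value of another field.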