My notes from this video - Streaming ETL in Kafka for Everyone with KSQL a presentation by Hojjat JafarPour (pron: ಹೊಜ್ಜತ್)- Software Engineer at Confluent.
KSQL is the streaming language on top of Kafka.
The talk is about streaming ETL with Kafka and Confluent Platform.
Kafka has connectors to move data in and out of other systems.
Single Message Transform (SMT)
- Modify events before storing in Kafka
- mask/drop sensitive information
- single message transformation
- set partitioning key
- store lineage
- Modify events going out of Kafka
- route events data stores based on priority (higher -> faster)
- route events to different indexes
- do data conversion/casting to match destination
But these are simple transformations that act on single records. If you want to aggregate, filter, or apply UDF (user defined functions), you need something more .. like KSQL.
KSQL is in developer preview now (Nov 2017).
KSQL enables stream processing with “zero coding” by writing stream processing in SQL like language.
You do not need an additional cluster unlike other systems where you need stream processing system.
create stream possible_fraud as select card_number, count(*) from authorisation_attempts window tumbling (size 5 seconds) group by card_number having count(*) > 3;
KSQL has the notion of stream and table as first class citizens (topics), where stream is data in motion, and collected state of stream as table.
When you are filtering, you are treating the topic as a stream.
When you are doing counts, you are treating a topic as a table.
KSQL features – table joins, aggregate functions, UDF support for complex expressions and JSON, delimited format support, (Avro soon).
There are three types of WINDOW aggregations:
- TUMBLING: fixed-size, non-overlapping, gap-less windows. In this case, we have a window for every minute
SELECT ip, count(*) AS hits FROM clickstream WINDOW TUMBLING(SIZE 1 minute) GROP BY ip;
- HOPPING: Fixed-size, overlapping windows. This is like a sliding window.
SELECT ip, sum(BYTES) AS bytes_per_ip_and_bucket FROM clickstream WINDOW HOPPING (size 20 second, advance by 5 second) GROUP BY ip;
- SESSION: dynamically-sized, non-overlapping, data-driven window. For eg: “if nothing happens in next 20 seconds, the window ends.” This is like defining gap between windows instead of size of the windows.
SELECT ip, SUM(bytes) AS bytes_per_ip FROM clickstream WINDOW SESSION (20 second) GROUP BY ip;
Example of a “Web analytics pipeline
create steam pageviews (viewtime bigint, userid varchar, pageid varchar) with (kafka_topic='pageviews', value_format='JSON'); create table users (registertime bigint, gender varchar, regionid varchar, userid varchar) with (kafka_topic='users', value_format='JSON');
Region visitor count
create stream joined_pageviews as select users.userid as userid, pageid, regionid, gender from pageviews left join users on pageviews.userid = users.userid; create table region_visitor_count as select regionid, count(*) as visit_count from joined_pageviews window tumbling (size 30 second) group by regionid;
Region visitor demography
create table region_visitor_demo_count as select regionid, gender, count(*) as visit_count from joined_pageviews window tumbling (size 30 second) group by gender, regionid;
./bin/ksql-cli local ksql> create stream ... ; ksql> show streams; ksql> describe pageviews; ksql> select * from pageviews;
The select will show a running stream of output.