Video transcript - "Streaming ETL in Kafka for Everyone with KSQL"

Published: 2017-11-16 by Pradeep Gowda.

My notes from the video "Streaming ETL in Kafka for Everyone with KSQL", a presentation by Hojjat JafarPour (pron: ಹೊಜ್ಜತ್), Software Engineer at Confluent.

KSQL is a streaming SQL engine that runs on top of Kafka.

The talk is about streaming ETL with Kafka and Confluent Platform.


Kafka Connect provides connectors to move data in and out of other systems.

Connectors can apply a Single Message Transform (SMT) to each record as it passes through. But these are simple transformations that act on single records. If you want to aggregate, filter, or apply UDFs (user-defined functions), you need something more, like KSQL.


KSQL is in developer preview now (Nov 2017).

KSQL enables stream processing with “zero coding”: you express the processing in a SQL-like language.

You do not need an additional cluster, unlike other systems that require a separate stream processing cluster.

Example Code

create stream possible_fraud as
select card_number, count(*)
from authorisation_attempts
window tumbling (size 5 seconds)
group by card_number
having count(*) > 3;
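To make the semantics of this query concrete, here is a rough Python sketch of what it computes. It is a batch approximation only (KSQL evaluates the query continuously), and the function name, the `(timestamp_ms, card_number)` input shape, and the sample data are assumptions made for illustration.

```python
from collections import Counter

WINDOW_MS = 5_000  # window size from the query: 5 seconds


def possible_fraud(attempts, threshold=3):
    """attempts: iterable of (timestamp_ms, card_number) pairs (assumed shape).

    Returns {(window_start_ms, card_number): count} for every card with
    more than `threshold` attempts inside one tumbling window.
    """
    counts = Counter()
    for ts, card in attempts:
        window_start = ts - ts % WINDOW_MS  # align to the window boundary
        counts[(window_start, card)] += 1
    # HAVING count(*) > 3
    return {key: n for key, n in counts.items() if n > threshold}


# Hypothetical sample data.
attempts = [
    (1_000, "4111"), (1_500, "4111"), (2_000, "4111"), (4_900, "4111"),
    (5_100, "4111"),  # falls into the next 5-second window
    (1_200, "5500"), (3_000, "5500"),
]
print(possible_fraud(attempts))  # {(0, '4111'): 4}
```

Card 4111 is flagged because four attempts land in the window starting at 0 ms; the fifth attempt starts a new window and does not count toward it.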

KSQL treats streams and tables as first-class citizens, both backed by Kafka topics: a stream is data in motion, and a table is the collected state of a stream.

When you are filtering, you are treating the topic as a stream.

When you are doing counts, you are treating a topic as a table.
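The difference between the two views can be sketched in plain Python. This is an illustration only, not how KSQL is implemented; the helper names and sample events are hypothetical.

```python
def filter_stream(events, predicate):
    """Stream view: each event is handled on its own, no state is kept."""
    for event in events:
        if predicate(event):
            yield event


def count_as_table(events, key):
    """Table view: fold events into per-key state.

    Yields a snapshot of the table after every update.
    """
    table = {}
    for event in events:
        k = key(event)
        table[k] = table.get(k, 0) + 1
        yield dict(table)


# Hypothetical sample events.
events = [
    {"userid": "alice", "pageid": "home"},
    {"userid": "bob", "pageid": "cart"},
    {"userid": "alice", "pageid": "cart"},
]

cart_views = list(filter_stream(events, lambda e: e["pageid"] == "cart"))
snapshots = list(count_as_table(events, key=lambda e: e["userid"]))
print(len(cart_views))  # 2
print(snapshots[-1])    # {'alice': 2, 'bob': 1}
```

The filter never looks at more than one event, while the count only makes sense as state that is updated as events arrive.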

KSQL features: table joins, aggregate functions, UDF support for complex expressions, and JSON and delimited format support (Avro coming soon).

Window aggregations

There are three types of WINDOW aggregations:

  1. TUMBLING: fixed-size, non-overlapping, gap-less windows. In this case, we have a window for every minute.
SELECT ip, count(*) AS hits FROM clickstream
WINDOW TUMBLING (size 1 minute) GROUP BY ip;
  2. HOPPING: fixed-size, overlapping windows. This is like a sliding window.
SELECT ip, sum(bytes) AS bytes_per_ip_and_bucket
FROM clickstream WINDOW HOPPING (size 20 second, advance by 5 second) GROUP BY ip;
  3. SESSION: dynamically-sized, non-overlapping, data-driven windows. For example: “if nothing happens in the next 20 seconds, the window ends.” This is like defining the gap between windows instead of the size of the windows.
SELECT ip, SUM(bytes) AS bytes_per_ip
FROM clickstream WINDOW SESSION (20 second) GROUP BY ip;
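How a record is assigned to windows under each of the three types can be sketched as follows. This is a simplified model with millisecond timestamps; the function names are hypothetical, and details such as clamping window starts at 0 and treating the session gap as inclusive are assumptions rather than statements about KSQL internals.

```python
def tumbling_window(ts, size):
    """Return the single [start, end) window that contains ts."""
    start = ts - ts % size
    return (start, start + size)


def hopping_windows(ts, size, advance):
    """Return every [start, end) window that contains ts, oldest first."""
    windows = []
    start = ts - ts % advance  # latest window start at or before ts
    while start > ts - size and start >= 0:
        windows.append((start, start + size))
        start -= advance
    return sorted(windows)


def session_windows(timestamps, gap):
    """Group timestamps into sessions; a new session starts once more
    than `gap` passes without activity. Returns (first, last) pairs."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # still within the inactivity gap
        else:
            sessions.append([ts])    # gap exceeded: open a new session
    return [(s[0], s[-1]) for s in sessions]


print(tumbling_window(65_000, size=60_000))
# (60000, 120000)
print(hopping_windows(12_000, size=20_000, advance=5_000))
# [(0, 20000), (5000, 25000), (10000, 30000)]
print(session_windows([0, 5_000, 30_000, 45_000], gap=20_000))
# [(0, 5000), (30000, 45000)]
```

A record belongs to exactly one tumbling window but to several hopping windows, while session boundaries depend only on the gaps in the data.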

Example of a “web analytics” pipeline


create stream pageviews (viewtime bigint, userid varchar, pageid varchar)
with (kafka_topic='pageviews', value_format='JSON');

create table users (registertime bigint, gender varchar, regionid varchar, userid varchar)
with (kafka_topic='users', value_format='JSON');

Materialized views:

Region visitor count

create stream joined_pageviews as
select users.userid as userid, pageid, regionid, gender
from pageviews left join users on pageviews.userid = users.userid;
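The stream-table left join above can be pictured like this: the users table holds the latest record per userid, and every pageview is enriched with whatever the table contains at that moment. The sketch below is a simplified model under that assumption; the function name, the tagged-tuple input format, and the sample records are hypothetical.

```python
def join_pageviews(events):
    """events: ordered list of ("user", record) or ("pageview", record)."""
    users = {}   # table: latest record per userid
    joined = []  # output stream
    for kind, rec in events:
        if kind == "user":
            users[rec["userid"]] = rec  # upsert into the table
        else:
            # Left join: keep the pageview even when no user matches,
            # leaving the user-side columns empty (None).
            user = users.get(rec["userid"], {})
            joined.append({
                "userid": user.get("userid"),
                "pageid": rec["pageid"],
                "regionid": user.get("regionid"),
                "gender": user.get("gender"),
            })
    return joined


# Hypothetical sample data.
events = [
    ("user", {"userid": "u1", "regionid": "r9", "gender": "F"}),
    ("pageview", {"userid": "u1", "pageid": "p1"}),
    ("pageview", {"userid": "u2", "pageid": "p2"}),  # no matching user
]
for row in join_pageviews(events):
    print(row)
```

Because the query selects users.userid, a pageview without a matching user ends up with an empty userid in this model; selecting pageviews.userid instead would keep it.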

create table region_visitor_count as
select regionid, count(*) as visit_count
from joined_pageviews
window tumbling (size 30 second)
group by regionid;

Region visitor demography

create table region_visitor_demo_count as
select regionid, gender, count(*) as visit_count
from joined_pageviews
window tumbling (size 30 second)
group by gender, regionid;

Running KSQL

./bin/ksql-cli local
ksql> create stream ... ;

ksql> show streams;
ksql> describe pageviews;
ksql> select * from pageviews;

The select is a continuous query: it keeps printing a running stream of output as new records arrive.