Kafka-PyFlink Getting Started-Part 4-Tumbling Window aggregation on Kafka with pyflink FlinkSQL

Diptiman Raichaudhuri
9 min read · Mar 17, 2024


Disclosure: All opinions expressed in this article are my own, and represent no one but myself and not those of my current or any previous employers.

Armed with the knowledge from my last three articles of setting up a brand-new Kafka broker in KRaft consensus mode, pyflink and FlinkSQL, let’s understand how FlinkSQL transformations can apply aggregations on real-time streaming data using stream processing techniques like the tumbling window.

Here’s the whole series :

  1. Kafka with KRaft on Windows
  2. Data Engineering with pyflink Table API
  3. Data Engineering with FlinkSQL
  4. FlinkSQL Tumbling Window aggregation
  5. Pyflink Table API Tumbling Window aggregation
  6. Pyflink Table API UDF — User Defined Functions

If the Kafka-pyflink DEV environment is not complete, please go through these articles in order and run the code examples.

Steps I follow in this article :

  1. Fire up the Kafka broker in KRaft mode (as explained in Part 1 of this series).
  2. Simulate a sensor farm and emit readings from a set of devices. One of the tags emitted by each device is ‘ampere-hour’. These sensor readings are produced to a Kafka topic on the KRaft-mode broker.
  3. Apply FlinkSQL transforms in real time on these readings to SUM the ampere-hours consumed by all devices over a tumbling window of 30 seconds.
  4. Write the aggregate stream with the SUM of charge consumed back to another Kafka topic.

Step 1— Start Kafka broker and create topics needed for this exercise

I started my Kafka broker and created two topics: sensor.readings and device.charge.

Topic: sensor.readings
Topic: device.charge
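
If you need to recreate the topics, the commands below are a sketch, assuming the same %KAFKA361_KRAFT_HOME% variable and broker port 9098 used elsewhere in this series; the partition counts are my own choice, not a requirement:

%KAFKA361_KRAFT_HOME%\bin\windows\kafka-topics.bat --create --topic sensor.readings --bootstrap-server localhost:9098 --partitions 3
%KAFKA361_KRAFT_HOME%\bin\windows\kafka-topics.bat --create --topic device.charge --bootstrap-server localhost:9098 --partitions 1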

Step 2— Simulated Sensor Program

The simulated sensor program uses random.uniform and random.choice to generate sensor data :

def sensor_event():
    DEVICES = ['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']
    device_id = random.choice(DEVICES)
    co = round(random.uniform(0.0011, 0.0072), 4)
    humidity = round(random.uniform(45.00, 78.00), 2)
    motion = random.choice([True, False])
    temp = round(random.uniform(17.00, 36.00), 2)
    amp_hr = round(random.uniform(0.10, 1.80), 2)
    event_ts = int(time.time() * 1000)

    sensor_event = {
        'device_id': device_id,
        'co': co,
        'humidity': humidity,
        'motion': motion,
        'temp': temp,
        'ampere_hour': amp_hr,
        'ts': event_ts
    }
    return sensor_event

I added a delivery callback to check whether events are published successfully :

def delivery_report(err, event):
    if err is not None:
        print(f'Delivery failed on reading for {event.key().decode("utf8")}: {err}')
    else:
        print(f'Device reading for {event.key().decode("utf8")} produced to {event.topic()}')

Kafka producer configurations are abstracted out into a new Python file, config.py :

config = {
    'bootstrap.servers': 'localhost:9098',
    'client.id': 'device.tags'
}

This config.py module is imported and the config dict is passed to the Kafka producer in the following lines :

from config import config

............

producer = Producer(config)

Finally, I added the __main__ block to produce device data to my Kafka topic :

if __name__ == '__main__':
    topic = 'sensor.readings'
    device_data = sensor_event()
    producer = Producer(config)

    try:
        while True:
            device_data = sensor_event()
            print(json.dumps(device_data))
            producer.produce(topic=topic, key=device_data['device_id'],
                             value=json.dumps(device_data),
                             on_delivery=delivery_report)
            time.sleep(5)
    except Exception as e:
        print(e)
    finally:
        producer.flush()

Since all three devices emit events randomly, I keyed each event by its device_id, so that all readings from a given device land on the same partition.
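
If you want to verify which partition each keyed event lands on, an optional tweak to the delivery callback shown earlier can print it; Message.partition(), key() and topic() are all part of the confluent-kafka Message API:

def delivery_report(err, event):
    if err is not None:
        print(f'Delivery failed on reading for {event.key().decode("utf8")}: {err}')
    else:
        # event.partition() reports which partition this keyed record was written to
        print(f'Device reading for {event.key().decode("utf8")} produced to '
              f'{event.topic()} [partition {event.partition()}]')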

Ensure that the topic you created matches the topic variable in this program. We are publishing these events to the sensor.readings topic.

Run the program. Since the events are published in an infinite while loop, the program will keep running until it crashes or we kill the process.

The print(json.dumps(device_data)) call prints each message on the PyCharm console.

Here’s the full code (also in GitHub) :

import json
import random
from config import config

from confluent_kafka import Producer
import time

# 04_kafka_flinksql_producer.py
# =============================

def delivery_report(err, event):
    if err is not None:
        print(f'Delivery failed on reading for {event.key().decode("utf8")}: {err}')
    else:
        print(f'Device reading for {event.key().decode("utf8")} produced to {event.topic()}')

def sensor_event():
    DEVICES = ['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']
    device_id = random.choice(DEVICES)
    co = round(random.uniform(0.0011, 0.0072), 4)
    humidity = round(random.uniform(45.00, 78.00), 2)
    motion = random.choice([True, False])
    temp = round(random.uniform(17.00, 36.00), 2)
    amp_hr = round(random.uniform(0.10, 1.80), 2)
    event_ts = int(time.time() * 1000)

    sensor_event = {
        'device_id': device_id,
        'co': co,
        'humidity': humidity,
        'motion': motion,
        'temp': temp,
        'ampere_hour': amp_hr,
        'ts': event_ts
    }
    return sensor_event

if __name__ == '__main__':
    topic = 'sensor.readings'
    device_data = sensor_event()
    producer = Producer(config)

    try:
        while True:
            device_data = sensor_event()
            print(json.dumps(device_data))
            producer.produce(topic=topic, key=device_data['device_id'],
                             value=json.dumps(device_data),
                             on_delivery=delivery_report)
            time.sleep(5)
    except Exception as e:
        print(e)
    finally:
        producer.flush()

Step 3— Running a FlinkSQL Aggregation on streaming data

Now that events are getting published to the sensor.readings topic, let’s write the aggregation needed.

I intend to SUM the ampere-hour readings grouped by each device, so that we understand the overall charge consumed by these devices.

If you have already read the last two articles in this series, you will appreciate that we first need to create a table environment to start the Flink application. For this exercise, we use EnvironmentSettings.in_streaming_mode() and StreamTableEnvironment to create it :

env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.in_streaming_mode()
tenv = StreamTableEnvironment.create(env, settings)

In order for pyflink to connect to Kafka, we also need to add the flink-sql-connector-kafka JAR file and then supply the Kafka connector configuration when we run FlinkSQL commands. Download the JAR from the Maven repository and copy it into the PyCharm project where this program runs.

Then add the JAR file to the StreamExecutionEnvironment, as shown below :

env.add_jars("file:///D:\\testing_space\\PycharmProjects\\kafka-flink-getting-started\\flink-sql-connector-kafka-3.1.0-1.18.jar")
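
As an alternative sketch (assuming PyFlink 1.16 or later), the same JAR can be registered through the table environment's pipeline.jars option instead of add_jars(); adjust the path to wherever you copied the connector:

# Register the Kafka connector JAR via configuration instead of add_jars()
tenv.get_config().set(
    "pipeline.jars",
    "file:///D:/testing_space/PycharmProjects/kafka-flink-getting-started/flink-sql-connector-kafka-3.1.0-1.18.jar"
)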

Now, let’s break down the code for the FlinkSQL processor :

First, create a FlinkSQL dynamic table sensor_readings over the stream from the sensor.readings topic, and execute the DDL to register this dynamic table in Flink’s default catalog.

To read more about Flink’s Dynamic Tables and Continuous Queries, which form the foundation of FlinkSQL processing and impose relational semantics on streaming data, please go through this wonderful Flink doc ! A small excerpt here :

“Dynamic tables are the core concept of Flink’s Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time. But just like static batch tables, systems can execute queries over dynamic tables. Querying dynamic tables yields a Continuous Query. A continuous query never terminates and produces dynamic results — another dynamic table. The query continuously updates its (dynamic) result table to reflect changes on its (dynamic) input tables. Essentially, a continuous query on a dynamic table is very similar to a query that defines a materialized view.”

src_ddl = """
    CREATE TABLE sensor_readings (
        device_id VARCHAR,
        co DOUBLE,
        humidity DOUBLE,
        motion BOOLEAN,
        temp DOUBLE,
        ampere_hour DOUBLE,
        ts BIGINT,
        proctime AS PROCTIME()
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'sensor.readings',
        'properties.bootstrap.servers' = 'localhost:9098',
        'properties.group.id' = 'device.tumbling.w.sql',
        'scan.startup.mode' = 'earliest-offset',
        'properties.auto.offset.reset' = 'earliest',
        'format' = 'json'
    )
"""
tenv.execute_sql(src_ddl)
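
As an optional sanity check, you can ask Flink to describe the table just registered; execute_sql() returns a TableResult, and its print() method dumps the schema to the console:

# Optional: confirm the dynamic table was registered with the expected schema
tenv.execute_sql("DESCRIBE sensor_readings").print()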

For the aggregation, we need to define a group-by that runs on streaming data. Unlike a batch group-by, which runs on a bounded data set (a table with static boundaries), a streaming group-by runs on an unbounded data set: a table with no end, since events keep arriving continuously (from the producer program written earlier). So, for a streaming group-by, we need to define a WINDOW of time over which the group-by operates. Understanding window functions for streaming data and the difference between EVENT TIME and PROCESSING TIME requires theoretical explanation, which I will cover later in the series.

For now, let us write a streaming group-by on these devices over a 30-second non-overlapping window. This means FlinkSQL processes the events within each 30-second boundary, applies the GROUP BY on device_id and then SUMs the ampere-hour readings, which are stored in the charge_consumed field of the sink table. Here’s the FlinkSQL syntax to achieve this :

# Process a Tumbling Window Aggregate Calculation of Ampere-Hour
# For every 30 seconds non-overlapping window
# Calculate the total charge consumed grouped by device
tumbling_w_sql = """
    SELECT
        device_id,
        TUMBLE_START(proctime, INTERVAL '30' SECONDS) AS window_start,
        TUMBLE_END(proctime, INTERVAL '30' SECONDS) AS window_end,
        SUM(ampere_hour) AS charge_consumed
    FROM sensor_readings
    GROUP BY
        TUMBLE(proctime, INTERVAL '30' SECONDS),
        device_id
"""

Notice how the tumbling window is scoped over a 30-second window on proctime, which is the time when these events land on the Flink processor on my laptop. The other time attribute is ts, which is the event time and denotes when the event was generated on the device. I leave it as an exercise for you to create a similar tumbling window based on event time as well; a sketch of what that might look like follows below.
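
As a hedged sketch of that exercise (not something I ran for this article), the source DDL would derive a timestamp column from the ts epoch-millisecond field and declare a watermark on it, and the window query would tumble on that column instead of proctime; the 5-second watermark delay is an arbitrary choice:

-- additional lines inside the CREATE TABLE sensor_readings (...) definition
event_time AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

-- event-time version of the tumbling window query
SELECT
    device_id,
    TUMBLE_START(event_time, INTERVAL '30' SECONDS) AS window_start,
    TUMBLE_END(event_time, INTERVAL '30' SECONDS) AS window_end,
    SUM(ampere_hour) AS charge_consumed
FROM sensor_readings
GROUP BY
    TUMBLE(event_time, INTERVAL '30' SECONDS),
    device_id

Note that an event-time window only closes once the watermark, driven by incoming events, passes the window end, so results appear only while events keep flowing.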

Next, we run the tumbling window SQL and get back a handle on the result, which is a pyflink.table.Table object; we will use this handle to write to the sink.

tumbling_w = tenv.sql_query(tumbling_w_sql)
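
If you want to eyeball the windowed results before wiring up the Kafka sink, an optional debugging step is to print the Table directly; execute() returns a TableResult whose print() streams rows to the console until the job is cancelled:

# Optional debugging: stream the aggregated rows to stdout instead of Kafka.
# This blocks, so use it instead of (not alongside) execute_insert() below.
tumbling_w.execute().print()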

Let’s now create the FlinkSQL sink table, backed by the second Kafka topic we created, device.charge.

sink_ddl = """
    CREATE TABLE devicecharge (
        device_id VARCHAR,
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        charge_consumed DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'device.charge',
        'properties.bootstrap.servers' = 'localhost:9098',
        'scan.startup.mode' = 'earliest-offset',
        'properties.auto.offset.reset' = 'earliest',
        'format' = 'json'
    )
"""

This table is populated by the SELECT query where the tumbling window aggregates are applied. When we query it, we get the window bounds (window_start and window_end) and the aggregated charge_consumed.

Let’s execute the sink_ddl and then insert the result of this streaming SQL into the sink :

tenv.execute_sql(sink_ddl)
tumbling_w.execute_insert('devicecharge').wait()

The wait() method waits for the Flink job to finish, and for this exercise it waits indefinitely, since the producer runs in an infinite loop. We could also pass a timeout to wait(), but let’s keep it simple for this exercise.
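
For reference, a one-line sketch of that option (not used here): wait() accepts an optional timeout in milliseconds and raises an exception if the job has not finished by then.

# Wait for at most 5 minutes instead of indefinitely
tumbling_w.execute_insert('devicecharge').wait(5 * 60 * 1000)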

Here’s the full processor bundled in a main() method :


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# 05_kafka_flinksql_tumbling_window.py
# =====================================

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.in_streaming_mode()
    tenv = StreamTableEnvironment.create(env, settings)

    env.add_jars("file:///D:\\testing_space\\PycharmProjects\\kafka-flink-getting-started\\flink-sql-connector-kafka-3.1.0-1.18.jar")

    src_ddl = """
        CREATE TABLE sensor_readings (
            device_id VARCHAR,
            co DOUBLE,
            humidity DOUBLE,
            motion BOOLEAN,
            temp DOUBLE,
            ampere_hour DOUBLE,
            ts BIGINT,
            proctime AS PROCTIME()
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sensor.readings',
            'properties.bootstrap.servers' = 'localhost:9098',
            'properties.group.id' = 'device.tumbling.w.sql',
            'scan.startup.mode' = 'earliest-offset',
            'properties.auto.offset.reset' = 'earliest',
            'format' = 'json'
        )
    """
    tenv.execute_sql(src_ddl)
    sensor_readings_tab = tenv.from_path('sensor_readings')

    # Process a Tumbling Window Aggregate Calculation of Ampere-Hour
    # For every 30 seconds non-overlapping window
    # Calculate the total charge consumed grouped by device
    tumbling_w_sql = """
        SELECT
            device_id,
            TUMBLE_START(proctime, INTERVAL '30' SECONDS) AS window_start,
            TUMBLE_END(proctime, INTERVAL '30' SECONDS) AS window_end,
            SUM(ampere_hour) AS charge_consumed
        FROM sensor_readings
        GROUP BY
            TUMBLE(proctime, INTERVAL '30' SECONDS),
            device_id
    """

    tumbling_w = tenv.sql_query(tumbling_w_sql)

    sink_ddl = """
        CREATE TABLE devicecharge (
            device_id VARCHAR,
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            charge_consumed DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'device.charge',
            'properties.bootstrap.servers' = 'localhost:9098',
            'scan.startup.mode' = 'earliest-offset',
            'properties.auto.offset.reset' = 'earliest',
            'format' = 'json'
        )
    """

    tenv.execute_sql(sink_ddl)
    tumbling_w.execute_insert('devicecharge').wait()

if __name__ == '__main__':
    main()

Run the program; it should start the Flink job and keep running continuously.

Step 4— Run the Kafka console consumer to analyze the result

Before this step, ensure that the Kafka broker is up and running, that both topics are created, and that both the sensor producer and the Flink processor programs are running without errors.

Then run the Kafka console consumer from the IDE terminal :

%KAFKA361_KRAFT_HOME%\bin\windows\kafka-console-consumer.bat --topic device.charge --bootstrap-server localhost:9098 --from-beginning
device.charge topic

Let’s analyze the result :

{"device_id":"1c:bf:ce:15:ec:4d","window_start":"2024-03-17 09:20:00","window_end":"2024-03-17 09:20:30","charge_consumed":1.23}
{"device_id":"00:0f:00:70:91:0a","window_start":"2024-03-17 09:20:00","window_end":"2024-03-17 09:20:30","charge_consumed":2.6100000000000003}
{"device_id":"b8:27:eb:bf:9d:51","window_start":"2024-03-17 09:20:00","window_end":"2024-03-17 09:20:30","charge_consumed":0.45}

{"device_id":"1c:bf:ce:15:ec:4d","window_start":"2024-03-17 09:20:30","window_end":"2024-03-17 09:21:00","charge_consumed":3.46}
{"device_id":"00:0f:00:70:91:0a","window_start":"2024-03-17 09:20:30","window_end":"2024-03-17 09:21:00","charge_consumed":1.91}
{"device_id":"b8:27:eb:bf:9d:51","window_start":"2024-03-17 09:20:30","window_end":"2024-03-17 09:21:00","charge_consumed":1.04}

{"device_id":"00:0f:00:70:91:0a","window_start":"2024-03-17 09:21:00","window_end":"2024-03-17 09:21:30","charge_consumed":0.15}
{"device_id":"1c:bf:ce:15:ec:4d","window_start":"2024-03-17 09:21:00","window_end":"2024-03-17 09:21:30","charge_consumed":2.26}
{"device_id":"b8:27:eb:bf:9d:51","window_start":"2024-03-17 09:21:00","window_end":"2024-03-17 09:21:30","charge_consumed":3.4699999999999998}

Look at each group of three records: they are grouped by device_id, and for each 30-second bound on the stream (using proctime), ampere_hour is summed up and stored in the charge_consumed field. Each boundary starts at the window_start timestamp and ends at the window_end timestamp, and each window is 30 seconds long.
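
You can also cross-check against the raw input topic with the device keys printed; print.key is a standard kafka-console-consumer property:

%KAFKA361_KRAFT_HOME%\bin\windows\kafka-console-consumer.bat --topic sensor.readings --bootstrap-server localhost:9098 --from-beginning --property print.key=true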

Perfect ! The FlinkSQL processor is running fine and the sink topic is storing the aggregated results. We can take these aggregated results and augment a lakehouse or a data lake with streaming metrics, or we could attach a real-time dashboard using Elasticsearch with the device.charge topic as its source.

This concludes the tumbling window aggregation using FlinkSQL on a Kafka topic. With this and the last two articles, we have a working understanding of the pyflink data processing framework and of running FlinkSQL jobs.

In the next article I will discuss Flink jobs in more detail, the infrastructure required to run production-grade streaming Flink jobs on Kafka, and how Confluent Cloud, with its managed serverless Kafka and FlinkSQL offerings, provides a production-grade platform for building such a system.

Here’s the GitHub repo

Stay tuned and happy coding !
