Kafka-PyFlink Getting Started - Part 5 - PyFlink Table API Tumbling Window Aggregation

Diptiman Raichaudhuri
5 min read · Mar 27, 2024


Disclosure: All opinions expressed in this article are my own, and represent no one but myself and not those of my current or any previous employers.

In the last article of the series, I demonstrated a FlinkSQL tumbling window operating on streaming data from a Kafka topic.

The PyFlink Table API is another way of writing the same aggregation, using a DSL instead of SQL.

In this article, I perform the same tumbling window group by, but this time with the PyFlink Table API.

Please note that the Table API and FlinkSQL produce the same explain plan for an equivalent query.

If you haven’t gone through the series so far, here are all the parts:

  1. Kafka with KRaft on Windows
  2. Data Engineering with pyflink Table API
  3. Data Engineering with FlinkSQL
  4. FlinkSQL Tumbling Window aggregation
  5. Pyflink Table API Tumbling Window aggregation
  6. Pyflink Table API UDF — User Defined Functions

Let’s take the same example as the last article and fire up a producer that emits sensor readings and streams these events to a Kafka topic.

The goal remains the same as in my last article:

  1. Create 2 Kafka topics: sensor.readings and device.charge.
  2. Run a simulated sensor device farm with 3 devices emitting events to the sensor.readings Kafka topic.
  3. Read the sensor readings from the Kafka topic using a PyFlink dynamic table.
  4. Use the PyFlink Table API to SUM the ampere-hour readings emitted by the 3 sensors over a non-overlapping window (a.k.a. tumbling window) of 30 seconds.
  5. Ingest the aggregated stream into another Kafka topic (a PyFlink sink!).
  6. Verify that the tumbling window group by works by testing the new topic with a console consumer.

Except for the PyFlink Table API transform, all other steps remain the same as in the last article, so let’s break the Table API transform down:

First, the customary imports and the creation of the TableEnvironment:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

# 06_kafka_pyflink_tableapi_tumbling_window.py

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.in_streaming_mode()
    tenv = StreamTableEnvironment.create(env, settings)

Then we add the flink-sql-connector-kafka jar (available on Maven Central) to the env:

env.add_jars("file:///D:\\testing_space\\PycharmProjects\\kafka-flink-getting-started\\flink-sql-connector-kafka-3.1.0-1.18.jar")

This is followed by the creation of the sensor_readings table on top of the sensor.readings Kafka topic, and execution of the source DDL:

src_ddl = """
CREATE TABLE sensor_readings (
device_id VARCHAR,
co DOUBLE,
humidity DOUBLE,
motion BOOLEAN,
temp DOUBLE,
ampere_hour DOUBLE,
ts BIGINT,
proctime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'sensor.readings',
'properties.bootstrap.servers' = 'localhost:9098',
'properties.group.id' = 'device.tumbling.w.sql',
'scan.startup.mode' = 'earliest-offset',
'properties.auto.offset.reset' = 'earliest',
'format' = 'json'
)
"""
tenv.execute_sql(src_ddl)
sensor_readings_tab = tenv.from_path('sensor_readings')
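
For context, each message on sensor.readings is a JSON document whose fields match the DDL above; a message would look roughly like this (the values here are purely illustrative):

{"device_id": "b8:27:eb:bf:9d:51", "co": 0.0045, "humidity": 51.2, "motion": false, "temp": 22.7, "ampere_hour": 0.52, "ts": 1711468800000}

The proctime column is not part of the message; it is a computed column that captures the processing time at which Flink reads each event.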

Now, we define the same 30-second tumbling window SUM aggregation of the ampere_hour column, grouped by device:

# Define a Tumbling Window Aggregate Calculation of ampere-hour sensor readings
# - For every 30 seconds non-overlapping window
# - Sum of charge consumed by each device
tumbling_w = sensor_readings_tab.window(Tumble.over(lit(30).seconds)
                                        .on(sensor_readings_tab.proctime)
                                        .alias('w')) \
    .group_by(col('w'), sensor_readings_tab.device_id) \
    .select(sensor_readings_tab.device_id,
            col('w').start.alias('window_start'),
            col('w').end.alias('window_end'),
            sensor_readings_tab.ampere_hour.sum.alias('charge_consumed'))

The DSL reads very intuitively! A tumbling window is defined on the sensor_readings.proctime column, and within each window the SUM() of ampere_hour is computed, grouped by device.

This computation is performed every 30 seconds, so the PyFlink job runs indefinitely and keeps computing this SUM() on an unbounded stream!
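
As noted earlier, the Table API and FlinkSQL compile down to the same plan. If you want to see it for yourself, Table.explain() prints the abstract syntax tree and the optimized plans for the transform; a quick, optional check:

# Optional: print the AST and the optimized plans for the tumbling window
# transform, to compare with the FlinkSQL version from the previous article
print(tumbling_w.explain())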

Then we create a sink table to hold the aggregated data, on top of the other Kafka topic created earlier, and execute the insert:

sink_ddl = """
CREATE TABLE devicecharge (
device_id VARCHAR,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
charge_consumed DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = 'device.charge',
'properties.bootstrap.servers' = 'localhost:9098',
'scan.startup.mode' = 'earliest-offset',
'properties.auto.offset.reset' = 'earliest',
'format' = 'json'
)
"""

tenv.execute_sql(sink_ddl)
tumbling_w.execute_insert('devicecharge').wait()

if __name__ == '__main__':
    main()

Here’s the full Table API code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.in_streaming_mode()
    tenv = StreamTableEnvironment.create(env, settings)

    env.add_jars("file:///D:\\testing_space\\PycharmProjects\\kafka-flink-getting-started\\flink-sql-connector-kafka-3.1.0-1.18.jar")

    src_ddl = """
        CREATE TABLE sensor_readings (
            device_id VARCHAR,
            co DOUBLE,
            humidity DOUBLE,
            motion BOOLEAN,
            temp DOUBLE,
            ampere_hour DOUBLE,
            ts BIGINT,
            proctime AS PROCTIME()
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sensor.readings',
            'properties.bootstrap.servers' = 'localhost:9098',
            'properties.group.id' = 'device.tumbling.w.sql',
            'scan.startup.mode' = 'earliest-offset',
            'properties.auto.offset.reset' = 'earliest',
            'format' = 'json'
        )
    """

    tenv.execute_sql(src_ddl)
    sensor_readings_tab = tenv.from_path('sensor_readings')

    # Define a Tumbling Window Aggregate Calculation of ampere-hour sensor readings
    # - For every 30 seconds non-overlapping window
    # - Sum of charge consumed by each device
    tumbling_w = sensor_readings_tab.window(Tumble.over(lit(30).seconds)
                                            .on(sensor_readings_tab.proctime)
                                            .alias('w')) \
        .group_by(col('w'), sensor_readings_tab.device_id) \
        .select(sensor_readings_tab.device_id,
                col('w').start.alias('window_start'),
                col('w').end.alias('window_end'),
                sensor_readings_tab.ampere_hour.sum.alias('charge_consumed'))

    sink_ddl = """
        CREATE TABLE devicecharge (
            device_id VARCHAR,
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            charge_consumed DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'device.charge',
            'properties.bootstrap.servers' = 'localhost:9098',
            'scan.startup.mode' = 'earliest-offset',
            'properties.auto.offset.reset' = 'earliest',
            'format' = 'json'
        )
    """

    tenv.execute_sql(sink_ddl)
    tumbling_w.execute_insert('devicecharge').wait()


if __name__ == '__main__':
    main()

Time to test the Table API transformation!

Let’s create the 2 Kafka topics, sensor.readings and device.charge, as explained in the last article.

Run the sensor producer Python file, just like in the last article.

Then run the Table API Python file.

Let’s check the device.charge topic messages using a console consumer.
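
On my Windows KRaft setup from Part 1, the command looks roughly like this (adjust the path to your Kafka installation):

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9098 --topic device.charge --from-beginning

The aggregated window results show up like this: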

{"device_id":"00:0f:00:70:91:0a","window_start":"2024-03-26 19:20:00","window_end":"2024-03-26 19:20:30","charge_consumed":0.99}
{"device_id":"1c:bf:ce:15:ec:4d","window_start":"2024-03-26 19:20:00","window_end":"2024-03-26 19:20:30","charge_consumed":1.5699999999999998}
{"device_id":"b8:27:eb:bf:9d:51","window_start":"2024-03-26 19:20:00","window_end":"2024-03-26 19:20:30","charge_consumed":3.92}

{"device_id":"1c:bf:ce:15:ec:4d","window_start":"2024-03-26 19:20:30","window_end":"2024-03-26 19:21:00","charge_consumed":2.06}
{"device_id":"00:0f:00:70:91:0a","window_start":"2024-03-26 19:20:30","window_end":"2024-03-26 19:21:00","charge_consumed":1.6}
{"device_id":"b8:27:eb:bf:9d:51","window_start":"2024-03-26 19:20:30","window_end":"2024-03-26 19:21:00","charge_consumed":3.94}

Perfect! We get the Table API group by result every 30 seconds, with a SUM aggregation on the ampere_hour column.

The PyFlink Table API also supports sliding windows and session windows! Both are available in the same Table API window module:

from pyflink.table.window import Tumble, Slide, Session

For a sliding window computation, windows overlap, and the window takes 2 parameters: a window duration and a slide duration. It is fairly simple to adapt the same codebase to a sliding window Table API transform (a sketch follows below); I leave testing it as an exercise for you.
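
As a minimal sketch (assuming a 1-minute window that slides every 30 seconds, on the same table and columns as above), only the window definition changes:

# Sliding (hopping) window: 1-minute window, advancing every 30 seconds
sliding_w = sensor_readings_tab.window(Slide.over(lit(1).minutes)
                                       .every(lit(30).seconds)
                                       .on(sensor_readings_tab.proctime)
                                       .alias('w')) \
    .group_by(col('w'), sensor_readings_tab.device_id) \
    .select(sensor_readings_tab.device_id,
            col('w').start.alias('window_start'),
            col('w').end.alias('window_end'),
            sensor_readings_tab.ampere_hour.sum.alias('charge_consumed'))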

Sliding windows come in very handy for computing aggregations per slide duration, like a moving average of a stock price.

Interestingly, a sliding window is called a HOP (a.k.a. hopping window) in FlinkSQL.
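
For reference, a rough FlinkSQL equivalent of the sliding window sketch above uses the HOP group window function (again assuming a 1-minute window sliding every 30 seconds):

SELECT
    device_id,
    HOP_START(proctime, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS window_start,
    HOP_END(proctime, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS window_end,
    SUM(ampere_hour) AS charge_consumed
FROM sensor_readings
GROUP BY
    HOP(proctime, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    device_id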

Hope this gets you started with the PyFlink Table API; the code is in the GitHub repo.

Stay tuned and happy coding!
