Kafka-PyFlink Getting Started-Part 5-Pyflink Table API Tumbling Window Aggregation
Disclosure: All opinions expressed in this article are my own, and represent no one but myself and not those of my current or any previous employers.
In the last article of the series, I demonstrated a FlinkSQL tumbling window operating on streaming data from a Kafka topic.
The PyFlink Table API is another way of writing the same aggregation, using a DSL instead of SQL.
In this article, I perform the same tumbling window group by, but, this time, with the pyflink Table API.
Please note that the Table API and FlinkSQL produce the same explain plan for a given query.
If you haven’t gone through the series so far, here are all the parts:
- Kafka with KRaft on Windows
- Data Engineering with pyflink Table API
- Data Engineering with FlinkSQL
- FlinkSQL Tumbling Window aggregation
- Pyflink Table API Tumbling Window aggregation
- Pyflink Table API UDF — User Defined Functions
Let’s take the same example as in the last article and fire up a producer that generates sensor readings and streams these events to a Kafka topic.
The goal remains the same as in my last article:
- Create 2 Kafka topics: sensor.readings and device.charge.
- Run a simulated sensor device farm with 3 devices emitting events to the sensor.readings Kafka topic.
- Read sensor readings from the Kafka topic using a pyflink dynamic table.
- Use the pyflink Table API to SUM the ampere-hour readings emitted by the 3 sensors over a non-overlapping window (a.k.a. tumbling window) of 30 seconds.
- Ingest the aggregated stream into another Kafka topic (a pyflink sink!).
- Verify that the tumbling window group by works fine by testing the new topic with a console consumer.
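For orientation, here is a small sketch of the kind of JSON event the simulated producer emits, matching the source table schema used later in this article. The helper name and field values are my own illustration, not the actual producer code:

```python
import json
import time

# Hypothetical sensor event matching the sensor_readings schema;
# the field values are illustrative.
def make_reading(device_id: str) -> str:
    reading = {
        "device_id": device_id,
        "co": 0.004,
        "humidity": 55.2,
        "motion": False,
        "temp": 22.7,
        "ampere_hour": 0.29,
        "ts": int(time.time() * 1000),  # epoch millis, matching the BIGINT ts column
    }
    return json.dumps(reading)

print(make_reading("00:0f:00:70:91:0a"))
```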
Except for the pyflink Table API transform, all other steps remain the same as in the last article, so let’s break the Table API transform down.
First, the customary imports and creation of the TableEnvironment variable:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

# 06_kafka_pyflink_tableapi_tumbling_window.py
def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.in_streaming_mode()
    tenv = StreamTableEnvironment.create(env, settings)
Then we add the flink-sql-connector-kafka jar, from here, to the env:

    env.add_jars("file:///D:\\testing_space\\PycharmProjects\\kafka-flink-getting-started\\flink-sql-connector-kafka-3.1.0-1.18.jar")
Next, we create the sensor_readings table on top of the sensor.readings Kafka topic and execute the source DDL:
    src_ddl = """
        CREATE TABLE sensor_readings (
            device_id VARCHAR,
            co DOUBLE,
            humidity DOUBLE,
            motion BOOLEAN,
            temp DOUBLE,
            ampere_hour DOUBLE,
            ts BIGINT,
            proctime AS PROCTIME()
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sensor.readings',
            'properties.bootstrap.servers' = 'localhost:9098',
            'properties.group.id' = 'device.tumbling.w.sql',
            'scan.startup.mode' = 'earliest-offset',
            'properties.auto.offset.reset' = 'earliest',
            'format' = 'json'
        )
    """
    tenv.execute_sql(src_ddl)

    sensor_readings_tab = tenv.from_path('sensor_readings')
Now, we define the same 30-second tumbling window SUM aggregation on the ampere_hour column, grouped by device:
    # Define a Tumbling Window Aggregate Calculation of ampere-hour sensor readings
    # - For every 30 seconds non-overlapping window
    # - Sum of charge consumed by each device
    tumbling_w = sensor_readings_tab.window(Tumble.over(lit(30).seconds)
                                                  .on(sensor_readings_tab.proctime)
                                                  .alias('w')) \
        .group_by(col('w'), sensor_readings_tab.device_id) \
        .select(sensor_readings_tab.device_id,
                col('w').start.alias('window_start'),
                col('w').end.alias('window_end'),
                sensor_readings_tab.ampere_hour.sum.alias('charge_consumed'))
The DSL reads very intuitively! A tumbling window is defined on the sensor_readings.proctime column, and the SUM() of ampere_hour is computed within each window, grouped by device.
This computation is performed every 30 seconds, so the pyflink job runs indefinitely and keeps computing this SUM() over an unbounded stream!
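The tumbling window semantics are easy to reason about in plain Python (my own sketch, not Flink code): every event lands in exactly one 30-second bucket whose start is the timestamp rounded down to a multiple of 30, and sums accumulate per (device, window):

```python
from collections import defaultdict

WINDOW_SEC = 30

def tumbling_sum(events):
    """Sum ampere_hour per (device_id, window) over 30-second tumbling windows.

    Each event is a (device_id, ts_seconds, ampere_hour) tuple and belongs to
    exactly one window, starting at ts - (ts % 30).
    """
    sums = defaultdict(float)
    for device_id, ts, ah in events:
        window_start = ts - (ts % WINDOW_SEC)
        sums[(device_id, window_start, window_start + WINDOW_SEC)] += ah
    return dict(sums)

events = [
    ("dev-a", 100, 0.5),
    ("dev-a", 115, 0.4),  # same [90, 120) window as the first event
    ("dev-a", 130, 0.2),  # next window, [120, 150)
    ("dev-b", 101, 1.0),
]
print(tumbling_sum(events))
```

Because the windows never overlap, every ampere_hour reading is counted exactly once, which is what makes a tumbling window the right fit for summing consumed charge.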
Then we create a sink table to hold the aggregated data, backed by the other Kafka topic we created, and execute the insert query:
    sink_ddl = """
        CREATE TABLE devicecharge (
            device_id VARCHAR,
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            charge_consumed DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'device.charge',
            'properties.bootstrap.servers' = 'localhost:9098',
            'scan.startup.mode' = 'earliest-offset',
            'properties.auto.offset.reset' = 'earliest',
            'format' = 'json'
        )
    """
    tenv.execute_sql(sink_ddl)

    tumbling_w.execute_insert('devicecharge').wait()

if __name__ == '__main__':
    main()
Here’s the full Table API code :
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.in_streaming_mode()
    tenv = StreamTableEnvironment.create(env, settings)

    env.add_jars("file:///D:\\testing_space\\PycharmProjects\\kafka-flink-getting-started\\flink-sql-connector-kafka-3.1.0-1.18.jar")

    src_ddl = """
        CREATE TABLE sensor_readings (
            device_id VARCHAR,
            co DOUBLE,
            humidity DOUBLE,
            motion BOOLEAN,
            temp DOUBLE,
            ampere_hour DOUBLE,
            ts BIGINT,
            proctime AS PROCTIME()
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sensor.readings',
            'properties.bootstrap.servers' = 'localhost:9098',
            'properties.group.id' = 'device.tumbling.w.sql',
            'scan.startup.mode' = 'earliest-offset',
            'properties.auto.offset.reset' = 'earliest',
            'format' = 'json'
        )
    """
    tenv.execute_sql(src_ddl)

    sensor_readings_tab = tenv.from_path('sensor_readings')

    # Define a Tumbling Window Aggregate Calculation of ampere-hour sensor readings
    # - For every 30 seconds non-overlapping window
    # - Sum of charge consumed by each device
    tumbling_w = sensor_readings_tab.window(Tumble.over(lit(30).seconds)
                                                  .on(sensor_readings_tab.proctime)
                                                  .alias('w')) \
        .group_by(col('w'), sensor_readings_tab.device_id) \
        .select(sensor_readings_tab.device_id,
                col('w').start.alias('window_start'),
                col('w').end.alias('window_end'),
                sensor_readings_tab.ampere_hour.sum.alias('charge_consumed'))

    sink_ddl = """
        CREATE TABLE devicecharge (
            device_id VARCHAR,
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            charge_consumed DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'device.charge',
            'properties.bootstrap.servers' = 'localhost:9098',
            'scan.startup.mode' = 'earliest-offset',
            'properties.auto.offset.reset' = 'earliest',
            'format' = 'json'
        )
    """
    tenv.execute_sql(sink_ddl)

    tumbling_w.execute_insert('devicecharge').wait()

if __name__ == '__main__':
    main()
Time to test the Table API transformation !
Let’s create the 2 Kafka topics, sensor.readings and device.charge, as explained in the last article.
Run the sensor producer Python file, just like in the last article.
Then run the Table API Python file:
Let’s check the messages on the device.charge topic using a console consumer:
{"device_id":"00:0f:00:70:91:0a","window_start":"2024-03-26 19:20:00","window_end":"2024-03-26 19:20:30","charge_consumed":0.99}
{"device_id":"1c:bf:ce:15:ec:4d","window_start":"2024-03-26 19:20:00","window_end":"2024-03-26 19:20:30","charge_consumed":1.5699999999999998}
{"device_id":"b8:27:eb:bf:9d:51","window_start":"2024-03-26 19:20:00","window_end":"2024-03-26 19:20:30","charge_consumed":3.92}
{"device_id":"1c:bf:ce:15:ec:4d","window_start":"2024-03-26 19:20:30","window_end":"2024-03-26 19:21:00","charge_consumed":2.06}
{"device_id":"00:0f:00:70:91:0a","window_start":"2024-03-26 19:20:30","window_end":"2024-03-26 19:21:00","charge_consumed":1.6}
{"device_id":"b8:27:eb:bf:9d:51","window_start":"2024-03-26 19:20:30","window_end":"2024-03-26 19:21:00","charge_consumed":3.94}
Perfect, we get the Table API group by result every 30 seconds, with a SUM aggregation on the ampere_hour column!
The pyflink Table API also supports a Sliding window and a Session window! Both are available from the same module:

from pyflink.table.window import Tumble, Slide, Session
In a sliding window computation, the windows overlap, and the window carries 2 parameters: a window-duration and a slide-duration. It is fairly simple to test a Sliding Window Table API transform on the same codebase; I leave it as an exercise for you.
Sliding windows come in very handy for computing an aggregation once per slide-duration, like a moving average for a stock price.
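The overlap is the key difference from a tumbling window, and it can be sketched in plain Python (my own illustration, not Flink code). With a hypothetical 60-second window sliding every 30 seconds, each event belongs to WINDOW / SLIDE = 2 overlapping windows:

```python
WINDOW = 60  # window duration in seconds
SLIDE = 30   # slide duration in seconds

def sliding_windows(ts):
    """Return the [start, end) sliding windows an event at `ts` belongs to.

    Window starts are multiples of SLIDE; an event at `ts` is covered by
    every window whose start s satisfies s <= ts < s + WINDOW.
    """
    first_start = (ts // SLIDE) * SLIDE - WINDOW + SLIDE
    return [(s, s + WINDOW) for s in range(first_start, ts + 1, SLIDE)]

print(sliding_windows(100))  # → [(60, 120), (90, 150)]
```

Because each event is counted in 2 windows here, a per-window SUM overlaps neighbouring results, which is exactly what a moving average needs.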
Interestingly, a sliding window is called a HOP, a.k.a. a Hopping Window, in FlinkSQL.
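For comparison, the same kind of aggregation over a hopping window in FlinkSQL would look roughly like this. This is a sketch against the sensor_readings table above, with an assumed 1-minute window sliding every 30 seconds, using the HOP group-window syntax where the first interval is the slide and the second is the window size:

```sql
SELECT
    device_id,
    HOP_START(proctime, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS window_start,
    HOP_END(proctime, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS window_end,
    SUM(ampere_hour) AS charge_consumed
FROM sensor_readings
GROUP BY
    HOP(proctime, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    device_id
```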
Hope this gets you started with the pyflink Table API; the code is in the GitHub repo.
Stay tuned and happy coding !