Kafka-PyFlink Getting Started-Part 3-Data Engineering with FlinkSQL and FlinkSQL EXPLAIN PLAN

Diptiman Raichaudhuri
5 min readMar 13, 2024

--

Disclosure: All opinions expressed in this article are my own, and represent no one but myself and not those of my current or any previous employers.

This is the 3rd article in my Kafka-Flink series. If you haven’t read the earlier ones, here are the links :

  1. Kafka with KRaft on Windows
  2. Data Engineering with pyflink Table API

Here’s the whole series :

  1. Kafka with KRaft on Windows
  2. Data Engineering with pyflink Table API
  3. Data Engineering with FlinkSQL
  4. FlinkSQL Tumbling Window aggregation
  5. Pyflink Table API Tumbling Window aggregation
  6. Pyflink Table API UDF — User Defined Functions

In this article I continue on the same example in Part 2 and write data processing on the same batch file using FlinkSQL with pyflink.

For developers comfortable in writing SQL, FlinkSQL remains the most potent weapon. FlinkSQL is based on Apache Calcite (the best SQL parser and validator, ever !!! .. whaddyasay !) and remains ANSI-SQL 2011 compliant.

While FlinkSQL can connect to a long list of sources and sinks, we’ll gradually navigate the complexity, starting with writing FlinkSQL queries for the same sensor telemetry dataset.

FlinkSQL runs queries on top of tables. Both Table API and FlinkSQL have been built from grounds up so that these can seamlessly integrate with each other, as well as integrate with the lower level DataStream API as well.

FlinkSQL processes data with source tables. Sources could be Kafka topics, databases, filesystem or any other Flink complaint source. FlinkSQL queries execute over the rows which are produced by these sources.

For this exercise, we’ll pick up the same sensor telemetry source data and run basic FlinkSQL queries and analyze an explain plan.

Comparing the explain plan for FlinkSQL and the same query executed as a Table API in my previous article will confirm that FlinkSQL and Table API are seamlessly integrated within pyflink.

To setup your dev environment on pycharm community edition, please read previous Part 2 of my series.

Similar to Flink Table API, I start with setting the TableEnvironment, which is pretty much the entry point of my application :

settings = EnvironmentSettings.in_batch_mode()
tenv = TableEnvironment.create(settings)

Then I create the table using FlinkSQL :

 # Using FlinkSQL
# ================
sensor_source_ddl = """
CREATE TABLE device_data(
ts VARCHAR,
device VARCHAR,
co VARCHAR,
humidity VARCHAR,
light VARCHAR,
lpg VARCHAR,
motion VARCHAR,
smoke VARCHAR,
temp VARCHAR
) WITH (
'connector' = 'filesystem',
'path' = 'sensor-source',
'format' = 'csv'
)
"""

Look how the schema is mapped using the regular DDL statements and the source is mapped using the WITH statement. For this exercise, I have used the filesystem connector, later in the series I’ll demonstrate using the kafka connector.

Then I run the execute_sql() method of TableEnvironment to run the DDL and create the table in Flink catalog. Once, the table is created, I use the from_path() method of TableEnvironment to obtain a table handle, note that the DDL created a table named device_data ,j ust like the Table API steps :

tenv.execute_sql(sensor_source_ddl)
device_tab = tenv.from_path('device_data')

device_tab is the table which I’ll use to run FlinkSQL queries.

Let’s start with a select * for 10 rows :

tenv.sql_query('SELECT * FROM device_data LIMIT 10').execute().print()

Remember, Flink queries are lazily evaluated, so, you’ll have to call execute() to run the query and print() to print it on console.

select *

Just like the Table API, FlinkSQL ran the select * with a LIMIT 10 and printed 10 rows.

Similarly, let’s rewrite the distinct devices query using FlinkSQL :

unique_devices_sql = """
SELECT DISTINCT device
FROM device_data
"""
print('Unique Devices : \n')
print(tenv.sql_query(unique_devices_sql).execute().print())

Here’s the result :

DISTINCT devices

No difference from Table API since, both are seamlessly integrated.

Let’s finish off with the high_temp_devices with a WHERE clause and also run the same EXPLAIN PLAN as the Table API example.

high_temp_devices_sql = """
SELECT ts,device,temp
FROM device_data
WHERE temp >= '20'
ORDER BY temp DESC
LIMIT 20
"""
print('High Temp Devices : \n')
print(tenv.sql_query(high_temp_devices_sql).execute().print())
WHERE clause

The only difference for EXPLAIN PLAN on FlinkSQL — we need to call the explain_sql() method of TableEnvironment and print the results.

EXPLAIN PLAN

Absolutely, the same explain plan as the Table API, Abstract Syntax Tree first, followed by generating the Optimized Physical Plan and the Optimized Execution Plan !

Here’s the full code :

from pyflink.table import (EnvironmentSettings, TableEnvironment,
TableDescriptor, Schema, DataTypes)

#02_batch_csv_flinksql.py
#=======================
def main():
settings = EnvironmentSettings.in_batch_mode()
tenv = TableEnvironment.create(settings)

# Using FlinkSQL
# ================
sensor_source_ddl = """
CREATE TABLE device_data(
ts VARCHAR,
device VARCHAR,
co VARCHAR,
humidity VARCHAR,
light VARCHAR,
lpg VARCHAR,
motion VARCHAR,
smoke VARCHAR,
temp VARCHAR
) WITH (
'connector' = 'filesystem',
'path' = 'sensor-source',
'format' = 'csv'
)
"""
tenv.execute_sql(sensor_source_ddl)
device_tab = tenv.from_path('device_data')
print('\n Device Table Schema - using FinkSQL')
device_tab.print_schema()
print('\nSensor Data - using FlinkSQL')
tenv.sql_query('SELECT * FROM device_data LIMIT 10').execute().print()
print('\n')

unique_devices_sql = """
SELECT DISTINCT device
FROM device_data
"""
print('Unique Devices : \n')
print(tenv.sql_query(unique_devices_sql).execute().print())

high_temp_devices_sql = """
SELECT ts,device,temp
FROM device_data
WHERE temp >= '20'
ORDER BY temp DESC
LIMIT 20
"""
print('High Temp Devices : \n')
print(tenv.sql_query(high_temp_devices_sql).execute().print())

print("Explain plan for high_temp_device query - FlinkSQL\n")
high_temp_device_sql_explanation = tenv.explain_sql(high_temp_devices_sql)
print(high_temp_device_sql_explanation)

if __name__ == '__main__':
main()

In this exercise, I took the same DSL of Table API and applied those queries using FlinkSQL and the EXPLAIN PLAN demonstrated that both executions are seamlessly integrated.

In subsequent articles in this series, we’ll move into advanced FlinkSQL concepts using WINDOW functions and map with Kafka connectors.

Here’s the kafka_pyflink_getting_started

Stay tuned and happy coding !

--

--