Kafka-PyFlink Getting Started (Part 2): Data Engineering with the PyFlink Table API

Diptiman Raichaudhuri
6 min read · Mar 12, 2024


Disclosure: All opinions expressed in this article are my own, and represent no one but myself and not those of my current or any previous employers.

This is the 2nd article in my Kafka-Flink series. If you haven't read the earlier one, I explained from the ground up how to set up a Kafka 3.7 single-node cluster on a Windows 10/11 laptop with IntelliJ Community Edition.

Here's the link: Kafka with KRaft on Windows

Here’s the whole series :

  1. Kafka with KRaft on Windows
  2. Data Engineering with PyFlink Table API
  3. Data Engineering with FlinkSQL
  4. FlinkSQL Tumbling Window aggregation
  5. PyFlink Table API Tumbling Window aggregation
  6. PyFlink Table API UDF — User Defined Functions

In this article, I want to start breaking down the data processing / transformation steps offered by Apache Flink. While Flink has been around for quite some time, the Python wrapper (PyFlink Table API) and FlinkSQL are fairly new additions to this powerful stream processing platform.

While I encourage you to go through the documentation here, in this article I will start with a simple PyFlink application.

I chose PyCharm Community Edition as my development environment.

After installing PyCharm CE, I created a new project. By default, PyCharm creates a virtualenv for each new project, so that dependency isolation is maintained at the project level:

PyCharm new project

I had already installed Python 3.11, which serves as my base interpreter.

After creating the project, I opened the Terminal from the bottom-left nav menu and installed the following dependencies:

pip install -q apache-flink confluent-kafka kafka-python

flink dependencies

Hopefully the installation runs fine. Let's also create a requirements.txt so that this project can be ported to other tools, if required.

On the terminal, fire the following command:

pip freeze > requirements.txt

This generates a requirements.txt file in the project folder with all dependencies and their respective versions. Check that it lists apache-flink==1.18.1, which is the latest stable version.

That's it, we have our development environment set up.

To run PyFlink Table API or FlinkSQL code from a Python environment, you need the following two things (environment settings and a table environment):

from pyflink.table import EnvironmentSettings, TableEnvironment, TableDescriptor, Schema, DataTypes

settings = EnvironmentSettings.in_batch_mode()

tenv = TableEnvironment.create(settings)

An environment setting specifies the execution mode (batch or streaming), and a TableEnvironment is responsible for (as per the doc):

“Registering a Table in the internal catalog

Registering catalogs

Loading pluggable modules

Executing SQL queries

Registering a user-defined (scalar, table, or aggregation) function

Converting between DataStream and Table (in case of StreamTableEnvironment)

A Table is always bound to a specific TableEnvironment. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them. A TableEnvironment is created by calling the static TableEnvironment.create() method.”

In short, a table environment is the entry point for the application.
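To make this concrete, here is a minimal sketch using the tenv we just created, showing two of those responsibilities: registering a view and executing a SQL query. The demo table name and rows are made up purely for illustration.

# Register a tiny in-memory table as a temporary view (names and rows are illustrative only)
demo_tab = tenv.from_elements([(1, 'device-a'), (2, 'device-b')], ['id', 'device'])
tenv.create_temporary_view('demo', demo_tab)

# Execute a SQL query against the registered view and print the result
tenv.execute_sql("SELECT device FROM demo").print()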

For this article, I process a static CSV file that I downloaded from Kaggle, here. It is a fairly large collection of environmental sensor telemetry data. I have created a subset of this data with 3,000 records and kept it in a new folder, sensor-source.

It is important to note that I also removed the header of the file, since my processing might encounter Hive-style partitioned files; instead, I created a schema and enforced it on the file. The file is attached in the GitHub repo (link below).
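For reference, here is a rough sketch of how such a header-less subset could be produced with pandas. The input name iot_telemetry_data.csv and the output name device_data.csv are just what I used locally; adjust them to your own download.

import pandas as pd

# Read the full Kaggle download, keep the first 3000 rows,
# and write them out without the header row
df = pd.read_csv('iot_telemetry_data.csv')
df.head(3000).to_csv('sensor-source/device_data.csv', index=False, header=False)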

I created the schema using the pyflink.table.Schema class:

field_names = ['ts', 'device', 'co', 'humidity', 'light', 'lpg', 'motion', 'smoke', 'temp']
field_types = [DataTypes.STRING(), DataTypes.STRING(),
               DataTypes.STRING(), DataTypes.STRING(),
               DataTypes.STRING(), DataTypes.STRING(),
               DataTypes.STRING(), DataTypes.STRING(),
               DataTypes.STRING()]
schema = Schema.new_builder().from_fields(field_names, field_types).build()

My file has all fields as strings, thus the schema is nothing spectacular!

Before we create the table, let's take another peek at the doc:

“A TableEnvironment maintains a map of catalogs of tables which are created with an identifier. Each identifier consists of 3 parts: catalog name, database name and object name. If a catalog or database is not specified, the current default value will be used (see examples in the Table identifier expanding section).

Tables can be either virtual (VIEWS) or regular (TABLES). VIEWS can be created from an existing Table object, usually the result of a Table API or SQL query. TABLES describe external data, such as a file, database table, or message queue.”
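As a quick illustration of the three-part identifier: the catalog and database names below are Flink's out-of-the-box defaults, not something we configured, so treat the exact values as an assumption you can verify on your own setup.

# Print the catalog and database the TableEnvironment currently points at
print(tenv.get_current_catalog())   # typically 'default_catalog'
print(tenv.get_current_database())  # typically 'default_database'

# A table registered as 'device_data' could therefore also be referenced by its
# fully qualified identifier: default_catalog.default_database.device_data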

For this exercise, I'll create a table using a filesystem connector, with a single file in the folder (which could have been a folder with Hive-style partitions as well).

Here's how I do it:

source_path_tableapi = 'sensor-source'
tenv.create_table(
    'device_data',
    TableDescriptor.for_connector('filesystem')
        .schema(schema)
        .option('path', source_path_tableapi)
        .format('csv')
        .build()
)

device_tab = tenv.from_path('device_data')

This creates a table device_data and registers it in the transient, in-memory catalog; I could also have created a temporary table, as mentioned in the doc quoted above.
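Here is a sketch of that temporary variant, reusing the same schema and descriptor; the table name device_data_tmp is just illustrative.

# Register the same filesystem source as a temporary table instead;
# it lives only for this session rather than in the catalog's regular table namespace
tenv.create_temporary_table(
    'device_data_tmp',
    TableDescriptor.for_connector('filesystem')
        .schema(schema)
        .option('path', source_path_tableapi)
        .format('csv')
        .build()
)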

Now, tenv.from_path() scans the registered device_data table from the catalog and returns a reference in the device_tab handle.

Now, we can query the table:

device_tab.print_schema()
print(device_tab.to_pandas().head())

This prints the schema that we built, and we can view the first 5 rows by converting the table to a pandas DataFrame:

select *

Let's now run a PyFlink Table API DSL query to select a few columns.

Let's select the distinct devices emitting data:

distinct_devices = device_tab.select(device_tab.device).distinct()
print(distinct_devices.to_pandas())

Simple: we selected the column and applied the distinct() method to get the unique devices. Here's the result:

select distinct

Let's also test a where (or filter) clause, by selecting only those records where temp >= 20:

high_temp_devices = device_tab.select(device_tab.ts, device_tab.device, device_tab.temp) \
    .where(device_tab.temp >= "20")
print(high_temp_devices.to_pandas())

And, here's the result:

where filter
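One caveat worth flagging: because every column in our schema is a STRING, the comparison above is a lexicographic string comparison, not a numeric one. Here is a small sketch of casting temp to DOUBLE before filtering, so the threshold behaves numerically (same device_tab handle as above; numeric_high_temp is just an illustrative name):

# Cast the STRING temp column to DOUBLE so the filter compares numbers, not strings
numeric_high_temp = device_tab.select(device_tab.ts, device_tab.device, device_tab.temp) \
    .where(device_tab.temp.cast(DataTypes.DOUBLE()) >= 20.0)
print(numeric_high_temp.to_pandas())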

For the high_temp_devices query, let's analyze the EXPLAIN PLAN by calling the .explain() method on the query handle:

print("Explain plan for high_temp_device query \n")
print(high_temp_devices.explain())

And, here's the explain plan for the query:

explain plan

The Flink SQL parser creates the Abstract Syntax Tree first, followed by the Optimized Physical Plan and the Optimized Execution Plan!
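If you want more detail, explain() also accepts ExplainDetail flags; a quick sketch, assuming ExplainDetail imported from pyflink.table:

from pyflink.table import ExplainDetail

# Ask the planner for cost estimates and the JSON execution plan in addition to the default output
print(high_temp_devices.explain(ExplainDetail.ESTIMATED_COST,
                                ExplainDetail.JSON_EXECUTION_PLAN))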

Perfect. We have been able to run a couple of PyFlink Table API commands in batch mode, process a small file in a Python development environment, and analyze the query with an explain plan.

In the next article, I’ll demonstrate the same batch processing using FlinkSQL.

The real benefit of Flink will become apparent when I demonstrate FlinkSQL and PyFlink Table API transformations on streaming data from Kafka topics. We'll get there gradually, building up from the basics.

Here's the full code:

# 01_batch_csv_process.py
# =======================

from pyflink.table import (EnvironmentSettings, TableEnvironment,
                           TableDescriptor, Schema, DataTypes)


def main():
    # Batch execution mode and the table environment (the application entry point)
    settings = EnvironmentSettings.in_batch_mode()
    tenv = TableEnvironment.create(settings)

    # Schema for the header-less CSV file: all fields as strings
    field_names = ['ts', 'device', 'co', 'humidity',
                   'light', 'lpg', 'motion', 'smoke', 'temp']
    field_types = [DataTypes.STRING(), DataTypes.STRING(),
                   DataTypes.STRING(), DataTypes.STRING(),
                   DataTypes.STRING(), DataTypes.STRING(),
                   DataTypes.STRING(), DataTypes.STRING(),
                   DataTypes.STRING()]
    schema = Schema.new_builder().from_fields(field_names, field_types).build()

    # Register a filesystem-backed table over the sensor-source folder
    source_path_tableapi = 'sensor-source'
    tenv.create_table(
        'device_data',
        TableDescriptor.for_connector('filesystem')
            .schema(schema)
            .option('path', source_path_tableapi)
            .format('csv')
            .build()
    )

    device_tab = tenv.from_path('device_data')
    # device_tab.print_schema()
    # print(device_tab.to_pandas().head())

    distinct_devices = device_tab.select(device_tab.device).distinct()
    # print(distinct_devices.to_pandas())

    high_temp_devices = device_tab.select(device_tab.ts, device_tab.device, device_tab.temp) \
        .where(device_tab.temp >= "20")

    print(high_temp_devices.to_pandas())
    print('\n')
    print("Explain plan for high_temp_device query \n")
    print(high_temp_devices.explain())


if __name__ == '__main__':
    main()

Here's the repo: kafka_flink_getting_started

Stay tuned and happy coding!
