Kafka-PyFlink Getting Started - Part 6: Writing UDFs with the pyflink Table API

Diptiman Raichaudhuri
4 min read · Apr 16, 2024


Disclosure: All opinions expressed in this article are my own, and represent no one but myself and not those of my current or any previous employers.

In the last article of the series, I demonstrated a pyflink Table API tumbling window operating on streaming data from a Kafka topic.

In this article, I build an example of writing a pyflink Table API UDF (User Defined Function).

This is a series for running pyflink and FlinkSQL transformations on Apache Kafka for streaming analytics use cases.

If you haven’t gone through the series so far, please read my previous articles:

  1. Kafka with KRaft on Windows
  2. Data Engineering with pyflink Table API
  3. Data Engineering with FlinkSQL
  4. FlinkSQL Tumbling Window aggregation
  5. Pyflink Table API Tumbling Window aggregation
  6. Pyflink Table API UDF — User Defined Functions

UDFs are needed to execute complex mathematical/statistical operations which might be tedious to express in SQL or the Table DSL. These operations can be written as Python functions and registered with the pyflink Table API.

Flink supports two types of UDFs:

  1. General Python UDF (processes data one row at a time)
  2. Vectorized Python UDF (executed by transferring a batch of elements between the JVM and the Python VM in Arrow columnar format)

See the Flink documentation on general and vectorized Python UDFs for the details.

While there are many ways to define a UDF, I chose the @udf decorator.
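As a minimal, self-contained illustration of the decorator (not code from the repo; the function and column names here are made up), a general scalar UDF looks like this:

from pyflink.table import DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import udf

# A trivial general (row-at-a-time) scalar UDF: doubles a single reading.
@udf(result_type=DataTypes.DOUBLE())
def double_reading(reading: float) -> float:
    return reading * 2

# It can then be used inside a Table API expression, e.g.:
# some_table.select(col('device_id'), double_reading(col('min15')))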

I use the same setup as in the rest of the series.

I have a .csv file that records the charge-consumed sensor reading for 3 sensors. Readings are recorded every 15 minutes, 4 times an hour.

For each device, the csv file has 4 readings stored as columns of a single row, so in total the csv has 3 rows.

sensor history data

The job at hand is to create another csv file which will also have 3 rows, but whose columns are device_id , total_charge_consumed and avg_15 . So we end up with the total charge consumed per hour and the average charge consumed every 15 minutes, per device.

It is perfectly normal for sensors to need an hourly average, daily average, etc.; this experiment achieves that with a Flink UDF.

To set the experiment up, I have created a sensor-history folder where I have kept the sensor_history.csv file. I have also created a device_stats folder, where a partitioned part file will be generated by the write-sink operation of the pyflink Table API. All files are in the GitHub repo.
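Given the source schema used later in the code, each line of sensor_history.csv follows this shape; the sample values below are made-up placeholders (the real file is in the GitHub repo):

# Column layout of sensor_history.csv (sample values are hypothetical):
# device_id, min15, min30, min45, min60
# sensor_1,  0.40,  0.35,  0.50,  0.45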

Let’s break the code down for the pyflink processor:

Start with the customary dependencies. math is required for summing the double values of the sensors:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, Schema, TableDescriptor
from pyflink.common import Row
from pyflink.table.udf import udf
import math

Now, the UDF first:

@udf(result_type=DataTypes.ROW([DataTypes.FIELD('device_id', DataTypes.STRING()),
                                DataTypes.FIELD('total_charge_consumed', DataTypes.DOUBLE()),
                                DataTypes.FIELD('avg_15', DataTypes.DOUBLE())]))
def device_stats_udf(device_history_readings: Row) -> Row:
    device_id, min15, min30, min45, min60 = device_history_readings
    quartile_readings = (min15, min30, min45, min60)
    total_charge_consumed = math.fsum(quartile_readings)
    avg_15 = total_charge_consumed / 4

    return Row(device_id, total_charge_consumed, avg_15)

As explained earlier, I chose the @udf decorator and the general Python UDF, not the vectorized one. With the vectorized flavour you can also use pandas and numpy, but for this article let’s keep it simple.
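For contrast, the vectorized flavour of the same kind of logic would receive pandas Series batches instead of one Row at a time. A rough sketch (assuming pandas is installed; the function name is hypothetical and this is not the approach used in this article):

import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import udf

# Vectorized (Pandas) UDF: each argument arrives as a pandas.Series holding a
# whole batch of rows; the return value must be a Series of the same length.
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def total_charge_pandas(min15, min30, min45, min60):
    return min15 + min30 + min45 + min60

# Usage sketch:
# hourly_tab.select(col('device_id'),
#                   total_charge_pandas(col('min15'), col('min30'),
#                                       col('min45'), col('min60')))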

The UDF takes a device_history_readings record as a pyflink.common.Row object, transforms it, and returns another Row object. The input record is unpacked into its constituent device_id and the four 15-minute readings for that device.

It then computes 2 new values: total_charge_consumed, the sum of the 15-minute interval readings, and avg_15, the 15-minute average for the device. These 2 values, along with the device_id, are returned as the Row return type of the UDF, for every record in the input file.
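To make the arithmetic concrete, here is what the function does to a single hypothetical row (the values are made up for illustration):

# Hypothetical input row: ('sensor_1', 0.40, 0.35, 0.50, 0.45)
# total_charge_consumed = math.fsum((0.40, 0.35, 0.50, 0.45))  -> 1.70
# avg_15                = 1.70 / 4                             -> 0.425
# returned Row          = Row('sensor_1', 1.70, 0.425)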

That’s the simple working of this UDF. Please go through Flink’s UDF documentation a few times to understand the difference between UDF, UDAF, UDTF etc., where scalar values of one or more rows can be mapped to zero, one or more output records.

Let’s continue with the rest of the code. I created a main() method and set up the regular pyflink application:

env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.in_batch_mode()
tbl_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)

Since the Flink job runs on static files, I have selected in_batch_mode .

field_names = ['device_id', 'min15', 'min30', 'min45', 'min60']
field_types = [DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE()]
schema = Schema.new_builder().from_fields(field_names, field_types).build()

source_path_tableapi = 'sensor-history'
tbl_env.create_table(
    'hourly_readings',
    TableDescriptor.for_connector('filesystem')
        .schema(schema)
        .option('path', f'{source_path_tableapi}')
        .format('csv')
        .build()
)
hourly_tab = tbl_env.from_path('hourly_readings')
print('\n Hourly Readings Schema ::>')
hourly_tab.print_schema()

print('\n Hourly Readings Data ::>')
hourly_tab.execute().print()

Then I create an hourly_readings table from the input sensor_history.csv file kept in the sensor-history folder, and an hourly_tab reference to proceed further.

device_stats = hourly_tab.map(device_stats_udf).alias('device_id', 'total_charge_consumed', 'avg_15')

This is where the device_stats_udf function executes and builds the returned Row with the summed and averaged results.
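The map() call works here because the UDF’s result type is a ROW with named fields, and alias() simply renames those output columns. If you print the schema of device_stats at this point, you should see something roughly like this:

# device_stats.print_schema() would show roughly:
# (
#   `device_id` STRING,
#   `total_charge_consumed` DOUBLE,
#   `avg_15` DOUBLE
# )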

sink_field_names = ['device_id', 'total_charge_consumed', 'avg_15']
sink_field_types = [DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.DOUBLE()]
sink_schema = Schema.new_builder().from_fields(sink_field_names, sink_field_types).build()

source_path_tableapi = 'device_stats'
tbl_env.create_table(
    'udf_sink',
    TableDescriptor.for_connector('filesystem')
        .schema(sink_schema)
        .option('path', f'{source_path_tableapi}')
        .format('csv')
        .build()
)

device_stats.execute_insert('udf_sink').print()

Then I created a sink table, udf_sink, with the sink schema over the newly created device_stats folder.

After that, the device_stats table, which is the output of the UDF, writes part files into this folder once I run execute_insert .
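If you just want to eyeball the UDF output without writing files, the mapped table can also be inspected directly; an optional sketch (to_pandas needs pandas installed):

# Print the UDF output to the console instead of (or before) the sink write:
# device_stats.execute().print()

# Or pull it into a pandas DataFrame for further checks:
# stats_df = device_stats.to_pandas()
# print(stats_df)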

UDF output

The created file captures the hourly sum and the 15-minute average in the output.

While this is just a starting point, UDFs and UDAFs are a great means of adding logic beyond SQL semantics to enrich and filter event streams.

Code is in the repo if you want to test the experiment out.

Stay tuned and happy coding!
