Kafka-PyFlink Getting Started-Part 6: Writing UDFs with the pyflink Table API
Disclosure: All opinions expressed in this article are my own, and represent no one but myself and not those of my current or any previous employers.
In the last article of the series, I demonstrated a pyflink Table API tumbling window operating on streaming data from a Kafka topic.
In this article, I build an example of writing a pyflink Table API UDF (User Defined Function).
This is a series for running pyflink and FlinkSQL transformations on Apache Kafka for streaming analytics use cases.
If you haven’t gone through the series so far, please read my previous articles:
- Kafka with KRaft on Windows
- Data Engineering with pyflink Table API
- Data Engineering with FlinkSQL
- FlinkSQL Tumbling Window aggregation
- Pyflink Table API Tumbling Window aggregation
- Pyflink Table API UDF — User Defined Functions
UDFs are needed to execute complex mathematical/statistical operations that would be tedious to express in SQL/DSL. These operations can be written as plain Python functions and registered with the pyflink Table API.
Flink supports two types of UDFs:
- General Python UDF (processes data one row at a time)
- Vectorized Python UDF (executed by transferring a batch of elements between the JVM and the Python VM in Arrow columnar format)
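The difference can be sketched in plain Python, with no Flink involved: a general UDF is invoked once per row, while a vectorized UDF receives whole columns (batches) at once. The device names and readings below are made up purely for illustration.

```python
import math

# General-UDF style: the function is called once per row.
def total_per_row(row):
    # row: (device_id, min15, min30, min45, min60)
    return math.fsum(row[1:])

# Vectorized-UDF style: the function is called once per batch,
# receiving each column as a whole sequence of values.
def total_per_batch(min15, min30, min45, min60):
    return [math.fsum(vals) for vals in zip(min15, min30, min45, min60)]

rows = [("sensor_1", 1.0, 2.0, 3.0, 4.0),
        ("sensor_2", 0.5, 0.5, 0.5, 0.5)]

per_row = [total_per_row(r) for r in rows]  # one Python call per row
per_batch = total_per_batch([1.0, 0.5], [2.0, 0.5],
                            [3.0, 0.5], [4.0, 0.5])  # a single call
```

Both produce the same totals; the vectorized shape simply amortizes the Python-call overhead across the batch, which is why it pairs well with pandas/numpy.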
While there are many ways to define a UDF, I chose the @udf
decorator to define mine.
I use the same setup as the rest of the articles of the series.
I have a .csv
file which records the charge-consumed
sensor readings for 3 sensors. These readings are recorded every 15 minutes, 4 times an hour.
For each device, the csv file holds one row containing its 4 readings, so in total the csv has 3 rows.
The job at hand is to create another csv file, also with 3 rows, but whose columns are device_id
, total_charge_consumed
and avg_15
. So we end up with the hourly total charge and the average charge consumed per 15-minute interval.
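As a quick sanity check of the arithmetic before any Flink code, here is the computation for one hypothetical device row (the readings are made up):

```python
import math

# Hypothetical 15-minute readings for one device over an hour.
readings = (1.2, 1.5, 1.3, 1.0)

total_charge_consumed = math.fsum(readings)  # hourly total
avg_15 = total_charge_consumed / 4           # mean per 15-minute interval

print(total_charge_consumed, avg_15)
```

math.fsum is used instead of sum to minimize floating-point error accumulation across the partial sums; for 4 values it makes little difference, but it is a cheap habit for summing doubles.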
It is perfectly normal to track hourly averages, daily averages etc. for sensors; this experiment achieves that with Flink UDFs.
To set the experiment up, I have created a folder sensor-history
where I have kept the sensor_history.csv
file. I have also created a device_stats
folder, where a partitioned part
file is generated by the write-sink operation of the pyflink Table API. All files are in the github repo.
Let’s break the code down for the pyflink processor:
Start with the customary dependencies. math
is required for summing the double values of the sensor readings:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, Schema, TableDescriptor
from pyflink.common import Row
from pyflink.table.udf import udf
import math
Now, the UDF first:
@udf(result_type=DataTypes.ROW([DataTypes.FIELD('device_id', DataTypes.STRING()),
                                DataTypes.FIELD('total_charge_consumed', DataTypes.DOUBLE()),
                                DataTypes.FIELD('avg_15', DataTypes.DOUBLE()),
                                ]))
def device_stats_udf(device_history_readings: Row) -> Row:
    # Unpack one input row: the device id and its four 15-minute readings.
    device_id, min15, min30, min45, min60 = device_history_readings
    quartile_readings = (min15, min30, min45, min60)
    total_charge_consumed = math.fsum(quartile_readings)
    avg_15 = total_charge_consumed / 4
    return Row(device_id, total_charge_consumed, avg_15)
As explained earlier, I chose the @udf
decorator and the general Python UDF, not the vectorized one. With the vectorized flavour you can use pandas and numpy as well, but for this article let’s keep it simple.
The UDF takes in a device_history_readings
record as a pyflink.common.Row
object, modifies the record and returns another Row object. The device_history_readings
record is unpacked into its constituent device_id and the 4 readings recorded every 15 minutes for that device.
Then it computes 2 new values: total_charge_consumed
, by summing the 15-minute interval readings, and avg_15
, the 15-minute average for that device. These 2 values, along with the device_id, are returned as the Row return type of the UDF, for every record in the input file.
That’s the simple working of this UDF. Please go through Flink’s UDF docs multiple times to understand the difference between UDF, UDAF, UDTF etc., where scalar values of one or more rows can be mapped to zero, one or more output records ( here and here).
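Those mapping semantics can also be illustrated without Flink: a scalar UDF maps one input row to exactly one output, while a table function (UDTF) may emit zero, one, or many rows per input. The functions and data below are an illustrative analogy, not PyFlink API calls.

```python
# Scalar-UDF style: one input row -> exactly one output row.
def scalar_style(row):
    device_id, *readings = row
    return (device_id, sum(readings))

# UDTF style: one input row -> zero or more output rows (a generator).
def table_function_style(row):
    device_id, *readings = row
    for minute, value in zip((15, 30, 45, 60), readings):
        if value > 0:  # may emit fewer rows than it receives readings
            yield (device_id, minute, value)

row = ("sensor_1", 1.0, 0.0, 2.0, 3.0)
one = scalar_style(row)                 # a single output row
many = list(table_function_style(row))  # several rows; the zero reading is dropped
```

A UDAF goes the other direction: many input rows aggregate down to one output value, which is what built-ins like SUM and AVG already do.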
Let’s continue with the rest of the code. I created a main()
method and started creating the regular pyflink application:
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.in_batch_mode()
tbl_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
Since the Flink job runs on static files, I selected in_batch_mode
.
field_names = ['device_id', 'min15', 'min30', 'min45', 'min60']
field_types = [DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE()]
schema = Schema.new_builder().from_fields(field_names, field_types).build()
source_path_tableapi = 'sensor-history'
tbl_env.create_table(
    'hourly_readings',
    TableDescriptor.for_connector('filesystem')
                   .schema(schema)
                   .option('path', f'{source_path_tableapi}')
                   .format('csv')
                   .build()
)
hourly_tab = tbl_env.from_path('hourly_readings')
print('\n Hourly Readings Schema ::>')
hourly_tab.print_schema()
print('\n Hourly Readings Data ::>')
hourly_tab.execute().print()
Then I create an hourly_readings
table from the input sensor_history.csv
file kept in the sensor-history
folder, and an hourly_tab
reference to proceed further.
device_stats = hourly_tab.map(device_stats_udf).alias('device_id', 'total_charge_consumed', 'avg_15')
This is where the device_stats_udf
function executes and creates the returned Row with the summed and averaged results.
sink_field_names = ['device_id', 'total_charge_consumed', 'avg_15']
sink_field_types = [DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.DOUBLE()]
sink_schema = Schema.new_builder().from_fields(sink_field_names, sink_field_types).build()
source_path_tableapi = 'device_stats'
tbl_env.create_table(
    'udf_sink',
    TableDescriptor.for_connector('filesystem')
                   .schema(sink_schema)
                   .option('path', f'{source_path_tableapi}')
                   .format('csv')
                   .build()
)
device_stats.execute_insert('udf_sink').print()
Then I created a sink table over the newly created device_stats
folder.
After that, the device_stats
table, which is the output of the UDF, writes part files into this folder once I run execute_insert
.
The created file captured the SUM operation and the 15 minute average in the output.
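As a plain-Python sanity check of the whole pipeline, the sink rows can be reproduced by applying the same per-row logic to a few made-up source rows in the same schema (device_id, min15, min30, min45, min60); the readings here are hypothetical, not the repo's actual data.

```python
import math

# Hypothetical input rows mirroring the source csv schema.
source_rows = [
    ("sensor_1", 1.0, 2.0, 3.0, 4.0),
    ("sensor_2", 2.0, 2.0, 2.0, 2.0),
    ("sensor_3", 0.5, 1.5, 0.5, 1.5),
]

def device_stats(row):
    # Same logic as device_stats_udf, minus the Flink wrapping.
    device_id, *readings = row
    total = math.fsum(readings)
    return (device_id, total, total / 4)

sink_rows = [device_stats(r) for r in source_rows]
for device_id, total, avg in sink_rows:
    print(f"{device_id},{total},{avg}")  # shape of one csv sink line
```

Three input rows in, three output rows out, each carrying the hourly total and the 15-minute average, exactly the shape the filesystem sink writes.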
While this is just a starting point, UDFs and UDAFs are a great means of adding scalar functions beyond SQL semantics to enrich/filter event streams.
Code is in the repo to test the experiment out.
Stay tuned and happy coding !