Build a Real-Time Kibana Dashboard With Field Sensor Data Using Confluent Cloud, Kafka, Flink SQL, and Elastic Cloud
Disclosure: All opinions expressed in this article are my own and do not represent those of my current or any previous employers.
After finishing the pyflink-kafka getting started series, I wanted to create an end-to-end example around the same business problem: field sensors emit device reading events to Apache Kafka®, Flink SQL transforms the records into a new table that is updated with the charge consumed by each device every 30 seconds (tumbling window), and a real-time dashboard displays the charge consumed.
For real-world production scenarios, Kafka and Apache Flink® need to be deployed on managed platforms; otherwise, it becomes difficult to manage infrastructure intricacies (e.g., fault tolerance, high availability, auto-scaling).
For this end-to-end example, I will run a sensor simulator for three devices, create topics on Confluent Cloud to store the events, and write Flink SQL on Confluent Cloud to transform device events and calculate the SUM of charge consumed every 30 seconds. The results are stored in a topic generated automatically by Confluent.
After that, I will run a managed Elasticsearch sink connector on Confluent Cloud to push these derived readings to an Elastic Cloud index, and build a real-time Kibana dashboard that auto-refreshes every 30 seconds.
Here’s a broad diagram of the data flow:
Since the entire data pipeline runs in the cloud, make sure that the Kafka cluster, the Flink compute pool, and the Elastic Cloud resources (index + Kibana dashboard) are all in the same region of the same cloud provider (AWS/Microsoft Azure/Google Cloud).
Let’s break down the steps to complete this experiment:
- Create a Confluent Cloud account with the cloud provider of choice and the region.
- Create and configure a Confluent Cloud Environment and a Kafka Cluster.
- Create and configure a Flink compute pool for running a set of managed Flink SQL statements through the Flink SQL workspace.
- Create an Elastic Cloud deployment with the same cloud provider and in the same region.
- Create a standalone Python AVRO sensor producer, which runs on a local PC and simulates devices emitting events to the Kafka topic.
- Run Flink SQL statements for transforming the data.
- Create and configure an Elasticsearch sink connector on Confluent Cloud to connect to the Elastic Cloud deployment.
- Verify that the Elastic Cloud index is receiving messages.
- Create the Kibana real-time dashboard.
- Terminate all resources.
Let’s get going.
Step 1: Create a Confluent Cloud account
While it is quick and easy to set up a Confluent Cloud account (you sign up with an email and get $400 of free usage for the first 30 days), I recommend going through the steps mentioned in the Confluent Developer Portal. A Basic cluster is good enough for this experiment. I chose AWS us-east-2 (Ohio) as the region where Confluent Cloud provisions its resources.
Step 2: Create and configure a Kafka cluster and environment
Once the Confluent account is set up (you have to verify your email, etc.), it starts with a default Environment. An environment logically isolates a set of resources for a workload. For this experiment, let us create a new environment, which I named aws-conf-drc. The Essentials stream governance package is good enough to run this experiment.
Once your environment is created, check the bottom right of the page: a Stream Governance API section will have been created. Make a note of the Schema Registry endpoint and the Schema Registry ID, and create a Schema Registry API key. Download the Schema Registry key and secret and keep the file in a secure place.
Each environment comes with a Stream Governance API, which comprises a Schema Registry and a Stream Catalog API. Schema Registry is important because the device readings conform to a specific AVRO schema that I’ll create later in this article. The Confluent Cloud Schema Registry is a managed Stream Governance resource that is used for serializing and deserializing AVRO messages.
With that, the Confluent Cloud environment creation is complete.
Let’s create a Kafka cluster on Confluent Cloud by clicking the +Add Cluster button. Select the Basic configuration, which is good enough for this experiment, and select AWS and us-east-2 (Ohio) Single Region as the cloud deployment region.
Once the cluster is launched, note down the cluster ID and the bootstrap server endpoint from the cluster settings page.
Next, create a cluster API key: click API Keys in the left navigation menu and then click +Add Key. Download the cluster API key file and note down the key and the secret.
Finally, let’s create a topic named sensor_readings on the brand new cluster, with all default settings, by clicking the Topics link in the left navigation menu.
Step 3: Create and configure a Flink compute pool
With the Kafka cluster created and configured in less than 2 minutes (no pun intended!), let’s set up the managed Flink SQL environment to run the streaming transformation (or streaming ETL).
Click the Environments breadcrumb link at the top and select the environment that was created. Click Flink and then click +Add Compute Pool.
Ensure that the cloud region remains the same (AWS us-east-2) for the Flink compute pool.
On the next page, leave everything at its default. The compute pool gets created with a maximum of 10 CFUs (Confluent Flink Units), which is the limit that the pool can auto-scale to. Fully managed Flink it is!
With that, the Flink compute pool is ready to run Flink SQL statements.
Step 4: Create an Elastic Cloud deployment in the same region
This experiment transports transformed messages from a Kafka topic to an Elasticsearch index using the managed Elasticsearch sink connector on Confluent Cloud.
So, let’s create an Elastic Cloud deployment in the same region (AWS, us-east-2).
Click the Free Tier Elastic Cloud link. This provides a 14-day free trial with no credit card required, which is enough to test whether the pipeline works.
Registration is quick; make sure you select AWS us-east-2 (Ohio) as the region for the deployment. I named my deployment drcdeploy1. Paste the deployment URL into your browser and verify that the deployment shows up with the right region.
Click the security link in the left nav menu and generate a new password for the elastic user. Also, make a note of the Elasticsearch endpoint.
That’s it: the deployment for the real-time dashboard is created. We will revisit this deployment once the pipeline starts shipping messages from the Kafka topic to Elastic Cloud.
Step 5: Create a python AVRO Kafka producer
Let’s now get down to the business of wiring up the sensor simulator to create device events. For this article, PyCharm Community Edition is used to write the Python producer. All code is in the GitHub repo.
Since this is a production-like scenario, let us set up the AVRO schema for the device first. Let’s name this file reading.avsc and save it in a folder named avro. Later in the article, we’ll check in Schema Registry how this schema is used.
{
  "name": "Reading",
  "type": "record",
  "fields": [
    {
      "name": "device_id",
      "type": "string"
    },
    {
      "name": "co",
      "type": "double"
    },
    {
      "name": "humidity",
      "type": "double"
    },
    {
      "name": "motion",
      "type": "boolean"
    },
    {
      "name": "temp",
      "type": "double"
    },
    {
      "name": "ampere_hour",
      "type": "double"
    },
    {
      "name": "device_ts",
      "type": "long"
    }
  ]
}
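Before wiring a schema like this into a producer, it can help to sanity-check records against it offline. The sketch below is a minimal, standard-library-only check and not a replacement for the validation that the AVRO serializer performs. The helper name check_record, the type mapping, and the cut-down three-field demo schema are my own illustrative assumptions.

```python
import json

# Map AVRO primitive type names to the Python types accepted for them.
# Only the primitives needed by a flat record such as Reading are covered.
AVRO_TO_PYTHON = {
    "string": (str,),
    "boolean": (bool,),
    "long": (int,),
    "double": (float, int),
}


def check_record(schema_str, record):
    """Return a list of problems found when comparing record to the schema."""
    schema = json.loads(schema_str)
    problems = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        value = record[name]
        # bool is a subclass of int in Python, so reject it for numeric fields.
        if avro_type != "boolean" and isinstance(value, bool):
            problems.append(f"{name}: expected {avro_type}, got bool")
        elif not isinstance(value, AVRO_TO_PYTHON[avro_type]):
            problems.append(
                f"{name}: expected {avro_type}, got {type(value).__name__}")
    return problems


# Hypothetical cut-down schema, used only to demonstrate the helper.
demo_schema = json.dumps({
    "name": "Reading",
    "type": "record",
    "fields": [
        {"name": "device_id", "type": "string"},
        {"name": "motion", "type": "boolean"},
        {"name": "device_ts", "type": "long"},
    ],
})

good = {"device_id": "b8:27:eb:bf:9d:51", "motion": True,
        "device_ts": 1700000000000}
bad = {"device_id": "b8:27:eb:bf:9d:51", "motion": "yes"}
print(check_record(demo_schema, good))
print(check_record(demo_schema, bad))
```

In a real project, the same function could be pointed at the content of avro/reading.avsc to catch a mismatched field before anything is sent to Kafka.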
Let’s also create a configuration file, cc_config.py. We will now use the Kafka cluster and Schema Registry credentials that were downloaded earlier.
cc_config = {
    'bootstrap.servers': '<BOOTSTRAP_SERVER_ENDPOINT>.us-east-2.aws.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '<CLUSTER_API_KEY>',
    'sasl.password': '<CLUSTER_API_SECRET>'
}
sr_config = {
    'url': 'https://<SCHEMA_REGISTRY_ENDPOINT>.us-east-2.aws.confluent.cloud',
    'basic.auth.user.info': '<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>'
}
Be careful with the value of the Schema Registry key basic.auth.user.info: it has the format '<API_KEY>:<API_SECRET>'. Also, notice the Kafka security.protocol and sasl.mechanisms properties: bootstrap server endpoints are secured for data in transit by default. The sasl.username and sasl.password values are the cluster API key and API secret, respectively.
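If you would rather not keep secrets in a source file, one option is to build the same two dictionaries from environment variables. This is only a sketch of that idea: the function name load_configs and the environment variable names (CC_BOOTSTRAP_SERVERS, CC_API_KEY, and so on) are hypothetical and not something Confluent prescribes.

```python
import os


def load_configs(env=os.environ):
    """Build the Kafka and Schema Registry config dicts from a mapping."""
    cc_config = {
        "bootstrap.servers": env["CC_BOOTSTRAP_SERVERS"],
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": env["CC_API_KEY"],
        "sasl.password": env["CC_API_SECRET"],
    }
    sr_config = {
        "url": env["SR_URL"],
        # Schema Registry expects "<API_KEY>:<API_SECRET>" as one string.
        "basic.auth.user.info": f"{env['SR_API_KEY']}:{env['SR_API_SECRET']}",
    }
    return cc_config, sr_config


# Demonstration with placeholder values instead of real credentials.
demo_env = {
    "CC_BOOTSTRAP_SERVERS": "broker.example.invalid:9092",
    "CC_API_KEY": "demo-key",
    "CC_API_SECRET": "demo-secret",
    "SR_URL": "https://registry.example.invalid",
    "SR_API_KEY": "sr-key",
    "SR_API_SECRET": "sr-secret",
}
cc_config, sr_config = load_configs(demo_env)
print(sr_config["basic.auth.user.info"])
```

A missing variable raises a KeyError at startup, which is usually easier to diagnose than an authentication failure later on.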
Now that our connection configuration is done, let’s break down the sensor simulator producer code. I have taken the template from the Confluent Python repo.
In PyCharm CE, let’s create a Python file named cc_sensor_producer_avro.py:
import random
import time
import cc_config
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
The important imports are cc_config (for connection configuration parameters), SchemaRegistryClient, and AvroSerializer. While I intend to dive deep into Schema Registry in a later article, for this experiment it is enough to know that Schema Registry guarantees the type contract between messages produced by the producer and messages consumed by the consumer. For this experiment, a message is sent to Kafka with a key of type string and a value of type AVRO record.
Next, let’s create a placeholder class for the sensor device readings, in the same file:
class Reading(object):
    """
    Sensor Reading Record
    Args:
        device_id (str): Sensor unique device_id
        co (float): Carbon Monoxide volume
        humidity (float): Humidity percentage
        motion (bool): Whether the device has moved from its original position
        temp (float): Temperature of the device
        ampere_hour (float): Charge consumed by the device
        device_ts (int): Event timestamp (epoch milliseconds) when the event is emitted
    """
    def __init__(self, device_id, co, humidity, motion, temp, ampere_hour, device_ts):
        self.device_id = device_id
        self.co = co
        self.humidity = humidity
        self.motion = motion
        self.temp = temp
        self.ampere_hour = ampere_hour
        self.device_ts = device_ts
Let’s create a reading-to-dict utility function, which converts an object of the Reading class to a Python dict so that it is in a wire-transferable format:
def reading_to_dict(reading, ctx):
    """
    Returns a dict representation of a Reading instance for serialization.
    Args:
        reading (Reading): Reading instance.
        ctx (SerializationContext): Metadata pertaining to the serialization
            operation.
    Returns:
        dict: Dict populated with sensor device reading attributes to be serialized.
    """
    # Every attribute of Reading maps one-to-one to a field of the AVRO schema.
    return dict(device_id=reading.device_id,
                co=reading.co,
                humidity=reading.humidity,
                motion=reading.motion,
                temp=reading.temp,
                ampere_hour=reading.ampere_hour,
                device_ts=reading.device_ts)
Let’s also create a callback function for the producer:
def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.
    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.
    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to delivery
        report callback we recommend a bound callback or lambda where you pass
        the objects along.
        In this case, msg.key() will return the sensor device id, since that is set
        as the key in the message.
    """
    if err is not None:
        print("Delivery failed for Device event {}: {}".format(msg.key(), err))
        return
    print('Device event {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))
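The docstring above mentions a bound callback as the way to get the original object into the delivery report. Here is a minimal sketch of that idea using functools.partial. The function name delivery_report_with_reading and the FakeMessage stand-in are assumptions made so the example runs without a broker, and the callback returns its text only to keep the demo easy to inspect (a real callback would print or log).

```python
from functools import partial


def delivery_report_with_reading(reading, err, msg):
    """Delivery callback that also receives the original reading dict."""
    if err is not None:
        return "FAILED device={} error={}".format(reading["device_id"], err)
    return "OK device={} ampere_hour={} offset={}".format(
        reading["device_id"], reading["ampere_hour"], msg.offset())


class FakeMessage:
    """Stand-in for the Kafka message object, for offline demonstration only."""

    def __init__(self, offset):
        self._offset = offset

    def offset(self):
        return self._offset


reading = {"device_id": "b8:27:eb:bf:9d:51", "ampere_hour": 1.25}
# partial() binds the reading; the producer would later supply (err, msg).
callback = partial(delivery_report_with_reading, reading)
print(callback(None, FakeMessage(offset=7)))
print(callback("broker unavailable", None))
```

In the producer, the bound callback would be passed as the on_delivery argument of producer.produce() in place of delivery_report.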
Let’s create the main() function:
def main():
    topic = 'sensor_readings'
    schema = 'reading.avsc'
    # Load the AVRO schema definition from the avro folder.
    with open(f"avro/{schema}") as f:
        schema_str = f.read()
    # The Schema Registry client and the schema string back the value serializer.
    schema_registry_conf = cc_config.sr_config
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)
    avro_serializer = AvroSerializer(schema_registry_client,
                                     schema_str,
                                     reading_to_dict)
    producer_conf = cc_config.cc_config
    producer = Producer(producer_conf)
    print("Producing device events to topic {}. ^C to exit.".format(topic))
    while True:
        # Simulate one reading from a randomly chosen device.
        devices = ['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']
        device_id = random.choice(devices)
        co = round(random.uniform(0.0011, 0.0072), 4)
        humidity = round(random.uniform(45.00, 78.00), 2)
        motion = random.choice([True, False])
        temp = round(random.uniform(17.00, 36.00), 2)
        ampere_hour = round(random.uniform(0.10, 1.80), 2)
        device_ts = int(time.time() * 1000)
        try:
            reading = Reading(device_id=device_id,
                              co=co,
                              humidity=humidity,
                              motion=motion,
                              temp=temp,
                              ampere_hour=ampere_hour,
                              device_ts=device_ts)
            # The key stays a plain string; only the value is AVRO-serialized.
            producer.produce(topic=topic,
                             key=device_id,
                             value=avro_serializer(reading, SerializationContext(topic, MessageField.VALUE)),
                             on_delivery=delivery_report)
            time.sleep(5)
        except KeyboardInterrupt:
            break
        except ValueError:
            print("Invalid input, discarding record...")
            continue
        finally:
            # Deliver any buffered message and trigger the delivery callback.
            producer.flush()


if __name__ == '__main__':
    main()
Let’s carefully understand what’s happening, line by line.
The sensor_readings topic must already exist in Confluent Cloud. The schema variable holds the file name reading.avsc; the program opens that file from the avro folder and stores its content in the schema_str variable.
Then the program takes the sr_config dictionary from the cc_config.py module (which is already imported) and creates a schema_registry_client.
This Schema Registry client reference, the schema_str variable, and the reading_to_dict function are passed to the AvroSerializer constructor to create an avro_serializer reference.
Similarly, the producer configuration is taken from the cc_config module and a producer reference is created.
Then, within an infinite while loop, I create a device Reading object and pass it to the avro_serializer. Note that I only created an AVRO serializer for the message value; for the message key, I continue to use the device_id, which is a string. In the producer.produce() call, I also pass the topic variable and the delivery_report callback reference.
That’s it: the sensor aggregator, which simulates 3 sensor devices and emits one message every 5 seconds for a randomly chosen device, is ready to run!
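The value generation inside the loop can also be pulled into a small function of its own, which makes the simulator testable without a Kafka connection. The following is a sketch under that assumption: the name simulate_reading and the injectable rng and now_ms parameters are mine, while the devices and value ranges are the ones used in main().

```python
import random
import time

DEVICES = ['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']


def simulate_reading(rng=random, now_ms=None):
    """Return one simulated reading as a dict, using the ranges from main()."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return {
        "device_id": rng.choice(DEVICES),
        "co": round(rng.uniform(0.0011, 0.0072), 4),
        "humidity": round(rng.uniform(45.00, 78.00), 2),
        "motion": rng.choice([True, False]),
        "temp": round(rng.uniform(17.00, 36.00), 2),
        "ampere_hour": round(rng.uniform(0.10, 1.80), 2),
        "device_ts": now_ms,
    }


# A seeded generator makes the simulated values repeatable between runs.
rng = random.Random(42)
sample = simulate_reading(rng, now_ms=1700000000000)
print(sample)
```

Passing a seeded random.Random instance keeps runs repeatable, which is handy when comparing dashboard totals between experiments.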
Step 6: Run Flink SQL on managed Flink Compute Pool
If you run the producer developed in the last step, it starts producing device messages to the Kafka topic sensor_readings. The Python producer prints the delivery report in the PyCharm CE console:
/opt/homebrew/opt/python@3.10/libexec/bin/python /Users/diptimanraichaudhuri/testing_space/pycharm_workspace/confluent-kafka-flink/cc_sensor_producer_avro.py
Producing device events to topic sensor_readings. ^C to exit.
Device event b'b8:27:eb:bf:9d:51' successfully produced to sensor_readings [4] at offset 0
Device event b'b8:27:eb:bf:9d:51' successfully produced to sensor_readings [4] at offset 1
Device event b'b8:27:eb:bf:9d:51' successfully produced to sensor_readings [4] at offset 2
Device event b'00:0f:00:70:91:0a' successfully produced to sensor_readings [3] at offset 0
Log in to the Confluent Cloud console, click the cluster link, followed by the topic, and you should see the messages arriving.
Notice how the key is retained as the device_id. Now, click the Schema link and notice that the value has the schema that was developed earlier.
Now, let’s click the environment link in the breadcrumb and open the Flink compute pool console. Once it opens, click Open SQL Workspace.
The Flink SQL Workspace opens with a Flink SQL editor and the Flink catalogs in the left nav menu. You should be able to see the topic that was created (sensor_readings) in the left nav menu.
Flink conveniently treats the environment as the root catalog, each cluster as a database within that catalog, and each topic as a table that Flink SQL queries can run against. This reduces the whole Flink stream processing operation to Flink SQL statements, which run on fully managed, automated infrastructure (auto-scaling to 10 CFUs, as explained earlier, etc.).
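The environment, cluster, and topic names therefore combine into a three-part table identifier. As a small illustration, the hypothetical helper below (qualified_table is my own name, not a Confluent API) builds the backtick-quoted form that the SQL statements in this article use.

```python
def qualified_table(environment, cluster, topic):
    """Return a backtick-quoted catalog.database.table identifier."""
    parts = (environment, cluster, topic)
    for part in parts:
        # Keep the sketch simple: refuse names that would need escaping.
        if "`" in part:
            raise ValueError(f"backtick not allowed in identifier: {part!r}")
    return ".".join(f"`{part}`" for part in parts)


print(qualified_table("aws-conf-drc", "flinksql-test", "sensor_readings"))
```

The quoting matters here because names such as aws-conf-drc contain hyphens, which would otherwise be parsed as a minus sign.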
For this experiment, I will create a Flink dynamic table named device_charge in this SQL workspace. It continually receives the SUM aggregation of ampere_hour every 30 seconds, computed with the TUMBLE windowing table-valued function (windowing TVF) and grouped by device_id. Let’s write the Flink SQL statement:
CREATE TABLE `aws-conf-drc`.`flinksql-test`.`device_charge` (
    device_id STRING,
    charge_consumed DOUBLE
) WITH (
    'changelog.mode' = 'retract'
);
Notice the important 'changelog.mode' = 'retract' property, which makes the table an updating table instead of an append-only table. If you want to go deeper, read the Confluent docs to understand more about Flink dynamic tables and the difference between append-only and updating tables. Interestingly, the CREATE TABLE statement always creates a backing Kafka topic as well as the corresponding schema subjects for key and value, and trying to create a table with a name that already exists in the catalog causes an exception.
Also check the cluster’s Topics page, which should now show two topics, the new one being device_charge.
Flink created a backing topic for the new table device_charge, with a generated AVRO schema for the value fields.
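To get a feel for what separates an updating table from an append-only one, here is a conceptual sketch of a retract-style changelog for a running SUM per device. It uses Flink’s changelog notation (+I for insert, -U for update-before, +U for update-after), but it is only an illustration of the idea and not how Confluent encodes rows in the backing topic; the function name and the sample events are made up.

```python
def running_sum_changelog(events):
    """Yield (op, device_id, total) rows for a running SUM per device."""
    totals = {}
    for device_id, ampere_hour in events:
        if device_id in totals:
            # Retract the previously emitted total before emitting the new one.
            yield ("-U", device_id, totals[device_id])
            totals[device_id] += ampere_hour
            yield ("+U", device_id, totals[device_id])
        else:
            totals[device_id] = ampere_hour
            yield ("+I", device_id, totals[device_id])


events = [("dev-a", 1.0), ("dev-b", 2.0), ("dev-a", 0.5)]
changelog = list(running_sum_changelog(events))
for row in changelog:
    print(row)
```

An append-only table would only ever contain +I rows; the -U/+U pairs are what a retract changelog adds so that downstream consumers can replace an earlier result.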
Now, let’s run the meat of the query (!), which is the Flink SQL aggregation that inserts its results into this new device_charge table:
INSERT INTO `aws-conf-drc`.`flinksql-test`.`device_charge`
SELECT device_id, SUM(ampere_hour) AS charge_consumed
FROM TABLE(
    TUMBLE(TABLE `aws-conf-drc`.`flinksql-test`.`sensor_readings`, DESCRIPTOR($rowtime), INTERVAL '30' SECONDS))
-- Grouping by window_start and window_end makes this a per-window aggregation.
GROUP BY window_start, window_end, device_id;
Look closely at how the $rowtime descriptor is used. $rowtime is the system column that Confluent Cloud provides on every table, holding the Kafka record timestamp. The window operations therefore use the record timestamp as the time attribute and not the device’s own event timestamp (though, for this experiment, it is easy to use the device time as well, via the device_ts field; I leave that to you to experiment!). This Flink SQL statement runs perpetually as a continuous query, calculating the windowed sum and populating the device_charge table. Click the + symbol in the workspace, which opens another cell for writing this statement.
The statement status changes from pending to running. If there are no errors, it is time to check the messages in the topics.
Perfect! The device_id and the SUM of ampere_hour are getting inserted into the new table. Remember, we could also have created the device_charge table with a PRIMARY KEY(device_id) NOT ENFORCED clause; in that case, the key of the messages would have been the device_id. For this experiment, I intend to ship all messages to the dashboard and apply the aggregation there, so I omitted that step.
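To make the tumbling-window idea concrete, the sketch below reproduces the arithmetic in plain Python: every event falls into exactly one fixed, non-overlapping 30-second window, and ampere_hour is summed per window and device. It is an offline illustration under my own assumptions (the function name, the tuple layout, and the sample events are invented) and says nothing about how Flink executes the query internally.

```python
from collections import defaultdict

WINDOW_MS = 30_000  # 30-second tumbling window, in milliseconds


def tumbling_sums(events, window_ms=WINDOW_MS):
    """Sum ampere_hour per (window_start, device_id) over fixed windows."""
    sums = defaultdict(float)
    for ts_ms, device_id, ampere_hour in events:
        # Each timestamp maps to the start of the single window containing it.
        window_start = ts_ms - (ts_ms % window_ms)
        sums[(window_start, device_id)] += ampere_hour
    return dict(sums)


events = [
    (0, "dev-a", 0.5),
    (5_000, "dev-a", 0.25),
    (29_999, "dev-b", 1.0),
    (30_000, "dev-a", 1.5),  # first event of the next window
]
result = tumbling_sums(events)
for key in sorted(result):
    print(key, result[key])
```

The event at 30,000 ms lands in a new window, so dev-a ends up with one total per window rather than a single running total.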
Clicking the statement name link shows key metrics on the right-hand side, such as messages in, messages out, and scaling status.
So far, so good! Let’s build a managed Kafka connector for the Elasticsearch sink in the next step!
Step 7: Managed Elasticsearch Sink Connector on Confluent Cloud
Click the cluster name breadcrumb and then the Connectors link in the left-hand nav; this opens the Connector Plugins page.
Search for “Elasticsearch” and click the ElasticsearchSink connector. This opens the page to select topics; select device_charge.
On the next page, click Use an existing API key and paste the cluster API key and secret created in Step 2.
On the next page, paste the Elasticsearch endpoint noted in Step 4 into the Connection URI textbox. Use elastic as the Connection User and the password regenerated in Step 4 in the password field. Leave SSL security as PLAINTEXT.
On the next page, select AVRO as the input Kafka record value format and set Key Ignore to true, since the table written by the Flink SQL aggregation has no PRIMARY KEY.
Select Tasks = 1 (default) on the next page and click Continue.
On the final configuration page, rename the connector to include device_charge in the name and have a look at the connector configuration JSON:
{
"topics": "device_charge",
"schema.context.name": "default",
"input.data.format": "AVRO",
"connector.class": "ElasticsearchSink",
"name": "ElasticsearchSinkConnector_device_charge",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<KAFKA_CLUSTER_API_KEY>",
"kafka.api.secret": "<KAFKA_CLUSTER_API_SECRET>",
"connection.url": "https://<ES_DEPLOY_ID>.us-east-2.aws.elastic-cloud.com",
"connection.username": "elastic",
"connection.password": "<ELASTIC_USER_PSWD>",
"elastic.security.protocol": "PLAINTEXT",
"key.ignore": "true",
"schema.ignore": "false",
"compact.map.entries": "true",
"write.method": "INSERT",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "fail",
"drop.invalid.message": "false",
"log.sensitive.data": "false",
"batch.size": "2000",
"linger.ms": "1000",
"flush.timeout.ms": "10000",
"flush.synchronously": "true",
"connection.compression": "false",
"read.timeout.ms": "15000",
"max.poll.interval.ms": "300000",
"max.poll.records": "500",
"tasks.max": "1",
"data.stream.type": "none"
}
This is a fully managed connector, so all configurations were generated and the task runs in a managed environment! Click Continue and you should see the connector status move from Provisioning to Running.
That’s it! Messages with the aggregated charge_consumed are now shipping from the Kafka topic to Elasticsearch, with the ElasticsearchSink connector running successfully!
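If you prefer to keep the connector settings in code rather than copy them from the UI, a subset of the configuration shown above can be assembled and checked for leftover placeholders before use. This is a sketch only: the helper names build_sink_config and unresolved_placeholders are hypothetical, and the keys are limited to ones that appear in the JSON above.

```python
import json


def build_sink_config(topic, es_url, es_password, kafka_key, kafka_secret):
    """Assemble a subset of the Elasticsearch sink settings shown above."""
    return {
        "topics": topic,
        "input.data.format": "AVRO",
        "connector.class": "ElasticsearchSink",
        "name": f"ElasticsearchSinkConnector_{topic}",
        "kafka.auth.mode": "KAFKA_API_KEY",
        "kafka.api.key": kafka_key,
        "kafka.api.secret": kafka_secret,
        "connection.url": es_url,
        "connection.username": "elastic",
        "connection.password": es_password,
        "key.ignore": "true",
        "schema.ignore": "false",
        "tasks.max": "1",
    }


def unresolved_placeholders(config):
    """Return the keys whose values still look like <PLACEHOLDER> text."""
    return sorted(key for key, value in config.items()
                  if value.startswith("<") and value.endswith(">"))


config = build_sink_config(
    topic="device_charge",
    es_url="https://example.invalid",
    es_password="<ELASTIC_USER_PSWD>",  # deliberately left unresolved
    kafka_key="demo-key",
    kafka_secret="demo-secret",
)
print(json.dumps(config, indent=2))
print(unresolved_placeholders(config))
```

The placeholder check is a cheap guard against submitting a configuration in which a secret was never filled in.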
Step 8: Investigate the Elasticsearch index for messages from Kafka
Log in to Elastic Cloud.
Click the deployment (notice that the region is the same, us-east-2), then click Search > Elasticsearch in the left nav menu, followed by Indices on the Elasticsearch page.
Fantastic! The device_charge topic has been successfully shipped to the Elasticsearch index with the same name! Click the index and then Documents.
Readings for each device_id are showing up just right! Now, on to the final step of creating a real-time dashboard.
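The same check can be scripted against Elasticsearch’s _count API instead of clicking through the UI. The sketch below only builds the HTTP request with the standard library and does not send it; the endpoint and credentials are placeholders, and build_count_request is a name I made up for the example.

```python
import base64
import urllib.request


def build_count_request(endpoint, index, user, password):
    """Build (but do not send) a GET request for the index's _count API."""
    url = f"{endpoint.rstrip('/')}/{index}/_count"
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(url, method="GET")
    # HTTP basic authentication with the elastic user and its password.
    request.add_header("Authorization", f"Basic {token}")
    return request


# Placeholder endpoint and credentials, for demonstration only.
req = build_count_request("https://example.invalid/", "device_charge",
                          "elastic", "secret")
print(req.get_method(), req.full_url)
```

Sending the request with urllib.request.urlopen(req) against a real deployment should return a JSON body whose count field grows as the connector ships more documents.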
Step 9: Creating a real-time Kibana dashboard
On the Elasticsearch page, click Dashboards in the left nav menu, followed by +Create Dashboard.
Click Create visualization and, in the top left corner, filter the data view by using device* as the search string, then click Explore 1 Matching index.
This brings up charge_consumed and device_id as fields.
Let’s select a Bar vertical stacked chart.
From the right-hand side menu, select device_id as the horizontal axis, Sum as the aggregate function, and charge_consumed as the field in the dropdown.
Clicking Refresh refreshes the values.
One last thing is to enable auto refresh on the Kibana dashboard.
Click Save and Return, then, in the calendar dropdown at the top right, enable the Refresh every switch with a 30-second refresh interval. Click Apply, then click the Save button and provide a name for the dashboard. Now, the dashboard refreshes every 30 seconds.
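For reference, the bar chart corresponds roughly to a terms aggregation on the device with a sum sub-aggregation on the charge. The sketch below builds such a search body as a plain dict. The aggregation names are arbitrary, and the device_id.keyword field name is an assumption about how the index mapping was generated, so check the actual mapping before relying on it.

```python
import json


def charge_by_device_query(device_field="device_id.keyword",
                           value_field="charge_consumed"):
    """Build a search body that sums charge_consumed per device."""
    return {
        "size": 0,  # return only the aggregation results, no documents
        "aggs": {
            "by_device": {
                "terms": {"field": device_field},
                "aggs": {
                    "total_charge": {"sum": {"field": value_field}},
                },
            },
        },
    }


query = charge_by_device_query()
print(json.dumps(query, indent=2))
```

Posting this body to the index’s _search endpoint should return one bucket per device, which is a convenient way to cross-check the numbers shown on the dashboard.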
Step 10: Terminate all resources
Do not forget to delete the resources deployed on Confluent Cloud and Elastic Cloud to stop billing once the experiment is over. Here are the steps:
- Stop the Kafka Python producer.
- Delete the dashboard from Elastic Cloud.
- Delete the index from Elasticsearch.
- Go to Settings on the Confluent managed connector card and click Delete Connector.
- Click each topic, go to the Configuration link, and delete the topic.
- Go to the Kafka cluster overview page, click Cluster Settings in the left nav menu, and click Delete Cluster.
- Once you have confirmed that all the steps above completed successfully, delete the environment by clicking Delete Environment in the bottom right nav menu.
Conclusion
Phew! That was a long series of steps, but a thoroughly enjoyable one too! This is a demonstration of a fully automated data streaming pipeline with Kafka, Flink, a managed Kafka connector for Elasticsearch, and Elastic Cloud with Elasticsearch and Kibana.
A managed data streaming platform adds considerable reliability, resilience, and availability to any enterprise-grade modern data platform by automating and abstracting many moving parts of the infrastructure. A complete data streaming platform needs to provide accurate results most of the time, if not always, and with this experiment I demonstrated how most of the moving parts can be abstracted away with Kafka, Flink, a managed connector, and an Elastic Cloud real-time Kibana dashboard.
The Kafka AVRO Python producer and the AVRO schema file are in the GitHub repo.
Stay tuned! Happy coding!