Build a Real-Time Kibana Dashboard With Field Sensor Data Using Confluent Cloud, Kafka, Flink SQL, and Elastic Cloud

Diptiman Raichaudhuri
17 min read · May 14, 2024


Disclosure: All opinions expressed in this article are my own, and represent no one but myself and not those of my current or any previous employers.

After finishing the pyflink-kafka getting started series, I wanted to build an end-to-end example around the same business problem: field sensors emit device reading events to Apache Kafka®, Flink SQL transforms the records into a new table that is updated every 30 seconds (a tumbling window) with the charge consumed by each device, and a real-time dashboard displays the charge consumed.

For real-world production scenarios, Kafka and Apache Flink® need to be deployed on managed platforms; otherwise it becomes difficult to manage infrastructure intricacies (e.g., fault tolerance, high availability, auto-scaling).

For this end-to-end example, I will run a sensor simulator for three devices, create topics on Confluent Cloud to store the events, and write Flink SQL on Confluent Cloud to transform the device events and calculate the SUM of charge consumed every 30 seconds, which will be stored in a topic generated automatically by Confluent.

After that, I will run a managed Elasticsearch sink connector on Confluent Cloud to push these derived readings to an Elastic Cloud index, and build a real-time Kibana dashboard that auto-refreshes every 30 seconds.

Here’s a broad diagram of the data flow:

Managed data streaming on Confluent Cloud and Elastic Cloud

Since the entire data pipeline runs in the cloud, make sure the Kafka topic, the Flink compute pool and the Elastic Cloud resources (index + Kibana dashboard) are all within the same region of the same cloud provider (AWS/Microsoft Azure/Google Cloud).

Let’s break down the steps to complete this experiment:

  1. Create a Confluent Cloud account with the cloud provider and region of choice.
  2. Create and configure a Confluent Cloud environment and a Kafka cluster.
  3. Create and configure a Flink compute pool for running a set of managed Flink SQL statements through the Flink SQL workspace.
  4. Create an Elastic Cloud deployment with the same cloud provider and in the same region.
  5. Create a standalone Python AVRO sensor producer, which runs on a local PC and simulates devices emitting events to the Kafka topic.
  6. Run Flink SQL statements to transform the data.
  7. Create and configure an Elasticsearch sink connector on Confluent Cloud to connect to Elastic Cloud.
  8. Verify that the Elastic Cloud index is receiving messages.
  9. Create the real-time Kibana dashboard.
  10. Terminate all resources.

Let’s get going.

Step 1: Create a Confluent Cloud account

While it is quick and easy to set up a Confluent Cloud account (you sign up with an email address and get $400 of free usage for the first 30 days), I recommend going through the steps mentioned in the Confluent Developer Portal. A Basic cluster is good enough for this experiment. I chose AWS, us-east-2 (Ohio) as the region where Confluent Cloud would provision its resources.

Confluent Cloud New Account Creation

Step 2: Create and configure a Kafka cluster and environment

Once the Confluent account is set up (you have to verify your email, etc.), it starts with a default Environment. An environment logically isolates a set of resources for a workload. For this experiment, let us create a new environment, which I named aws-conf-drc. The Essentials stream governance package is good enough to run this experiment.

Confluent Cloud Environment

Once your environment is created, check the bottom right of the page and you will see a Stream Governance API section. Make a note of the Schema Registry endpoint and the Schema Registry ID, and create a Schema Registry API key. Download the Schema Registry key and secret and keep the file in a secure place.

Each environment comes with a Stream Governance API (which comprises a Schema Registry and a Stream Catalog API). Schema Registry is important since the device readings will conform to a specific AVRO schema that I’ll create a little later in this section. The Confluent Cloud Schema Registry is a managed stream governance resource which will be used for serializing and deserializing AVRO messages.

Confluent Cloud Schema Registry

With that, the Confluent Cloud environment creation is complete.

Let’s create a Kafka cluster on Confluent Cloud by clicking the +Add Cluster button. Select the Basic configuration, which is good enough for this experiment, and select AWS, us-east-2 (Ohio), Single Region as the cloud deployment region.

Once the cluster is launched, note down the cluster ID and the bootstrap server endpoint from the cluster settings page.

Kafka Cluster Bootstrap Server Endpoint

Next, create a cluster API key: click API Keys on the left-nav menu and then click +Add Key. Download the cluster API key file and note down the key and the secret.

Let’s create a topic sensor_readings on the brand new cluster, with all default settings, by clicking the Topics link on the left navigation menu (the same steps can also be scripted with the Confluent CLI, as sketched below).
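
If you prefer the terminal, here is a minimal sketch using the Confluent CLI, assuming the confluent CLI is installed and logged in; the resource IDs are placeholders and flags can vary slightly between CLI versions, so check confluent --help if anything differs:

confluent login
confluent environment list                # note the environment id (env-xxxxxx)
confluent environment use env-xxxxxx

# Create a Basic cluster in the same region as the rest of the pipeline
confluent kafka cluster create drc-cluster --cloud aws --region us-east-2 --type basic

# Create an API key scoped to the new cluster (lkc-xxxxxx is the cluster id)
confluent api-key create --resource lkc-xxxxxx

# Create the topic used by the sensor simulator
confluent kafka topic create sensor_readings --cluster lkc-xxxxxx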

Step 3: Create and configure a Flink compute pool

With the Kafka cluster created and configured in less than two minutes (no pun intended!), let’s set up the managed Flink environment to run the streaming transformation (or streaming ETL).

Click the Environments breadcrumb link on top and select the environment that was created. Click the Flink tab and then click +Add Compute Pool.

Create Flink Compute Pool

Ensure that the cloud region remains the same (AWS us-east-2) for the Flink compute pool.

Flink Compute pool in the same cloud region

On the next page, leave everything at the defaults, and the compute pool will be created with a maximum of 10 CFUs (Confluent Cloud for Apache Flink units), which is the most the pool can auto-scale to. Fully managed Flink it is!!

With that the Flink compute pool is ready to run Flink SQL statements.

Step 4: Create an Elastic Cloud deployment in the same region

This experiment intends to transport transformed messages from a Kafka topic to an Elasticsearch index using managed Elasticsearch sink connector on Confluent Cloud.

So, let’s create an Elastic Cloud deployment in the same region (AWS, us-east-2).

Click the Free Tier Elastic Cloud link. This provides a 14-day free trial with no credit card required, which is enough to test whether the pipeline works fine.

Registration is quick; ensure you select AWS, us-east-2 (Ohio) as the region for the deployment. I named my deployment drcdeploy1. Paste the deployment URL in your browser and verify that the deployment shows up fine, in the right region.

ElasticCloud Deployment

Click the Security link on the left nav menu and generate a new password for the elastic user. Also, make a note of the Elasticsearch endpoint.
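
As a quick sanity check, the Elasticsearch endpoint should answer with cluster and version info over the standard root API. A hedged one-liner, with the endpoint and the freshly generated password as placeholders:

curl -u "elastic:<ELASTIC_USER_PSWD>" "https://<ES_ENDPOINT>"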

That’s it, the deployment for the real-time dashboard is created. We’ll revisit this deployment once the pipeline starts shipping messages from the Kafka topic to Elastic Cloud.

Step 5: Create a Python AVRO Kafka producer

Let’s now get down to the business of wiring up the sensor simulator to create device events. For this article, PyCharm Community Edition is used to write the Python producer. All code is in the GitHub repo.

Since this is a PROD-like scenario, let’s set up the AVRO schema for the device first. Let’s name this file reading.avsc and save it in a folder named avro. Later in the article, we’ll check within Schema Registry how this schema is used.

{
  "name": "Reading",
  "type": "record",
  "fields": [
    {
      "name": "device_id",
      "type": "string"
    },
    {
      "name": "co",
      "type": "double"
    },
    {
      "name": "humidity",
      "type": "double"
    },
    {
      "name": "motion",
      "type": "boolean"
    },
    {
      "name": "temp",
      "type": "double"
    },
    {
      "name": "ampere_hour",
      "type": "double"
    },
    {
      "name": "device_ts",
      "type": "long"
    }
  ]
}

Let’s also create a configuration file cc_config.py. We will use the Kafka cluster and Schema Registry credentials that were downloaded earlier.

cc_config = {
    'bootstrap.servers': '<BOOTSTRAP_SERVER_ENDPOINT>.us-east-2.aws.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '<CLUSTER_API_KEY>',
    'sasl.password': '<CLUSTER_API_SECRET>'
}

sr_config = {
    'url': 'https://<SCHEMA_REGISTRY_ENDPOINT>.us-east-2.aws.confluent.cloud',
    'basic.auth.user.info': '<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>'
}

Be careful with the value of the Schema Registry basic.auth.user.info key: it is of the format '<API_KEY>:<API_SECRET>'. Also, notice the Kafka security.protocol and sasl.mechanisms properties; bootstrap server endpoints are secure for data-in-transit by default. The sasl.username and sasl.password values are the cluster API key and API secret, respectively.
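
Before writing the producer, both sets of credentials can be sanity-checked with a small throwaway script. This is just a sketch (the file name check_connectivity.py is made up for illustration); it lists topics and Schema Registry subjects using the cc_config.py module above:

# check_connectivity.py -- quick credential check, not part of the pipeline
from confluent_kafka.admin import AdminClient
from confluent_kafka.schema_registry import SchemaRegistryClient

import cc_config

# Kafka cluster: listing topic metadata fails fast if the API key/secret is wrong
admin = AdminClient(cc_config.cc_config)
metadata = admin.list_topics(timeout=10)
print("Kafka topics:", list(metadata.topics.keys()))

# Schema Registry: listing subjects verifies the basic.auth.user.info value
sr_client = SchemaRegistryClient(cc_config.sr_config)
print("Schema Registry subjects:", sr_client.get_subjects())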

Now that our connection configuration is done, let’s break down the sensor simulator producer code. I have taken the template from the Confluent Python repo.

In PyCharm CE, let’s create a Python file cc_sensor_producer_avro.py:

import random
import time
import cc_config

from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

The important imports are cc_config (for the connection configuration parameters), SchemaRegistryClient and AvroSerializer. While I intend to dive deep into Schema Registry in a later article, for this experiment it is enough to know that the Schema Registry guarantees the type contract between messages produced by the producer and messages consumed by the consumer. In this experiment, each message is sent to Kafka with a key of type string and a value that is an AVRO record.
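
To see that contract from the consuming side, here is a minimal consumer sketch (not part of the pipeline; the group id is made up for illustration, and it assumes the same cc_config.py module). With no schema string supplied, AvroDeserializer fetches the writer schema from Schema Registry using the schema id embedded in each message:

import cc_config

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# Consumer config = the producer connection config plus a consumer group
consumer_conf = dict(cc_config.cc_config,
                     **{'group.id': 'sensor-readings-checker',
                        'auto.offset.reset': 'earliest'})

schema_registry_client = SchemaRegistryClient(cc_config.sr_config)
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer = Consumer(consumer_conf)
consumer.subscribe(['sensor_readings'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        reading = avro_deserializer(msg.value(),
                                    SerializationContext(msg.topic(), MessageField.VALUE))
        print(msg.key(), reading)  # key is the raw device_id bytes, value is a dict
except KeyboardInterrupt:
    pass
finally:
    consumer.close()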

Next, let’s create a placeholder class for the sensor device readings, in the same file:

class Reading(object):
    """
    Sensor Reading Record

    Args:
        device_id (str): Sensor unique device_id

        co (double): Carbon Monoxide volume

        humidity (double): Humidity percentage

        motion (boolean): Whether the device has moved from its original position

        temp (double): Temperature of the device

        ampere_hour (double): charge consumed by the device

        device_ts (long): event timestamp of the device when an event is emitted
    """

    def __init__(self, device_id, co, humidity, motion, temp, ampere_hour, device_ts):
        self.device_id = device_id
        self.co = co
        self.humidity = humidity
        self.motion = motion
        self.temp = temp
        self.ampere_hour = ampere_hour
        self.device_ts = device_ts

Let’s create a reading-to-dict utility method, which converts a Reading object to a Python dict so that it can be serialized into a wire-transferable format:

def reading_to_dict(reading, ctx):
    """
    Returns a dict representation of a Reading instance for serialization.

    Args:
        reading (Reading): Reading instance.

        ctx (SerializationContext): Metadata pertaining to the serialization
            operation.

    Returns:
        dict: Dict populated with sensor device reading attributes to be serialized.
    """

    # All Reading attributes map one-to-one to fields of the AVRO schema
    return dict(device_id=reading.device_id,
                co=reading.co,
                humidity=reading.humidity,
                motion=reading.motion,
                temp=reading.temp,
                ampere_hour=reading.ampere_hour,
                device_ts=reading.device_ts)

Let’s also create a delivery callback method for the producer:

def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.

        msg (Message): The message that was produced or failed.

    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to the
        delivery report callback we recommend a bound callback or lambda where
        you pass the objects along.
        In this case, msg.key() will return the sensor device id, since that
        is set as the key of the message.
    """

    if err is not None:
        print("Delivery failed for Device event {}: {}".format(msg.key(), err))
        return
    print('Device event {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))

Let’s create the main() method :

def main():
    topic = 'sensor_readings'
    schema = 'reading.avsc'

    with open(f"avro/{schema}") as f:
        schema_str = f.read()

    schema_registry_conf = cc_config.sr_config
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    avro_serializer = AvroSerializer(schema_registry_client,
                                     schema_str,
                                     reading_to_dict)

    producer_conf = cc_config.cc_config
    producer = Producer(producer_conf)

    print("Producing device events to topic {}. ^C to exit.".format(topic))
    while True:

        devices = ['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']
        device_id = random.choice(devices)
        co = round(random.uniform(0.0011, 0.0072), 4)
        humidity = round(random.uniform(45.00, 78.00), 2)
        motion = random.choice([True, False])
        temp = round(random.uniform(17.00, 36.00), 2)
        ampere_hour = round(random.uniform(0.10, 1.80), 2)
        device_ts = int(time.time() * 1000)

        try:
            reading = Reading(device_id=device_id,
                              co=co,
                              humidity=humidity,
                              motion=motion,
                              temp=temp,
                              ampere_hour=ampere_hour,
                              device_ts=device_ts)
            producer.produce(topic=topic,
                             key=device_id,
                             value=avro_serializer(reading, SerializationContext(topic, MessageField.VALUE)),
                             on_delivery=delivery_report)
            time.sleep(5)
        except KeyboardInterrupt:
            break
        except ValueError:
            print("Invalid input, discarding record...")
            continue
        finally:
            producer.flush()


if __name__ == '__main__':
    main()

Let’s carefully understand what’s happening, line by line.

The producer writes to the sensor_readings topic created on Confluent Cloud in Step 2. The schema variable holds the file name reading.avsc; the program reads that AVRO schema specification from the avro folder and stores its content in the schema_str variable.

Then the program uses the sr_config dictionary from the cc_config.py module (which is already imported) and creates a schema_registry_client.

This Schema Registry client reference, the schema_str variable and the reading_to_dict conversion function are passed to the AvroSerializer constructor to create an avro_serializer reference.

Similarly, the producer configuration is taken from the cc_config module and a producer reference is created.

Then, within an infinite while loop, I create a device Reading object and pass it to the avro_serializer. Note that I only created an AVRO serializer for the message value; for the message key I continue to use the device_id, which is a string. In the producer.produce() call I also pass the topic variable and the delivery_report callback reference.

That’s it, the sensor simulator covering three sensor devices and creating a message for a randomly chosen device every 5 seconds is ready to run!

Step 6: Run Flink SQL on the managed Flink compute pool

If you run the producer developed in the last step, it starts producing device messages to the Kafka topic sensor_readings. The Python producer prints the delivery report in the PyCharm CE console:

/opt/homebrew/opt/python@3.10/libexec/bin/python /Users/diptimanraichaudhuri/testing_space/pycharm_workspace/confluent-kafka-flink/cc_sensor_producer_avro.py 
Producing device events to topic sensor_readings. ^C to exit.
Device event b'b8:27:eb:bf:9d:51' successfully produced to sensor_readings [4] at offset 0
Device event b'b8:27:eb:bf:9d:51' successfully produced to sensor_readings [4] at offset 1
Device event b'b8:27:eb:bf:9d:51' successfully produced to sensor_readings [4] at offset 2
Device event b'00:0f:00:70:91:0a' successfully produced to sensor_readings [3] at offset 0

Log in to the Confluent Cloud console, click the cluster link, followed by the topic, and you should see this:

sensor_readings topic

Notice how the key is retained as the device_id. Now, click the Schema link and notice how the value carries the schema that was developed:

sensor_readings schema

Now, let’s click the environment link on the breadcrumb and open the Flink compute pool console. Once it opens, click Open SQL Workspace:

Flink Compute Pool

The Flink SQL workspace opens with a Flink SQL editor and the Flink catalogs in the left nav menu. You should be able to see the topic that was created (sensor_readings) appearing in the left nav menu.

Confluent Cloud conveniently exposes the environment as the root catalog, each Kafka cluster as a database within this root catalog, and topics as tables on which Flink SQL queries can run. This essentially reduces the whole Flink stream processing operation to Flink SQL statements, which run on fully managed, auto-scaling infrastructure (up to 10 CFUs, as explained earlier).

Flink SQL Workspace
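
You can also see this mapping from the SQL editor itself. A hedged exploration sketch, using standard Flink SQL catalog statements and the names from this article:

SHOW CATALOGS;                  -- the environments show up as catalogs
USE CATALOG `aws-conf-drc`;     -- the environment created in Step 2
SHOW DATABASES;                 -- the Kafka clusters show up as databases
USE `flinksql-test`;            -- the Kafka cluster used in this article
SHOW TABLES;                    -- the topics show up as tables
DESCRIBE `sensor_readings`;     -- columns derived from the registered AVRO value schema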

For this experiment, I will create a Flink dynamic table within this SQL workspace named device_charge, into which I will continually insert the SUM aggregation of ampere_hour every 30 seconds using a tumbling window (the TUMBLE windowing table-valued function, or windowing TVF), grouped by device_id. Let’s write the Flink SQL statement:

create table `aws-conf-drc`.`flinksql-test`.`device_charge` (
    device_id STRING,
    charge_consumed DOUBLE
) WITH (
    'changelog.mode' = 'retract'
);

Notice the important 'changelog.mode' = 'retract' property, which essentially makes the table ready for updates instead of being an append-only table. In case you want to go deeper, read the Confluent docs to understand more about Flink dynamic tables and the difference between append-only and updating tables. Interestingly, the CREATE TABLE statement always creates a backing Kafka topic as well as the corresponding schema subjects for key and value, and trying to create a table with a name that already exists in the catalog causes an exception.

Flink SQL CREATE TABLE

Also check that the cluster’s Topics page now shows two topics, the new one being device_charge.

Flink dynamic table backed by Kafka topic

Flink created a backing topic for the new table device_charge with a generated AVRO schema for the value fields.
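
To confirm what was generated behind the scenes, the full table definition, including the changelog.mode property and the connector options, can be inspected from the workspace (a hedged sketch using standard Flink SQL):

-- Prints the full DDL that backs the device_charge table
SHOW CREATE TABLE `aws-conf-drc`.`flinksql-test`.`device_charge`;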

Now, let’s run the meat of the query (!!): the Flink SQL aggregation that inserts its results into the new device_charge table:

INSERT INTO `aws-conf-drc`.`flinksql-test`.`device_charge`
SELECT device_id, SUM(ampere_hour) AS charge_consumed
FROM TABLE(
    TUMBLE(TABLE `aws-conf-drc`.`flinksql-test`.`sensor_readings`, DESCRIPTOR($rowtime), INTERVAL '30' SECONDS))
GROUP BY device_id;

Look closely at how the $rowtime descriptor is used: on Confluent Cloud, $rowtime is a system column that maps to the Kafka record timestamp and serves as the table’s default event-time attribute. In other words, the window is driven by when each record landed in Kafka rather than by the device’s own device_ts field (for this experiment it would be easy to window on device_ts instead; I leave that to you to experiment with, and sketch the idea right after the screenshot below). This Flink SQL statement runs perpetually as a continuous query, calculating the windowed sum and populating the device_charge table. Click the + symbol on the workspace, which opens up another cell for writing this statement:

Flink Continuous Query
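
For reference, here is roughly how the device_ts variant would look, sketched in open-source Flink SQL DDL syntax (connector options are omitted, the table name is made up for illustration, and the exact DDL on Confluent Cloud differs since tables there are backed by topics): device_ts is promoted to a timestamp column, declared as the watermarked event-time attribute, and the aggregation then windows on it.

CREATE TABLE sensor_readings_eventtime (
    device_id   STRING,
    ampere_hour DOUBLE,
    device_ts   BIGINT,
    -- derive a timestamp column from the epoch-millis field ...
    device_time AS TO_TIMESTAMP_LTZ(device_ts, 3),
    -- ... and declare it as the event-time attribute with a 5-second out-of-orderness bound
    WATERMARK FOR device_time AS device_time - INTERVAL '5' SECOND
) WITH (...);  -- Kafka connector options omitted in this sketch

-- The canonical per-window aggregation groups by the window bounds as well
SELECT window_start, window_end, device_id, SUM(ampere_hour) AS charge_consumed
FROM TABLE(
    TUMBLE(TABLE sensor_readings_eventtime, DESCRIPTOR(device_time), INTERVAL '30' SECONDS))
GROUP BY window_start, window_end, device_id;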

The statement status changes from Pending to Running. If there are no errors, it’s time to check the messages in the topic:

Flink SQL output Kafka topic

Perfect! The device_id and the SUM of ampere_hour are getting inserted into the new table. Remember, we could have created the device_charge table with a PRIMARY KEY (device_id) NOT ENFORCED clause as well; in that case, the key of the messages would have been the device_id (I sketch that variant below). For this experiment, I intend to ship all messages to the dashboard and apply the aggregation there, so I skipped that step.
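
For completeness, here is a hedged sketch of that keyed variant (the table name device_charge_keyed is made up for illustration; with a primary key declared, Confluent Cloud keys the backing Kafka messages by device_id and defaults the table to an upserting changelog):

-- Keyed variant: messages in the backing topic carry device_id as the key
CREATE TABLE `aws-conf-drc`.`flinksql-test`.`device_charge_keyed` (
    device_id STRING,
    charge_consumed DOUBLE,
    PRIMARY KEY (device_id) NOT ENFORCED
);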

Clicking the statement name link shows key metrics on the right hand side, like messages in, messages out, scaling status, etc.

So far, so good! Let’s build a managed Kafka Connector for Elasticsearch Sink in the next step!

Step 7: Managed Elasticsearch Sink Connector on Confluent Cloud

Click the cluster name breadcrumb and then the Connectors link on the left hand nav; this opens the connector plugins page:

Confluent Managed Connectors

Search for “Elasticsearch” and click the ElasticsearchSink connector; this opens the page to select topics. Select device_charge:

Managed Elasticsearch Sink Connector on Confluent Cloud

On the next page, click Use an existing API key and paste the cluster API key and secret created in Step 2:

Connector Cluster API Key Secret

On the next page, paste the Elasticsearch endpoint from Step 4 in the Connection URI textbox. Use elastic as the Connection User and the password regenerated in Step 4 in the password field. Leave SSL security as PLAINTEXT.

Connector Elasticsearch Cloud Credentials

On the next page, select AVRO as the input Kafka record value format, and set Key ignore to true, since the Flink SQL aggregation omits the primary key:

Elasticsearch Connector Schema

Select Tasks = 1 (the default) on the next page and click Continue.

On the final configuration page, rename the connector to include device_charge in the name and have a look at the connector configuration JSON:

{
  "topics": "device_charge",
  "schema.context.name": "default",
  "input.data.format": "AVRO",
  "connector.class": "ElasticsearchSink",
  "name": "ElasticsearchSinkConnector_device_charge",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "<KAFKA_CLUSTER_API_KEY>",
  "kafka.api.secret": "<KAFKA_CLUSTER_API_SECRET>",
  "connection.url": "https://<ES_DEPLOY_ID>.us-east-2.aws.elastic-cloud.com",
  "connection.username": "elastic",
  "connection.password": "<ELASTIC_USER_PSWD>",
  "elastic.security.protocol": "PLAINTEXT",
  "key.ignore": "true",
  "schema.ignore": "false",
  "compact.map.entries": "true",
  "write.method": "INSERT",
  "behavior.on.null.values": "ignore",
  "behavior.on.malformed.documents": "fail",
  "drop.invalid.message": "false",
  "log.sensitive.data": "false",
  "batch.size": "2000",
  "linger.ms": "1000",
  "flush.timeout.ms": "10000",
  "flush.synchronously": "true",
  "connection.compression": "false",
  "read.timeout.ms": "15000",
  "max.poll.interval.ms": "300000",
  "max.poll.records": "500",
  "tasks.max": "1",
  "data.stream.type": "none"
}

This is a fully managed connector, so all configurations were generated and the task runs in a managed environment! Click Continue and you should see the connector status move from Provisioning to Running:

Confluent Connector Status

That’s it! Messages with the aggregated charge_consumed are now shipping from the Kafka topic to Elasticsearch, with the ElasticsearchSink connector running successfully!

Step 8: Investigate the Elasticsearch index for messages from Kafka

Log in to Elastic Cloud here:

Elastic Cloud Deployment

Click on the deployment (notice that the region is the same, us-east-2), then click Search > Elasticsearch in the left nav menu, followed by clicking Indices on the Elasticsearch page:

Elasticsearch Kafka Topic Index

Fantastic!! The device_charge topic has been successfully shipped to an Elasticsearch index with the same name! Click the index and then Documents:

Elasticsearch Index documents

Readings for each device_id are showing up just right! Now, the final step: creating a real-time dashboard.
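
If you prefer the terminal over the console, the same check can be done against the standard Elasticsearch APIs (endpoint and password are placeholders, as before):

# How many documents has the connector written so far?
curl -u "elastic:<ELASTIC_USER_PSWD>" "https://<ES_ENDPOINT>/device_charge/_count?pretty"

# Peek at a few documents
curl -u "elastic:<ELASTIC_USER_PSWD>" "https://<ES_ENDPOINT>/device_charge/_search?size=3&pretty"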

Step 9: Create a real-time Kibana dashboard

On the Elasticsearch page, click Dashboards on the left nav menu, followed by +Create Dashboard:

Kibana Visualization

Click Create visualization, and in the top left corner filter the data view using device* as the search string, then click Explore 1 matching index:

Kibana Data view for Elasticsearch device_charge index

This will bring charge_consumed and device_id as fields.

Let’s select a Bar vertical stacked chart.

From the right hand side menu, select device_id as the horizontal axis, Sum as the aggregate function, and charge_consumed as the field in the dropdown:

Kibana Device Charge Real Time Dashboard

Clicking Refresh refreshes the values.

One last thing would be to enable AUTO REFRESH on the Kibana dashboard.

Click Save and Return, then in the calendar dropdown at the top right, enable the Refresh every switch with a 30-second refresh interval. Click Apply, then the Save button, and provide a name for the dashboard. Now the dashboard will refresh every 30 seconds.

Step 10: Terminate all resources

Do not forget to delete the resources deployed on Confluent Cloud and Elastic Cloud, to stop billing once the experiment is over. Here are the steps (a Confluent CLI sketch follows the list):

  1. Stop the Kafka Python producer.
  2. Delete the dashboard from Elastic Cloud.
  3. Delete the index from Elasticsearch.
  4. Go to Settings on the Confluent managed connector card and click Delete Connector.
  5. Click each topic, go to the Configuration link and delete the topic.
  6. Go to the Kafka cluster overview page, click Cluster Settings on the left nav menu and click Delete Cluster.
  7. Once all the steps above are complete, delete the environment by clicking Delete Environment on the bottom right.
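
For the Confluent Cloud side, most of the cleanup can also be scripted with the Confluent CLI. A hedged sketch (the resource IDs are placeholders, the connector is easiest to delete from the console as in step 4 above, and exact subcommands can vary by CLI version, so check confluent --help):

confluent kafka topic delete sensor_readings --cluster lkc-xxxxxx
confluent kafka topic delete device_charge --cluster lkc-xxxxxx
confluent flink compute-pool delete lfcp-xxxxxx    # the compute pool from Step 3
confluent kafka cluster delete lkc-xxxxxx
confluent environment delete env-xxxxxx            # delete the environment last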

Conclusion

Phew!! That was a long series of steps, but a thoroughly enjoyable one too! This demonstrated a fully managed data streaming pipeline with Kafka, Flink, a managed Kafka connector for Elasticsearch, and Elastic Cloud Elasticsearch and Kibana.

A managed data streaming platform adds considerable reliability, resilience and availability to any enterprise-grade modern data platform by automating and abstracting many of the infrastructure’s moving parts. A complete data streaming platform needs to provide accurate results most of the time, if not always, and with this experiment I demonstrated most of those moving parts abstracted away with Kafka, Flink, a managed connector, and an Elastic Cloud Elasticsearch and Kibana real-time dashboard.

The Kafka AVRO Python producer and the AVRO schema file are in the GitHub repo.

Stay tuned! Happy coding!
