Analysis of streaming data with Kafka.
Introduction
What if I don’t want to, or can’t, wait for data to appear in a data warehouse before I can analyse it? There are obvious use cases like IoT, but every organisation can benefit from understanding at least some of its processes earlier and reacting to events sooner. At Pyramid we want to enable businesses to access those events as they occur and make it easy to plug them into dataflows, transformations and analyses with a simple interface that drives reuse, governance and security of data and business logic.
I’m going to take some data from New York’s Citi Bike scheme, push it into a Kafka service I quickly configure, and use Pyramid to create and use shareable data source configuration and business logic to gain insight into that data seconds after it is generated.
Kafka is a distributed stream-processing software platform that supports high levels of fault tolerance and scalability. It is used to build real-time data pipelines, where data needs to be reliably shared between systems, as well as streaming applications that react to or transform streams of data. Kafka is open-source and distributed under the Apache License. You can read more about its capabilities here.
Kafka uses concepts of publishers, subscribers and topics. A topic is analogous to a database table. Publishers write messages to topics. Subscribers read messages from topics. These messages are persisted, and typically replicated across a cluster of nodes. Messages have a configurable persistence duration; there is no concept of holding messages until all subscribers have read them. Topics can be partitioned to support parallel consumption for greater throughput. As far back as 2015 Kafka was being used in some installations to support a trillion messages a day.
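To make the publisher, subscriber and offset ideas concrete, here is a toy in-memory model. It is only a sketch of the concepts and not how Kafka is implemented; the class name ToyTopic and its methods are made up for illustration.

```python
class ToyTopic:
    """A toy, in-memory stand-in for one topic (illustration only)."""

    def __init__(self, partitions=1):
        # Each partition is an append-only list; a message's index is its offset.
        self.partitions = [[] for _ in range(partitions)]

    def publish(self, value, partition=0):
        log = self.partitions[partition]
        log.append(value)
        return len(log) - 1  # offset assigned to this message

    def read(self, offset, partition=0, max_messages=10):
        # Reading never removes messages, so each subscriber can start
        # from whichever retained offset it likes.
        return self.partitions[partition][offset:offset + max_messages]


topic = ToyTopic(partitions=1)
for reading in ["21.5", "21.7", "22.0"]:
    topic.publish(reading)

subscriber_a = topic.read(offset=0)  # starts from the beginning
subscriber_b = topic.read(offset=2)  # only wants the newest message
```

Both subscribers read from the same log independently, which is the behaviour described above: messages stay in place for the retention period regardless of who has read them.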
Although we don’t currently have a pre-built connector for Kafka in Pyramid, it’s easy to build one using the Python data source block in the Model component of Pyramid.
Kafka Configuration and Confluent Cloud
If you are already familiar with Kafka and know how to build a cluster and create topics you can jump ahead to the next section, Creating a Kafka Connector for Pyramid.
Before I can build a connector, I need somewhere to connect to and test what I’m building. You can install and configure your own Kafka cluster on anything from a single server or VM upwards, but for speed of setup and ongoing management I decided to use Confluent Cloud. Confluent is the company set up by the lead developers of Kafka, who built the original software when they were working at LinkedIn. They’ve continued to build, support and open-source features around Kafka to create a broader platform for stream processing.
It took me less than 5 minutes to have a Kafka cluster set up so I could get on with building and testing a connection. Having created a login and handed over some credit card details, I created a new cluster and a topic to hold my messages.
From the splash screen, select Add cluster.
Give the cluster a name, select a cloud provider (Amazon or Google) and select a region. Confluent charges for data ingress/egress and for storage. For a few GB to test with, the cost is pennies.
Having confirmed which credit card to charge you are taken to an environment overview. There’s only the one cluster here for me, and clicking on its name takes me into more details.
From the details screen I selected Topics on the left-hand menu and then the Create topic button on the right-hand side.
I need to give the topic a name, then choose the Number of partitions. I’ve selected 1 partition here for simplicity. In a production system you would typically have multiple partitions to support multiple consumers; if one consumer fails, the remaining consumers can be allocated the messages the failed consumer was reading. You can see on the right-hand side that even with a single partition the data is still replicated 3 times on the cluster for resilience. Finally, I click the Create with defaults button.
Having created the topic, the last thing we’ll need to do is set up a key / secret for the cluster which will be used to establish TLS/SSL connections with SASL/PLAIN from Pyramid to Confluent Cloud. If we built Kafka within our own network rather than consuming this cloud service, we wouldn’t necessarily need to do this.
From the left-hand menu select Overview. From the Cluster overview menu select API ACCESS. And then click the Add key button.
This generates a new key / secret pair that we need to copy and paste somewhere safe before continuing. We’ll use this information when we configure the connection from Pyramid back to Kafka.
You also need to make a note of the bootstrap server address. This is where Pyramid will establish a connection to Kafka. You can find this under the Data In/Out option from the left-hand menu, and below that Clients. The sample configuration code has the bootstrap.servers address highlighted.
And that’s it. In a couple of minutes I have a Kafka cluster, a topic ready to have messages written to and read from, and an API key and connection details. If you want to test everything is working at this stage the Confluent Cloud CLI has some simple commands to produce and consume messages.
Creating a Kafka connector for Pyramid
Confluent Cloud supports a variety of client libraries to allow you to produce and consume messages, including one for Python. The confluent-kafka-python library is a wrapper for the librdkafka C client. This worked well on my Ubuntu server, but I couldn’t get the SASL component to work with a Windows server. Lots of Googling turned up a few people suggesting they may have got it to work, but not how, and lots of people who, like me, struggled. For now I can only suggest this approach where your Pyramid server is built on Linux.
We need to create a Kafka consumer, a process that will connect to a topic on a Kafka cluster and read records. There are a lot of options for how you can do this. They depend on, amongst other things, your requirements for resilience and latency, the chance of a record being received more than once, and the potential for missed records. If your IoT sensor is producing temperature readings 100 times a second, will your analysis be poorer if you miss one? That depends on whether your sensor is in a toaster or a jet engine.

The example here is intentionally simple. We have a topic with a single partition, so there is no reading of messages in parallel. The messages are comma-separated strings. We are interested in the last 1000 messages to have been produced. Our consumer will need to output a Pandas dataframe. Pandas is a Python package commonly used for data manipulation, and its dataframe is a tabular structure understood by Pyramid.
In the Model component of Pyramid I can choose Python as a data source. I drag the icon from Sources onto my Data Flow canvas and can change the properties on the right-hand side to include Python packages and scripts that use those packages.
In the properties I can select Quick Script which allows me to type in or cut and paste a script into the text box and mention any packages I want to load to the Pyramid server in the PIP packages box.
As an alternative and to make it easier to find the script again or to share it with colleagues I can use the script option within the Formulate module.
This gives me more room to see what’s going on, although for developing Python scripts with more than very basic functionality I prefer to use a specific IDE and then cut and paste to Formulate.
I only need to create this logic once, and can then easily share it with other users. Once saved, anyone with the role needed to see the folder where I’ve saved it can get the script by selecting Select Script, rather than Quick Script when configuring the Python block in the Model module.
Having selected my script, either via the Quick Script or Select Script options I need to define the output fields. Pyramid expects the output of a Python block to be readable from a Pandas dataframe. Dataframes are explained more when we step through the code below. The important thing here is that we specify the name of the dataframe we want to read output from, and in this case that dataframe is named df. If we click the Auto Detect button, we are asked for the dataframe name, and Pyramid will generate field names based on the column names in the dataframe.
That’s all the Python block configuration done. At this stage you can start to query the data or perform further transformations of the data in a simple to define pipeline. Jump ahead to the next section, or carry on reading here to take a closer look at the script and see what it is doing. The full script is attached and named consume.pandas.last.n.msgs.py
We start by importing the packages we need to interact with Kafka. The only one of these that isn’t pre-loaded to the Pyramid server as part of the Python installation is confluent_kafka, and this is the only package we need to explicitly load through the PIP Packages option shown above.
    import uuid
    from confluent_kafka import Consumer, TopicPartition
    from datetime import datetime
    import pandas as pd
We then define a couple of variables to hold the topic name we defined when we created the topic in Kafka and build an empty list variable for use at the end of the script when I want to turn my message which is a comma separated list into multiple columns in a dataframe.
Then we create an instance of Consumer, the Kafka API object that defines our connection to the Kafka cluster. Here we include the bootstrap server we noted when we created the Kafka cluster, and the API key and secret, which are used as values for sasl.username and sasl.password. Most of the other settings are defaults, but it’s nice to define them here to be clear how we are connecting.
    topic = 'python-csv-topic'  # this topic contains a single partition
    mylist = []
    c = Consumer({
        'bootstrap.servers': 'pkc-l9pve.eu-west-1.aws.confluent.cloud:9092',
        'broker.version.fallback': '0.10.0.0',
        'api.version.fallback.ms': 0,
        'sasl.mechanisms': 'PLAIN',
        'security.protocol': 'SASL_SSL',
        'sasl.username': '<API key>',  # the key generated under API ACCESS
        'sasl.password': '<API secret>',  # the secret generated with that key
        'group.id': str(uuid.uuid1()),  # this will create a new consumer group each run
        'auto.offset.reset': 'earliest'
    })
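A script saved in Formulate can be seen by anyone with access to the folder, so it may be worth keeping the key and secret out of the script itself. The sketch below builds the same kind of configuration from environment variables. The variable names KAFKA_BOOTSTRAP, KAFKA_API_KEY and KAFKA_API_SECRET are hypothetical, and I am assuming you are able to set environment variables for the Python process on your Pyramid server.

```python
import os
import uuid


def build_consumer_config(env=os.environ):
    """Build a consumer config dict from environment variables.

    KAFKA_BOOTSTRAP, KAFKA_API_KEY and KAFKA_API_SECRET are hypothetical
    names chosen for this sketch.
    """
    return {
        "bootstrap.servers": env["KAFKA_BOOTSTRAP"],
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": env["KAFKA_API_KEY"],
        "sasl.password": env["KAFKA_API_SECRET"],
        "group.id": str(uuid.uuid1()),  # new consumer group on each run
        "auto.offset.reset": "earliest",
    }
```

The result can be passed straight to the consumer, for example c = Consumer(build_consumer_config()). A missing variable raises a KeyError immediately rather than failing later at connection time.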
Next we want to define how many records to read. Kafka is a streaming platform and typically we are interested in very recent data to analyse for the most up to date insight. Here I am reading the most recent 1000 records. It would also be possible to read recent records defined by time. The API provides methods for doing just this. It does get a little more complicated to manage the process across multiple partitions which is why I only defined a single partition for my topic here.
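For the time-based alternative, the consumer in confluent-kafka-python has an offsets_for_times method that takes TopicPartition objects whose offset field holds a timestamp in milliseconds. The following is a minimal sketch under that assumption; the helper names millis_ago and partition_from_time are my own, and I have not handled the case where no record is as recent as the cutoff.

```python
from datetime import datetime, timedelta, timezone


def millis_ago(minutes, now=None):
    """Epoch timestamp in milliseconds for `minutes` before `now` (UTC)."""
    now = now or datetime.now(timezone.utc)
    return int((now - timedelta(minutes=minutes)).timestamp() * 1000)


def partition_from_time(consumer, topic, minutes, partition=0):
    """Return a TopicPartition positioned at the first record at or after the cutoff."""
    # Imported here so millis_ago can be tried without the Kafka library installed.
    from confluent_kafka import TopicPartition

    # For offsets_for_times the offset field carries a timestamp in ms, not an offset.
    query = TopicPartition(topic, partition, millis_ago(minutes))
    return consumer.offsets_for_times([query], timeout=10)[0]
```

The returned TopicPartition could then be handed to assign in place of one built from a record count, for example c.assign([partition_from_time(c, topic, 10)]) to start from ten minutes ago.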
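We create an instance of a topic partition called tp. Then we find the high watermark. Kafka identifies each record in a partition by a sequential offset, and the high watermark is the offset the next record will receive, so subtracting 1 gives the offset of the most recent record. From there we count back 1000 records to define a place to start reading.
    n_msgs = 1000  # number of most recent records to read
    tp = TopicPartition(topic, 0)  # this topic only has 1 partition
    low, high = c.get_watermark_offsets(tp)  # returns (low, high)
    hwm = high - 1  # offset of the newest record currently in the partition
    # start n_msgs - 1 records below hwm, but never below the oldest retained record
    tp = TopicPartition(tp.topic, tp.partition, max(low, hwm - (n_msgs - 1)))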
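The arithmetic for picking a start position is easy to get off by one, so here it is as a small standalone function with some worked numbers. The function name start_offset is mine; low and high are assumed to be the pair returned by get_watermark_offsets.

```python
def start_offset(low, high, n):
    """Offset to start from so that the last n retained records are read.

    low is the oldest retained offset. high is the offset the NEXT record
    will receive, so the newest existing record sits at high - 1.
    """
    if n <= 0:
        raise ValueError("n must be positive")
    # The last n records occupy offsets high - n .. high - 1.
    return max(low, high - n)


print(start_offset(0, 5000, 1000))     # plenty of history available
print(start_offset(0, 300, 1000))      # fewer than n records exist
print(start_offset(4500, 5000, 1000))  # older records already expired
```

Clamping to the low watermark matters when the topic holds fewer records than requested, or when retention has already removed the older ones.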
collist is a list of column names I want to use when I write my data to a data frame at the end of the script.
Then I subscribe to the topic; I tell Kafka I am interested in reading the data. And then I assign a partition and a location in that partition to start reading from as defined in the tp object I just created.
    collist = ["tripduration", "starttime", "stoptime", "starttime_text", "stoptime_text",
               "start_station_id", "start_station_name", "start_station_latitude", "start_station_longitude",
               "end_station_id", "end_station_name", "end_station_latitude", "end_station_longitude",
               "bikeid", "usertype", "birth_year", "gender"]
    c.subscribe([topic])
    c.assign([tp])
Then we read the data. Remember Kafka is handling streaming data, and data could be arriving all the time we are running this process. The poll method waits up to the specified number of seconds, in this case 10, and returns a single message, or None if nothing arrives in that time. The loop therefore runs once per record. We exit the loop when poll returns nothing, or when we reach the offset of the record that was the most recent when we started. Ten seconds is ample time to wait for each record.
    try:
        while True:
            msg = c.poll(10)
            if msg is None:
                break
            if msg.error():
                print("Consumer error: {}".format(msg.error()))  # for debugging
                continue
            clean_msg = msg.value().decode("utf-8")
            mylist.append(clean_msg)
            if msg.offset() == hwm:
                break
    except KeyboardInterrupt:
        pass
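If you want to try the loop logic without a cluster, the same pattern can be wrapped in a function and fed a stand-in consumer. This is only a sketch: drain, FakeMessage and FakeConsumer are names I made up, and the fakes implement just the handful of methods the loop calls.

```python
def drain(consumer, last_offset, timeout=10):
    """Collect decoded message values until last_offset or a poll timeout."""
    values = []
    while True:
        msg = consumer.poll(timeout)  # one message, or None on timeout
        if msg is None:
            break
        if msg.error():
            continue  # skip error events in this sketch
        values.append(msg.value().decode("utf-8"))
        if msg.offset() >= last_offset:
            break
    return values


class FakeMessage:
    """Stand-in exposing only the methods drain() uses."""

    def __init__(self, offset, payload):
        self._offset, self._payload = offset, payload

    def error(self):
        return None

    def value(self):
        return self._payload

    def offset(self):
        return self._offset


class FakeConsumer:
    """Hands out one FakeMessage per poll, then None."""

    def __init__(self, payloads):
        self._messages = iter(FakeMessage(i, p) for i, p in enumerate(payloads))

    def poll(self, timeout):
        return next(self._messages, None)


print(drain(FakeConsumer([b"a", b"b", b"c"]), last_offset=1))
```

I have used >= rather than == for the stop condition here so that the loop still ends if the target offset is somehow skipped.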
Finally, we iterate through the list of records generated in the previous step and split each record into Pandas dataframe columns, using the column names we defined earlier in the script. Although Pandas does not mandate that you define column names, we cannot read the data into Pyramid without them.
    finally:
        df = pd.DataFrame([sub.split(",") for sub in mylist], columns=collist)
        c.close()
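Splitting on every comma is fine as long as no field contains one. Should a field such as a station name ever arrive quoted with a comma inside it, Python’s csv module copes with that. The sketch below assumes standard CSV quoting, which I have not confirmed for this dataset, and split_records is a name of my own.

```python
import csv


def split_records(messages, expected_fields):
    """Split CSV-formatted strings into lists of fields, honouring quotes."""
    rows = []
    for row in csv.reader(messages):
        if len(row) != expected_fields:
            raise ValueError("expected {} fields, got {}".format(expected_fields, len(row)))
        rows.append(row)
    return rows


sample = ['695,"Broadway, W 24 St",Subscriber']  # made-up record for illustration
print(split_records(sample, expected_fields=3))
```

The rows it returns can be passed to pd.DataFrame with columns=collist in the same way as the list comprehension in the script.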
Analysis of data read from Kafka
The final step is to use our Python block to read some data from Kafka and perform some analysis. In the real world we’ll be streaming messages into Kafka, but to test I’ll write a small Python script to loop through a CSV file and write all the records to my Kafka topic. Then I’ll consume the last 1000 and analyse them.
The dataset I’ve used describes journeys using the New York Citi Bike service, and is available here. The script I used to load a CSV-formatted version of the data to the topic I created is attached and named produce.list.py
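The attached script isn’t reproduced in this post, but a producer along these lines can be very small. The following is a minimal sketch rather than the attached script itself: produce_lines is a name of my own, and it accepts any object offering the produce, poll and flush methods that the confluent-kafka Producer provides. The RecordingProducer class is a stand-in so the function can be tried without a cluster.

```python
def produce_lines(producer, topic, lines):
    """Send each non-empty line as one message value; return the count sent."""
    sent = 0
    for line in lines:
        record = line.rstrip("\r\n")
        if not record:
            continue
        producer.produce(topic, value=record.encode("utf-8"))
        producer.poll(0)  # serve delivery callbacks without blocking
        sent += 1
    producer.flush()  # wait until outstanding messages are delivered
    return sent


class RecordingProducer:
    """Stand-in that records what would have been sent."""

    def __init__(self):
        self.sent = []

    def produce(self, topic, value=None):
        self.sent.append((topic, value))

    def poll(self, timeout):
        return 0

    def flush(self):
        return 0


demo = RecordingProducer()
count = produce_lines(demo, "python-csv-topic", ["1,a\n", "\n", "2,b\n"])
```

With a real cluster you would pass a Producer built from the same connection settings as the consumer (minus the group settings) and an open file handle in place of the list, skipping the header row first if the file has one.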
In Pyramid I start by creating a model with a pipeline describing my desired data flow. Here the first Python block is our source and encapsulates the consumer script we’ve just written. For further analytics it’s useful to build a complete time hierarchy based on the event timestamps, so I can easily aggregate by year, quarter, week, etc, and the Time Intelligence blocks that come with Pyramid let me easily do that. There’s also a Marketplace of Python and R scripts as part of Pyramid which meant that here I could just drag in functionality to calculate the distance between any two geographic points in my data. I also added a block to derive clusters of journey data using the k-means algorithm, again just a drag and drop operation.
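I haven’t looked inside the Marketplace distance script, so I can’t say which method it uses, but one common way to calculate the distance between two latitude/longitude points is the haversine formula. The sketch below is offered on that assumption, with a mean Earth radius of 6371 km; the function name haversine_km is my own.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius, an approximation


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in decimal degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = phi2 - phi1
    dlambda = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlambda / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))
```

Applied to the start and end station coordinates of each journey, this gives a straight-line distance, which will be shorter than the route a rider actually took.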
Once the data is loaded to Kafka we can consume it by running a dataflow in the Model module. I’ll just run this as a one-off to demonstrate but you can schedule data flows to run periodically. So for example you could run the flow every 10 minutes, or whatever interval is appropriate for your data, and analyses you create would then show the latest data streaming through Kafka every time it is viewed.
For this example I have created a visualisation that maps the starting positions of journeys, with the points colour-coded by journey distance and sized by journey duration. I get a fresh view of journeys as frequently as I set my pipeline to execute. With a couple of clicks I could drag this into a dashboard, add it to a publication for scheduled and targeted distribution, or embed it in a website.
Summary
Data architectures are evolving faster than we’ve ever seen. The plethora of toolsets and frameworks being developed to optimise specific facets of data persistence, analysis and distribution has brought flexibility and opportunities that developers have come to take for granted in recent years, but that data consumers have sometimes struggled to keep up with. At the same time Python has emerged as one of the most popular general-purpose programming languages, with community support so broad that it’s very rare for any new framework to evolve far without Python APIs. The Python block in Pyramid’s Model module means that we can read and analyse data wherever it is produced in an organisation. That brings deep and flexible analysis and visualisation to data as it is generated, so decisions can be taken as events occur and are not bound to historic accounts.