Kafka Example Demo: How to Achieve 1M+ Record/Second Kafka Ingest without Sacrificing Query Latency

In this Kafka example demo, watch as Garrett Raska, Solutions Engineering Lead at Molecula, shows you how to achieve high throughput and low latency without sacrificing data freshness using FeatureBase. 

 

Kafka Example Demo: Video Transcript:

Garrett: “So let’s jump in and start with what the data we’re going to be working with looks like. Here in the bottom left-hand corner, you’ll see we have a simple JSON message file that we generated, and it has two key-value pairs. We have an identifier, which is alphanumeric. This is very common; we see this in the form of UUIDs, we see this in patient IDs, some kind of unique key you’re using to mark the uniqueness of this particular record. Underneath this, you’re capturing groups in ID arrays, which are just integer arrays, and basically what this does is say, all right, this identifier belongs to this group, which allows us to check very rapidly. You see this type of data pattern in advertising, and you see it in healthcare when you’re adding cohorts of patients to specific diagnoses. So you see this very general data shape quite often throughout many different data sets.

So for today, we’ve generated 40 million of these. These are all unique, individual messages, and each has an array; the longest array is out to around 1,000 groups, and the lengths decay in a logarithmic fashion. So if you were to look at all 40 million records across the spectrum, you’d see some that have many groups attached to them, all the way down to a long tail. And again, this is a very common data shape we see in this space: many records that have a lot of attributes, all the way down to a few.
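For readers who want to reproduce the input, here is a minimal Python sketch of that data shape: one alphanumeric identifier plus a long-tail integer array of group IDs. The file name, field names, and decay distribution are assumptions for illustration; the demo’s actual generator isn’t shown.

```python
import json
import random
import uuid

MAX_GROUPS = 1000          # longest array mentioned in the demo
NUM_GROUPS_TOTAL = 26_000  # maximum group cardinality mentioned later in the demo
NUM_RECORDS = 1_000        # the demo used 40 million; keep this small to experiment

def make_record():
    # Alphanumeric identifier (UUID-style unique key)
    identifier = uuid.uuid4().hex
    # Long-tail array length: most records get a few groups, a few get many
    n_groups = min(MAX_GROUPS, max(1, int(random.expovariate(1 / 50))))
    groups = random.sample(range(NUM_GROUPS_TOTAL), k=n_groups)
    return {"id": identifier, "groups": groups}

with open("segment_messages.json", "w") as f:
    for _ in range(NUM_RECORDS):
        f.write(json.dumps(make_record()) + "\n")
```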

So that gives you an idea of the shape of the data we’ll be ingesting into a table.

While we’re doing this, the unique part we’re going to be looking at is maintaining query latency while ingesting at a very high rate. In the background, we are running this entire setup on AWS on a cluster of three nodes, and everything is running on those nodes, so Kafka is running on the same nodes we’re ingesting into, using the same compute. We also have JMeter and Grafana on there as well. We will look at some of those dashboards.”

Erica: Sorry, you mentioned everything’s running on the same cluster, but in a production environment, that wouldn’t necessarily be how this is set up. Can you maybe explain that again?

[Screenshot: Molecula FeatureBase demo using Apache Kafka]

Garrett: Absolutely. Just for the purpose of this exercise, it made sense for us to try to pack as much as we could onto very minimal infrastructure, for the sake of our CFO’s heart.

But in production, we would advise you to split these out. In production environments, you typically have your own Kafka cluster, your own broker setup there to handle the records and orchestrate everything. Then you would have an ingest cluster, usually around one machine for most of our use cases, and then your core query cluster for FeatureBase that’s replicated and all that good stuff. So in production, you would want to split your infrastructure out, for many, many reasons beyond the obvious.

All right, so let’s talk about what we have running on the same machines. We have JMeter running, and JMeter is simulating 25 concurrent users, all at the same time, sending requests to the cluster. So we’re simulating as if people are logged in and running an assortment of queries; the four main ones are listed here. We will be running a SELECT COUNT DISTINCT across all the groups. The maximum cardinality of the groups in this data set is around 26,000 unique groups. So as the records flow in, we’ll be querying more and more as time goes on and getting the DISTINCT COUNT across that. We see this query very often in almost every use case, because we’re looking for distinct elements about given record sets; you’re looking for distinct counts for various reasons.

The next two are what I’d call a little more general complex queries, which introduce AND and OR statements. So you want to find everything that matches this and that, or this, and then we have those flipped, just to add a little variety there. The last one is a TopK; it’s sort of a special query that we run here at FeatureBase, and it really leverages our technology to the maximum and can produce very rapid and accurate counts across billions of records. So we include that in there. You’ll notice it’s the only one that’s not in traditional SQL; that one is in our DSL, PQL.

[Screenshot: Kafka example use case and demo on a Grafana dashboard]

So behind the scenes, we have this Grafana dashboard tracking each of those queries from the endpoints we make available, and we’ll be looking at the MAX and AVG of each of them at those same endpoints. And then we threw in our web UI, which is available out of the box with our system, as well.
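As a rough stand-in for the JMeter test plan, a sketch like the following would simulate 25 concurrent users, each firing one of the queries roughly once per second over HTTP. The endpoint URL and the query texts are hypothetical placeholders, not the demo’s actual table schema or FeatureBase API paths.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

import requests  # pip install requests

# Hypothetical endpoint and query texts -- adjust to your own FeatureBase setup.
SQL_ENDPOINT = "http://localhost:10101/sql"
QUERIES = [
    "SELECT COUNT(DISTINCT groups) FROM segments;",                       # distinct count
    "SELECT COUNT(*) FROM segments WHERE groups = 7 AND groups = 42;",    # AND query
    "SELECT COUNT(*) FROM segments WHERE groups = 42 OR groups = 7;",     # OR query
]

def one_user(user_id, duration_s=120):
    """One simulated user picking a random query roughly every second."""
    deadline = time.time() + duration_s
    while time.time() < deadline:
        query = random.choice(QUERIES)
        start = time.time()
        resp = requests.post(SQL_ENDPOINT, data=query, timeout=30)
        latency_ms = (time.time() - start) * 1000
        print(f"user={user_id} status={resp.status_code} latency={latency_ms:.0f}ms")
        time.sleep(1)

# 25 concurrent users, mirroring the JMeter setup described above.
with ThreadPoolExecutor(max_workers=25) as pool:
    for i in range(25):
        pool.submit(one_user, i)
```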

So that gives you an idea of what we’ll be looking at and the data shape. Now let’s move over to setting up Kafka. What I’ve run here is just a quick look at the topic itself. We set up Kafka with a single broker, nothing complicated, and we created one topic called segment64 and split that topic into 64 partitions. Now, as you can see here, running that describe console command gives you all the partitions, so you can see all the way down to 63. And this is what we’ll be populating. Now, on the other side of the house, you have our Molecula IDK, which out of the box comes with a static consumer, and we’re going to run that at a concurrency of 64 with a batch size of 62,000.
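The demo uses the standard Kafka console tools for this step; as an alternative sketch, the same topic layout can be created and inspected from Python with the confluent-kafka admin client. The broker address is an assumption (a single local broker, as in the demo).

```python
from confluent_kafka.admin import AdminClient, NewTopic  # pip install confluent-kafka

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# One topic, 64 partitions, single-broker replication -- the "segment64" layout.
new_topic = NewTopic("segment64", num_partitions=64, replication_factor=1)
admin.create_topics([new_topic])["segment64"].result()  # raises if creation failed

# Rough equivalent of the describe console command: list the partitions.
metadata = admin.list_topics(topic="segment64", timeout=10)
for partition_id in sorted(metadata.topics["segment64"].partitions):
    print("partition", partition_id)
```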

So what we’re going to do is grab the file we created and use our producer to push that file in. And just one more note: if you’ll notice at the very beginning of these records, there’s a little number here, and that is a prepended key. Now, just a reminder, what we’re doing here is that we know, based on this identifier, where this record will live within FeatureBase. So we’re grabbing that hash, moving it up into an ETL process, which you can commonly do in many tools, and essentially prepending it to this record. Now, when this producer is pushing this file into our Kafka topic, it’s using that comma as a key separator. So essentially, we’re telling the producer: you need to grab this record and push it to this specific partition. And then on the other side, we’ll have our consumer looking at a specific partition, grabbing it, and introducing it into the FeatureBase store. So that’s the start to finish of how we can key it, show you where it’s going to go, and push it in very rapidly.
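Conceptually, the prepended key plus the comma key separator amounts to something like the Python sketch below: compute a partition from the record’s identifier and produce each message directly to that partition. The hash function here (CRC32 modulo partition count) is only an illustration, not FeatureBase’s actual shard hashing, and the topic and broker names are assumptions.

```python
import json
import zlib
from confluent_kafka import Producer  # pip install confluent-kafka

NUM_PARTITIONS = 64
producer = Producer({"bootstrap.servers": "localhost:9092"})

def partition_for(identifier: str) -> int:
    # Illustrative stand-in for the hash prepended during the ETL step.
    return zlib.crc32(identifier.encode("utf-8")) % NUM_PARTITIONS

with open("segment_messages.json") as f:
    for line in f:
        record = json.loads(line)
        producer.produce(
            "segment64",
            key=record["id"],
            value=line.encode("utf-8"),
            partition=partition_for(record["id"]),  # route to a specific partition
        )
producer.flush()  # block until all messages are delivered
```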

So what I’m doing here is grabbing that JSON file of 40 million records, shaped as shown on the bottom left here, and I am populating the topic and all the partitions, so it’s going to take around 35 to 40 seconds to complete. While that finishes up, on the other side, we’re going to run the command to start ingesting into FeatureBase here in just a moment.

We are going to create a segments table, and we’re going to run 64 of these concurrently. So when I kick this off, it will spawn 64 child routines. And that’s completed, so that means our topic is fully populated and we can pull as fast as we can.”
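The demo drives this with the Molecula IDK’s static Kafka consumer; purely as a sketch of the pattern, the 64 concurrent routines could look like the Python below, with one worker pinned to each partition and a placeholder standing in for the actual FeatureBase import. Broker, topic, and batch-handling details are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor
from confluent_kafka import Consumer, TopicPartition  # pip install confluent-kafka

NUM_PARTITIONS = 64
BATCH_SIZE = 62_000  # batch size mentioned in the demo

def import_batch(batch):
    # Placeholder: the demo's IDK consumer translates and imports batches
    # into FeatureBase at this point.
    print(f"would import {len(batch)} records")

def consume_partition(partition_id):
    """One worker pinned to one partition -- one of the 64 child routines."""
    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": f"segment-ingest-{partition_id}",
        "auto.offset.reset": "earliest",
    })
    consumer.assign([TopicPartition("segment64", partition_id)])
    batch = []
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            break  # nothing left to read in this sketch; a real consumer keeps polling
        if msg.error():
            continue
        batch.append(msg.value())
        if len(batch) >= BATCH_SIZE:
            import_batch(batch)
            batch = []
    if batch:
        import_batch(batch)
    consumer.close()

with ThreadPoolExecutor(max_workers=NUM_PARTITIONS) as pool:
    for p in range(NUM_PARTITIONS):
        pool.submit(consume_partition, p)
```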

Erica: You said that was 40 million records? 

Garrett: Well, that’s now loaded into the topic, ready to go. We basically don’t want to limit our ability to pull based on how fast Kafka can get it into memory; we want that data ready to go so we can pull it as fast as we can into FeatureBase. So now, when I kick this off, what we’re going to be looking at is here; this is updating at a five-second rate. These queries are running live right now. So users are running queries, but there’s no table yet, right? So it’s going to be fairly flat. That’s why these are coming back at three milliseconds; it’s just the requests being processed. Now, as soon as our table starts to populate, we’ll start to watch these trends change. So we’re going to be watching live ingest and live query.

So let’s kick that off. The first thing you’ll notice is that it goes and creates the schema based on the ID and those groups. In here, a moment after all the child routines are done spawning, we can start to see them translating and importing those batches. Now, immediately, you’ll see at the bottom here that the query transaction time starts to go up. And again, we’re running 25 users, all randomly choosing one of those queries every second. I captured the timing ahead of time; this run will take just over 66 seconds, and we’ll ingest all 40 million. But again, you can see that so far the max time is still coming in around 258 milliseconds. All of them are still taking 258 milliseconds at a max, which is just pretty fantastic for ingesting and querying on the same cluster.

[Screenshot: Apache Kafka example demo on Molecula FeatureBase]

And you can see that we’ve already started to reach a steady state; latencies are starting to dip back down as those queries run.

And we’re just about finished up here. You can see the imports, with the fastest ones starting to tail off. Really, the only downside of what we’re doing here is that we’re relying on parallelism to maintain our rates. So what we’re going to see is a spike in rate over a million, and then it’s going to slowly go down as the consumers exhaust their particular partitions; we don’t have it set up so that a consumer can go and grab a different batch if it runs out of its partition. So we’re going to see a nominal rate. And that’s already done.

So that took 68 seconds, at roughly 508,000 average records per second. Remember, as you can see in here, as consumers tail off we’re losing parallelism across the 64. So we probably hit a peak rate just around 1.1-ish million at the beginning, and then that rate slowly degraded as consumers finished. And as you can see, now that the table’s populated, these are starting to level out more consistently, because we’re not ingesting into that table at the moment. But our max rates came in around 180 milliseconds, which is the intersections query. That’s the complex AND/OR query looking through every record in that 40-million-record table.

And the averages are well within 30 to 61 milliseconds. Even the COUNT DISTINCTs are coming in at 60 milliseconds, which is typically a very expensive query, so…”

Erica: I may have been yelled at a few times for trying to use COUNT DISTINCT too often on a SQL Server.

Garrett: “So again, we could be ingesting into a table that already had a few records in it; I could rerun this or continue to stream into this one, but for this exercise I just wanted to show, again, very high, sustained rates bringing in 40 million records. Let’s take a quick look at our web UI for a moment. In here, I’ll refresh, and you can see our table’s populated. If I come in here and do a count of all, you can see 39,399,997 records in that table. So just to let you know, we did in fact ingest that data live. This is the web UI that’s being pinged in this home-page query right here.

So that would conclude it. We’ve just ingested 40 million records, and our query times are sustained at a very, very good rate for that number of users on a very small cluster of this size.”

 

Explore Use Cases Tailored to your Industry