Ingesting Time-Series Events from Kafka into QuestDB

If you are working on a project with fast or streaming data, chances are Apache Kafka is already part of your pipeline.

But if you want to analyse your data, you will need to ingest from Kafka into some destination.

For time-series data, QuestDB can be an excellent choice.

For those of you who are not yet familiar with QuestDB, it is an Apache 2.0-licensed time-series database designed for high-throughput ingestion and fast SQL queries.

You could write your own application, or use the generic JDBC Kafka connector to consume from Kafka and ingest into QuestDB, but the most convenient option is to use the native QuestDB Kafka Connector.

Let’s see how the integration works.

Requirements

Make sure you already have:

A Kafka installation
A running QuestDB (for example, docker run --add-host=host.docker.internal:host-gateway -p 9000:9000 -p 9009:9009 -p 8812:8812 -p 9003:9003 questdb/questdb:latest); see the quick check after this list to confirm it is up
A local JDK installation
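Before wiring Kafka to QuestDB, it can be handy to verify the database is actually up. The Docker command above maps port 9003, which serves QuestDB's minimal health endpoint; the exact response text varies by version, so treat this purely as a sanity check:

curl http://localhost:9003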

Adding the QuestDB Sink Connector to Kafka

The Apache Kafka distribution includes the Kafka Connect framework, but the QuestDB-specific zip file must be downloaded from the QuestDB Kafka connector GitHub releases page.

If you prefer it, the connector is also available via the Confluent Hub.

Once downloaded, unzip it and copy the JARs into your Kafka libs directory. For example:

unzip kafka-questdb-connector-*-bin.zip
cd kafka-questdb-connector
cp ./*.jar /path/to/kafka_2.13-2.6.0/libs

Configuration

The connector reads messages from a topic in Kafka and writes them to a table in QuestDB using ILP (the InfluxDB Line Protocol), so the minimum configuration you need is the topic name, the QuestDB host and port, and the table name.

There are some extra options you could configure for serialisation and authentication.
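For instance, you can point the connector at a specific message field to use as the designated timestamp, or supply ILP authentication credentials. The key names below are illustrative and may differ between connector versions, so double-check them against the connector documentation before relying on them:

# illustrative extras; verify the exact key names for your connector version
timestamp.field.name=event_ts
username=admin
token=<your-ilp-auth-token>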

Create a configuration file at /path/to/kafka/config/questdb-connector.properties for Kafka Connect in standalone mode. A basic config file could look like this:

name=questdb-sink
connector.class=io.questdb.kafka.QuestDBSinkConnector
host=localhost:9009
topics=example-topic
table=example_table
include.key=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter

Start Kafka

Go to the Kafka home directory and start ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Then start Kafka itself:

bin/kafka-server-start.sh config/server.properties

Finally, start the QuestDB connector, pointing to the config file we just created:

bin/connect-standalone.sh config/connect-standalone.properties config/questdb-connector.properties

Publish some events

We are going to start an interactive Kafka console producer so you can post some messages to a topic. Just make sure the topic name matches the one in the configuration file.

bin/kafka-console-producer.sh --topic example-topic --bootstrap-server localhost:9092

When the shell starts, you are ready to send JSON messages. You might have noticed that we didn’t define a table structure for our output.

QuestDB will automatically create the table from the first message we insert, and if we add new fields in later messages, new columns will be added on the fly.

In any case, even though QuestDB offers some flexibility with automatic schemas, it is not a schemaless database, so all the messages must have a compatible structure.

As an example, paste this JSON as a single line into the Kafka producer shell and hit enter:

{"firstname": "Arthur", "lastname": "Dent", "age": 42}

You can send one or two more messages; maybe add an extra field to one of them just for fun, as in the example below.
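For example, a second message with an additional field (the new field name here is purely illustrative) could look like this:

{"firstname": "Ford", "lastname": "Prefect", "age": 200, "planet": "Betelgeuse"}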

Note: If you prefer, you can create your table beforehand in QuestDB by issuing a CREATE TABLE statement, as sketched below.
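As a rough sketch, a table matching the JSON fields above, with a designated timestamp column (here simply named timestamp), could be created like this; the column names and types are assumptions you would adapt to your own messages:

CREATE TABLE example_table (
  firstname STRING,
  lastname STRING,
  age LONG,
  timestamp TIMESTAMP
) TIMESTAMP(timestamp) PARTITION BY DAY;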

Querying your time-series data

If all went well, your data has already been stored in QuestDB and can be queried with SQL, either via a PostgreSQL-compatible library or the handy REST API.
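For example, assuming QuestDB's HTTP endpoint is running on the default port 9000, a quick query via the REST API's /exec endpoint could look like this:

curl -G http://localhost:9000/exec --data-urlencode "query=SELECT * FROM example_table"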

Or you can simply go to the QuestDB web console, which runs by default at http://localhost:9000.

Run the following SQL query and you should see as many rows as messages you sent to your Kafka topic.

Make sure the table name matches the one you used in the configuration file.

SELECT * FROM example_table
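To cross-check the row count against the number of messages you produced, you can also run:

SELECT count() FROM example_table;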

That’s a wrap

As you can see, streaming messages from Kafka into QuestDB is quite painless using the native connector.

For more complete examples, including Change Data Capture from PostgreSQL into QuestDB using Kafka and Debezium, check out the official repository.
