Your applications interface with Apache Kafka through client libraries that are readily available for every major programming language. To stream data between Kafka and external systems such as databases, you may be tempted to write your own code that moves the data into Kafka and later consumes it. However, this approach is error-prone and does not scale.
In this tutorial, you’ll learn how to ingest data into Kafka topics using Kafka Connect, a tool for reliably transferring data between Kafka and other systems, such as filesystems and databases. You’ll also learn how to stream data from Kafka into Elasticsearch for later indexing. In contrast to a manual approach, Kafka Connect automatically tracks the progress of data migrations and provides ready-made access to many different data systems.
To complete this tutorial, you’ll need:
In this step, you’ll learn how to configure Kafka Connect in standalone mode to watch over a file on the host filesystem and stream changes to Kafka.
As part of the prerequisites, you installed Kafka under ~/kafka. Kafka Connect comes bundled with the default Kafka release, so you don’t have to install it separately. Navigate to that directory by running:
- cd ~/kafka
Kafka Connect uses connectors to retrieve data from various sources. Connectors are ready-to-use libraries that integrate with Kafka Connect and provide access to external systems, such as filesystems and databases. A number of common connectors come bundled with Kafka, and many more are available under permissive licenses.
Aside from connectors, Kafka Connect distinguishes between sources and sinks. A source ingests data into Kafka through a connector, while a sink exports data from Kafka into an external system.
You’ll store the configuration for the source in a file called file-source.properties. Create and open it for editing by running:
- nano file-source.properties
Add the following lines:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt
This source is named local-file-source and uses the FileStreamSource class with a single task, as set by tasks.max. The data will be appended to the connect-test topic in Kafka, and the file that will be monitored is test.txt. Save and close the file.
Now that you’ve defined a source, you’ll create a sink that will stream the data into a separate text file. Create and open file-sink.properties for editing:
- nano file-sink.properties
Insert the following lines:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
topics=connect-test
file=test-sink.txt
Similar to what you did with file-source.properties, you define the sink by specifying FileStreamSink as the connector class, with one task. You set connect-test as the topic it should read from and test-sink.txt as the output file. When you’re done, save and close the file.
Next, you’ll define the configuration for Kafka Connect itself. Create a file for it by running the following:
- nano connect.properties
Add the following lines:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=libs/
Here, you first specify the address of the Kafka server (bootstrap.servers). Then, you set the JsonConverter class as the converter for both keys and values of produced messages, meaning that data will be written to Kafka as JSON. However, you disable schemas for both keys and values, as there is no need for a schema describing the contents of a plain text file.
Kafka Connect tracks and manages progress on its own, and it needs a place to store its internal offsets. You provide a path for that in offset.storage.file.filename and set the offset commit interval to 10 seconds. Lastly, you set plugin.path, the directory where connector libraries are stored, to libs/, referring to the libs directory under ~/kafka.
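Since plugin.path is where Kafka Connect looks for connector plugins, you can optionally confirm that the bundled FileStream connector jar is present under libs/, for example from a separate terminal. The exact file name depends on your Kafka version:
- ls libs/ | grep connect-file
You should see an entry similar to connect-file-<version>.jar.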
Save and close the file when you’re done. You now have the source, sink, and Kafka Connect configuration defined. Before running it, create and open the test.txt file for editing:
- nano test.txt
Add the following lines:
Hello World!
Second Hello World!
You can add additional lines if you wish. Save and close the file when you’re done.
Finally, run Kafka Connect in standalone mode with the following command:
- bin/connect-standalone.sh connect.properties file-source.properties file-sink.properties
In this mode, Kafka Connect accepts source and sink definitions from files on disk, which is helpful for testing. In contrast, distributed mode accepts them only through an HTTP interface and can thus be controlled remotely.
There will be a lot of output signifying that Connect started monitoring test.txt and connected to the cluster. In a separate terminal, run the provided kafka-console-consumer.sh script to read all messages from the connect-test topic:
- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
The output will be:
Output
Hello World!
Second Hello World!
Kafka Connect has streamed the contents of test.txt to the connect-test topic.
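If you’d like to confirm that the connect-test topic was created automatically, you can list all topics in the cluster using the bundled kafka-topics.sh script:
- bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
The connect-test topic should appear in the list.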
In a third terminal session, run the following command to append a line to test.txt:
- echo "Third Hello World!" >> test.txt
Watch the output of kafka-console-consumer.sh in the second terminal. You’ll see the third message being received:
Output
Hello World!
Second Hello World!
Third Hello World!
Show the contents of test-sink.txt to verify that the same data has been streamed into that file through the local-file-sink:
- cat test-sink.txt
The output will be the same:
Output
Hello World!
Second Hello World!
Third Hello World!
Kafka Connect watches over the file and automatically propagates the changes. You can exit both Kafka Connect and the consumer script by pressing CTRL + C.
In this step, you’ve ingested the contents of a text file on the host filesystem into a Kafka topic and verified that it was received. You’ll now learn how to ingest data from a MySQL database.
In this step, you’ll set up a sample MySQL database and learn how to ingest data from it into Kafka. You’ll run Kafka Connect in distributed mode and install the Debezium connector so that Kafka Connect can access the database.
Enter the MySQL console by running:
- sudo mysql
Once you’re in, create a database called employees
:
CREATE DATABASE employees;
The output will be:
Output
Query OK, 1 row affected (0.00 sec)
Then, switch to the created database:
USE employees;
Finally, create a schema for storing employee data:
CREATE TABLE `employee` (
`Id` int NOT NULL AUTO_INCREMENT,
`Name` varchar(45) NOT NULL,
`Surname` varchar(45) DEFAULT NULL,
PRIMARY KEY (`Id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
This creates the employee table with ID, name, and surname columns. The output will look like this:
Output
Query OK, 0 rows affected (0.03 sec)
You can now insert sample data by running:
INSERT INTO `employees`.`employee` (`Name`, `Surname`) VALUES ('John', 'Smith');
INSERT INTO `employees`.`employee` (`Name`, `Surname`) VALUES ('Mark', 'Twain');
The database will confirm that two rows were affected:
Output
Query OK, 2 rows affected (0.01 sec)
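If you’d like to double-check the inserted rows, you can query the table. Note that the auto-generated IDs may differ depending on earlier inserts:
SELECT * FROM employee;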
You now have a sample database holding employee names. Next, create a user that Kafka Connect will use to access this database:
CREATE USER 'kafkaconnect'@'localhost' IDENTIFIED BY 'password';
Its username is kafkaconnect, and its password is password.
Then, grant it the necessary permissions:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'kafkaconnect'@'localhost';
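Optionally, you can verify that the privileges were applied by running:
SHOW GRANTS FOR 'kafkaconnect'@'localhost';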
Finally, exit out of MySQL:
exit
You now have a user and a database from which you’ll stream the data into Kafka.
As Kafka does not ship with a MySQL connector, you’ll have to install an additional plugin. In this subsection, you’ll download and set up the Debezium MySQL Connector for Kafka.
Use this command to download the release archive from the official Downloads page and place it under /tmp:
- curl -o /tmp/debezium.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.6.1.Final/debezium-connector-mysql-2.6.1.Final-plugin.tar.gz
At the time of writing, the latest available version was 2.6.1. You can grab the latest link from the official page under the MySQL connector plugin archive download link.
Then, extract it to ~/kafka/libs by running:
- tar -xzf /tmp/debezium.tar.gz -C ~/kafka/libs/
You’ve now made the Debezium MySQL Connector available to Kafka Connect.
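Optionally, you can verify that the plugin was extracted by listing the new directory. The archive should have produced a debezium-connector-mysql directory containing the connector jars, though the exact file names depend on the connector version:
- ls ~/kafka/libs/debezium-connector-mysql/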
Next, you’ll create a Kafka Connect source to observe the database. You’ll store it in a file called mysql-source-connector.json. Create and open it for editing by running:
- nano mysql-source-connector.json
Add the following lines:
{
"name": "employees-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "password",
"database.server.id": "1",
"topic.prefix": "dbhistory",
"database.include.list": "employees",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.employees"
}
}
Here, you first specify the Debezium MySqlConnector class as the connector class and provide the connection data for the database. Then, you specify the address of the Kafka server where the schema history will be stored (schema.history.internal.kafka.bootstrap.servers) and set the topic prefix to dbhistory. Debezium will create a separate topic with this prefix for each table listed in database.include.list.
Save and close the file when you’re done. You’ll now run Kafka Connect and execute the connector.
Unlike standalone mode, distributed mode accepts workloads through HTTP requests. This allows it to run in the background as a system service and to be configured remotely.
Kafka comes bundled with a configuration file for this mode, named connect-distributed.properties, which is stored under config/. You’ll use it to start Kafka Connect, but you’ll first need to update it to use the Debezium connector. Open it for editing by running:
- nano config/connect-distributed.properties
At the end of the file, find the plugin.path setting:
...
#plugin.path=
Modify it to look like this:
...
plugin.path=libs/
When you’re done, save and close the file.
You can now run Kafka Connect in distributed mode:
- bin/connect-distributed.sh config/connect-distributed.properties
The Kafka Connect REST API is now accessible at http://localhost:8083.
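Before submitting any connectors, you can check that the REST API is responding. The root endpoint returns basic information about the worker, and the /connectors endpoint lists the currently registered connectors, which is initially an empty list:
- curl http://localhost:8083/
- curl http://localhost:8083/connectors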
In a secondary terminal, run the following command to submit the employees-connector you’ve defined:
- curl -d @"mysql-source-connector.json" -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
With this command, you submit the contents of mysql-source-connector.json to Kafka Connect. The output will be:
Output{"name":"employees-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"localhost","database.port":"3306","database.user":"kafkaconnect","database.password":"password","database.server.id":"1","topic.prefix":"dbhistory","database.include.list":"employees","schema.history.internal.kafka.bootstrap.servers":"localhost:9092","schema.history.internal.kafka.topic":"schema-changes.employees","name":"employees-connector"},"tasks":[],"type":"source"}
Kafka Connect will immediately execute the connector and start ingesting data. Each table in the listed databases will have a separate topic created for it. You can stream the changes for the employees database in real time using kafka-console-consumer.sh:
- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbhistory.employees.employee --from-beginning
The topic name consists of the specified topic prefix (dbhistory), the database name (employees), and the table name (employee). The output will show the two existing rows, similar to this:
Output
{
"schema": {
...
"payload": {
"before": null,
"after": {
"Id": 3,
"Name": "John",
"Surname": "Smith"
},
"source": {
"version": "2.6.1.Final",
"connector": "mysql",
"name": "dbhistory",
"ts_ms": 1713183316000,
"snapshot": "last",
"db": "employees",
"sequence": null,
"ts_us": 1713183316000000,
"ts_ns": 1713183316000000000,
"table": "employee",
"server_id": 0,
"gtid": null,
"file": "binlog.000004",
"pos": 3344,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1713183316537,
"ts_us": 1713183316537164,
"ts_ns": 1713183316537164000,
"transaction": null
}
}
},
{
"schema": {
...
"payload": {
"before": null,
"after": {
"Id": 4,
"Name": "Mark",
"Surname": "Twain"
},
"source": {
"version": "2.6.1.Final",
"connector": "mysql",
"name": "dbhistory",
"ts_ms": 1713183316000,
"snapshot": "last",
"db": "employees",
"sequence": null,
"ts_us": 1713183316000000,
"ts_ns": 1713183316000000000,
"table": "employee",
"server_id": 0,
"gtid": null,
"file": "binlog.000004",
"pos": 3344,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1713183316537,
"ts_us": 1713183316537164,
"ts_ns": 1713183316537164000,
"transaction": null
}
}
}
You’ll now insert an additional row into the employee table. In a third terminal session, enter the MySQL console by running:
- sudo mysql
Switch to the employees database:
USE employees;
Then, insert a new row by running:
INSERT INTO `employees`.`employee` (`Name`, `Surname`) VALUES ('George', 'Martin');
The new event will be streamed to the end of the consumer output:
Output
{
"schema": {
...
"payload": {
"before": null,
"after": {
"Id": 5,
"Name": "George",
"Surname": "Martin"
},
"source": {
"version": "2.6.1.Final",
"connector": "mysql",
"name": "dbhistory2",
"ts_ms": 1713183573000,
"snapshot": "false",
"db": "employees",
"sequence": null,
"ts_us": 1713183573000000,
"ts_ns": 1713183573000000000,
"table": "employee",
"server_id": 1,
"gtid": null,
"file": "binlog.000004",
"pos": 3573,
"row": 0,
"thread": 64,
"query": null
},
"op": "c",
"ts_ms": 1713183573029,
"ts_us": 1713183573029781,
"ts_ns": 1713183573029781000,
"transaction": null
}
}
}
When you’re done, press CTRL + C in the respective terminals to close both Kafka Connect and the console consumer.
In this step, you’ve set up the Debezium MySQL connector for Kafka Connect. You’ve also configured and run Kafka Connect in distributed mode and added a MySQL source connector to sync the database and Kafka. You’ll now learn how to export data from Kafka into Elasticsearch.
In this step, you’ll download and compile the Confluent Elasticsearch connector for Kafka Connect. Then, you’ll create a sink that uses it to export data from Kafka into Elasticsearch.
Confluent provides a connector for Elasticsearch in its official GitHub repository. First, clone it by running:
- git clone https://github.com/confluentinc/kafka-connect-elasticsearch.git
Navigate to it:
- cd kafka-connect-elasticsearch
Then, instruct Maven to package it for distribution, skipping the tests to shorten the build time:
- mvn package -Dmaven.test.skip=true
Once it finishes, the end of the output will look similar to this:
Output
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10:35 min (Wall Clock)
[INFO] Finished at: 2024-04-14T14:41:12Z
[INFO] ------------------------------------------------------------------------
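If you’re curious what the build produced, you can list the packaging output. The exact file names depend on the connector version:
- ls target/components/packages/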
The compiled library and other necessary dependencies are now available under target/components/packages. As in the previous step, to make the plugin available, you’ll need to add this path to the Kafka Connect configuration. Move out of the directory:
- cd ..
Then, open connect-distributed.properties for editing:
- nano config/connect-distributed.properties
Navigate to the end of the file and find the plugin.path line:
...
plugin.path=libs/
Append the new path to the line:
...
plugin.path=libs/,kafka-connect-elasticsearch/target/components/packages
Save and close the file.
You’ll now define the Elasticsearch sink and store it in a file named elasticsearch-sink-connector.json
. Create and open it for editing:
- nano elasticsearch-sink-connector.json
Add the following lines:
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "dbhistory.employees.employee",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect",
"name": "elasticsearch-sink"
}
}
Here, you define a connector that will use the ElasticsearchSinkConnector class with one task. Then, you set the topics it should monitor, ignore the message keys since you’re only interested in the values, and set the connection URL for Elasticsearch.
Save and close the file when you’re done.
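Before starting Kafka Connect, you can optionally confirm that Elasticsearch is reachable at the configured URL:
- curl http://localhost:9200
The response should contain basic cluster information in JSON.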
Then, start Kafka Connect in distributed mode:
- bin/connect-distributed.sh config/connect-distributed.properties
In the second terminal, add the sink connector to Kafka Connect by running:
- curl -d @"elasticsearch-sink-connector.json" -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
Notice that Kafka Connect is starting to ingest the data:
Output
...
[2024-04-15 10:43:24,518] INFO [elasticsearch-sink|task-0] Creating index dbhistory.employees.employee. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:227)
...
Then, query Elasticsearch to see it:
- curl -X GET 'http://localhost:9200/dbhistory.employees.employee/_search?pretty'
You’ll see that the three rows from the database are present in the index as events:
Output
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "dbhistory.employees.employee",
"_id": "dbhistory.employees.employee+0+1",
"_score": 1,
"_source": {
"before": null,
"after": {
"Id": 4,
"Name": "Mark",
"Surname": "Twain"
},
...
}
},
{
"_index": "dbhistory.employees.employee",
"_id": "dbhistory.employees.employee+0+0",
"_score": 1,
"_source": {
"before": null,
"after": {
"Id": 3,
"Name": "John",
"Surname": "Smith"
},
...
}
},
{
"_index": "dbhistory.employees.employee",
"_id": "dbhistory.employees.employee+0+2",
"_score": 1,
"_source": {
"before": null,
"after": {
"Id": 5,
"Name": "George",
"Surname": "Martin"
},
...
}
}
]
}
}
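If you only want to check how many events have been indexed, you can query the _count API instead of _search:
- curl -X GET 'http://localhost:9200/dbhistory.employees.employee/_count?pretty'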
To verify that the flow between sources and sinks works properly, you’ll add another row to the employee table. In the third terminal session, enter the MySQL console by running:
- sudo mysql
Switch to the employees database:
USE employees;
Then, insert the fourth row by running:
INSERT INTO `employees`.`employee` (`Name`, `Surname`) VALUES ('Robert', 'Jordan');
Query Elasticsearch again, and you’ll see that it has been picked up:
Output
...
{
"_index": "dbhistory.employees.employee",
"_id": "dbhistory.employees.employee+0+3",
"_score": 1,
"_source": {
"before": null,
"after": {
"Id": 6,
"Name": "Robert",
"Surname": "Jordan"
},
...
}
}
...
In this step, you’ve downloaded and compiled the Confluent Elasticsearch connector. You’ve made it available to Kafka Connect and created a sink for exporting data from Kafka to Elasticsearch indexes. Then, you’ve verified that the flow between the database, Kafka, and Elasticsearch is working properly with minimal latency.
In this article, you’ve utilized Kafka Connect to import data from the filesystem and your MySQL database into Kafka. You’ve also learned how to compile and import custom plugins and export data from Kafka to Elasticsearch for later indexing.