Confluent Platform provides an easy way to build Kafka streaming applications. If you're just getting started with Confluent Platform, you can check out one of my earlier posts on how to set it up.
Here's what we're going to do in this article:
- Create a simple MySQL table with a few records.
- Load the data from MySQL into a Kafka topic using the Kafka Connect JDBC plugin.
- Create a ksqlDB stream from that topic.
- Create a stream from an input topic that receives external data.
- Join the two streams and query the result to produce enriched data.
1. Set up MySQL
If you're using Ubuntu like me, you can quickly set up a MySQL table like the one below.
```
mysql> select * from customers;
+--------+----------+
| custid | custname |
+--------+----------+
|      1 | Sam      |
|      2 | Dean     |
|      3 | Crowley  |
|      4 | Kevin    |
|      5 | Lucifer  |
|      6 | Michael  |
|      7 | John     |
|      8 | Gabriel  |
|      9 | Metatron |
|     10 | Chuck    |
+--------+----------+
```
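The post doesn't show the table definition itself. As a minimal sketch, the Python script below prints a MySQL script that recreates the table and its ten rows. The column types (INT NOT NULL PRIMARY KEY and VARCHAR(255)) are my assumptions, inferred from the schema the connector reports in step 3 (a non-optional int32 custid and an optional string custname). You could pipe its output into the client with something like `python3 setup_customers.py | mysql -u <username> -p <dbname>`, where setup_customers.py is a hypothetical file name.

```python
# Hypothetical helper: prints a MySQL script recreating the customers table.
# Column types are assumptions inferred from the connector's reported schema.
CUSTOMERS = [
    (1, "Sam"), (2, "Dean"), (3, "Crowley"), (4, "Kevin"), (5, "Lucifer"),
    (6, "Michael"), (7, "John"), (8, "Gabriel"), (9, "Metatron"), (10, "Chuck"),
]


def build_setup_sql(rows):
    """Return a CREATE TABLE statement followed by one INSERT per row."""
    lines = [
        "CREATE TABLE customers (",
        # incrementing mode (step 3) relies on new rows getting larger custids
        "  custid INT NOT NULL PRIMARY KEY,",
        "  custname VARCHAR(255)",
        ");",
    ]
    for custid, name in rows:
        escaped = name.replace("'", "''")  # escape single quotes in SQL literals
        lines.append(f"INSERT INTO customers VALUES ({custid}, '{escaped}');")
    return "\n".join(lines)


if __name__ == "__main__":
    print(build_setup_sql(CUSTOMERS))
```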
2. Set up the Kafka Connect JDBC plugin
The first thing you need to do is download the Kafka Connect JDBC connector zip from Confluent Hub. Once you have it, create a kafka/plugins directory under your Confluent installation's share/ directory and move the entire contents of the extracted plugin folder into it.
Next, update plugin.path in etc/kafka/connect-distributed.properties so that Connect can find the plugin:
```
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,<confluent-path>/share/kafka/plugins/
```
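If Connect later can't find the JDBC connector, a wrong plugin.path is the usual suspect. Here's a small, hypothetical Python check that reads the properties text and lists the directories in plugin.path. It's a simplified parser that only understands key=value lines and # or ! comments, not every Java properties feature (line continuations, `:` separators).

```python
def read_properties(text):
    """Parse key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def plugin_dirs(text):
    """Return the directories listed in plugin.path (comma-separated)."""
    value = read_properties(text).get("plugin.path", "")
    # Drop trailing slashes so '/a/b/' and '/a/b' compare equal
    return [p.strip().rstrip("/") for p in value.split(",") if p.strip()]
```

For the two lines above, with /opt/confluent standing in for `<confluent-path>`, `plugin_dirs` returns `['/usr/share/java', '/opt/confluent/share/kafka/plugins']`; the commented-out default is ignored.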
Before starting Connect, download the MySQL JDBC driver (Connector/J) jar and place it in the plugin's lib directory. Then start the Connect cluster using the command below.
```
$ cd <confluent-path>
$ bin/connect-distributed etc/kafka/connect-distributed.properties &
```
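To check that the worker actually loaded the plugin, you can ask the Connect REST API for its installed plugins with `GET /connector-plugins`. The sketch below uses only the Python standard library; the helper names are hypothetical, and it assumes the worker is listening on localhost:8083 as configured in this article.

```python
import json
import urllib.request

JDBC_SOURCE = "io.confluent.connect.jdbc.JdbcSourceConnector"


def has_plugin(plugins, class_name=JDBC_SOURCE):
    """True if a /connector-plugins response lists the given connector class."""
    return any(p.get("class") == class_name for p in plugins)


def fetch_plugins(base_url="http://localhost:8083"):
    """GET the installed connector plugins from the Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins") as resp:
        return json.load(resp)


# Usage, once the worker is running:
#   print("JDBC source plugin loaded:", has_plugin(fetch_plugins()))
```

If this prints False, recheck plugin.path and restart the worker.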
Make sure the Connect, ksqlDB and Schema Registry settings are enabled (uncommented) in etc/confluent-control-center/control-center.properties:
```
# A comma separated list of Connect host names
confluent.controlcenter.connect.<connect-cluster-name>.cluster=http://localhost:8083
# KSQL cluster URL
confluent.controlcenter.ksql.<ksql-cluster-name>.url=http://localhost:8088
# Schema Registry cluster URL
confluent.controlcenter.schema.registry.url=http://localhost:8081
```
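The `<connect-cluster-name>` and `<ksql-cluster-name>` parts of the keys are placeholders for names you pick. As a sketch, this hypothetical helper renders the three entries for a given pair of names, defaulting to the same ports used in this article.

```python
def control_center_props(connect_name, ksql_name,
                         connect_url="http://localhost:8083",
                         ksql_url="http://localhost:8088",
                         schema_registry_url="http://localhost:8081"):
    """Render the Control Center entries for Connect, ksqlDB and Schema Registry."""
    return "\n".join([
        f"confluent.controlcenter.connect.{connect_name}.cluster={connect_url}",
        f"confluent.controlcenter.ksql.{ksql_name}.url={ksql_url}",
        f"confluent.controlcenter.schema.registry.url={schema_registry_url}",
    ])
```

For example, `control_center_props("my-connect", "my-ksql")` (both names made up) yields lines you can paste into control-center.properties.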
Start Control Center using the command below.
```
$ bin/control-center-start etc/confluent-control-center/control-center.properties &
```
Once Control Center is up, open its web UI (by default at http://localhost:9021) and check that your Connect and ksqlDB clusters are listed.
3. Configure the JdbcSourceConnector and create a stream from the MySQL topic
You can configure the source connector (for example, from the Connect section of Control Center) with the settings below.
```
{
  "name": "JdbcSourceConnectorConnector_0",
  "config": {
    "name": "JdbcSourceConnectorConnector_0",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/<dbname>",
    "connection.user": "<username>",
    "connection.password": "********",
    "dialect.name": "MySqlDatabaseDialect",
    "mode": "incrementing",
    "incrementing.column.name": "custid",
    "topic.prefix": "mysql_"
  }
}
```
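Control Center is one way to submit this configuration; the Connect REST API is another. Here's a minimal sketch, assuming the worker from step 2 is on localhost:8083, that builds a `POST /connectors` request with the same settings (placeholders left for you to fill in). It also spells out how the topic name is derived: the JDBC source connector writes each table to topic.prefix followed by the table name. The helper names are hypothetical.

```python
import json
import urllib.request

# Same settings as the JSON above; fill in the placeholders before sending.
CONNECTOR = {
    "name": "JdbcSourceConnectorConnector_0",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://localhost:3306/<dbname>",
        "connection.user": "<username>",
        "connection.password": "<password>",
        "dialect.name": "MySqlDatabaseDialect",
        "mode": "incrementing",
        "incrementing.column.name": "custid",
        "topic.prefix": "mysql_",
    },
}


def topic_for_table(connector, table):
    """The JDBC source connector writes a table to <topic.prefix><table>."""
    return connector["config"]["topic.prefix"] + table


def build_request(connector, base_url="http://localhost:8083"):
    """Build the POST /connectors request that creates the connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Usage, with the worker running and placeholders filled in:
#   with urllib.request.urlopen(build_request(CONNECTOR)) as resp:
#       print(resp.status, json.load(resp))
```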
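With mode set to incrementing, the connector polls the table and publishes every row whose custid is greater than the largest value it has already seen, so each newly inserted record ends up in the topic. The topic name is topic.prefix followed by the table name, i.e. mysql_customers.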
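To make the incrementing behaviour concrete, here's a simplified Python simulation of the polling logic. It's an illustration under my own assumptions, not the connector's code: it remembers the largest custid already published and returns only rows above it on each poll.

```python
def poll_incrementing(rows, last_offset):
    """Simulate one poll in incrementing mode.

    rows: (custid, custname) tuples currently in the table.
    last_offset: largest custid already published, or None before the first poll.
    Returns (rows to publish in custid order, updated offset).
    """
    new = sorted(r for r in rows if last_offset is None or r[0] > last_offset)
    offset = new[-1][0] if new else last_offset
    return new, offset
```

Note what this implies: an UPDATE to an existing row is never re-published, because its custid doesn't grow. The connector's timestamp and timestamp+incrementing modes exist for that case.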
To confirm if all the data from MySQL is present in your Kafka topic, you can use the below command.
```
$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic mysql_customers --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"custid"},{"type":"string","optional":true,"field":"custname"}],"optional":false,"name":"customers"},"payload":{"custid":1,"custname":"Sam"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"custid"},{"type":"string","optional":true,"field":"custname"}],"optional":false,"name":"customers"},"payload":{"custid":2,"custname":"Dean"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"custid"},{"type":"string","optional":true,"field":"custname"}],"optional":false,"name":"customers"},"payload":{"custid":3,"custname":"Crowley"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"custid"},{"type":"string","optional":true,"field":"custname"}],"optional":false,"name":"customers"},"payload":{"custid":4,"custname":"Kevin"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"custid"},{"type":"string","optional":true,"field":"custname"}],"optional":false,"name":"customers"},"payload":{"custid":5,"custname":"Lucifer"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"custid"},{"type":"string","optional":true,"field":"custname"}],"optional":false,"name":"customers"},"payload":{"custid":6,"custname":"Michael"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"custid"},{"type":"string","optional":true,"field":"custname"}],"optional":false,"name":"customers"},"payload":{"custid":7,"custname":"John"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"custid"},{"type":"string","optional":true,"field":"custname"}],"optional":false,"name":"customers"},"payload":{"custid":8,"custname":"Gabriel"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"custid"},{"type":"string","optional":true,"field":"custname"}],"optional":false,"name":"customers"},"payload":{"custid":9,"custname":"Metatron"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"custid"},{"type":"string","optional":true,"field":"custname"}],"optional":false,"name":"customers"},"payload":{"custid":10,"custname":"Chuck"}}
```
Once you have verified the data in the topic, you can create a stream over it. Note that we only need the payload portion of each message, shown in the JSON below.
```
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "custid" },
      { "type": "string", "optional": true, "field": "custname" }
    ],
    "optional": false,
    "name": "customers"
  },
  "payload": {
    "custid": 1,
    "custname": "Sam"
  }
}
```
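The schema/payload envelope is what Kafka Connect's JsonConverter writes when schemas are enabled (value.converter.schemas.enable=true). As a quick illustration, these hypothetical Python helpers pull out the payload, the part we care about, and the declared field types from one message copied from the consumer output above.

```python
import json


def extract_payload(message):
    """Return the 'payload' part of a JsonConverter message with schemas enabled."""
    return json.loads(message)["payload"]


def field_types(message):
    """Map each field name to the Connect type declared in the 'schema' part."""
    schema = json.loads(message)["schema"]
    return {f["field"]: f["type"] for f in schema["fields"]}


# One record copied from the mysql_customers topic
SAMPLE = ('{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,'
          '"field":"custid"},{"type":"string","optional":true,"field":"custname"}],'
          '"optional":false,"name":"customers"},"payload":{"custid":1,"custname":"Sam"}}')
```

Alternatively, setting value.converter.schemas.enable=false on the worker drops the envelope, and the stream could then declare custid and custname as ordinary columns; this article doesn't use that setup.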
You can use the below statement.
```
ksql> CREATE STREAM customers ("payload" MAP<VARCHAR, VARCHAR>) WITH (KAFKA_TOPIC = 'mysql_customers', VALUE_FORMAT = 'JSON');

 Message
----------------
 Stream created
----------------
```
Once the stream is created, you can query it with the statement below while inserting some new rows into MySQL at the same time.
```
mysql> insert into customers values (23,"Thomas");
Query OK, 1 row affected (0.13 sec)

mysql> insert into customers values (24,"Harvey");
Query OK, 1 row affected (0.20 sec)

mysql> insert into customers values (25,"Mike");
Query OK, 1 row affected (0.14 sec)

mysql> insert into customers values (26,"Benny");
Query OK, 1 row affected (0.12 sec)

mysql> insert into customers values (27,"Louis");
Query OK, 1 row affected (0.13 sec)
```
```
ksql> select "payload"['custid'] as custid, "payload"['custname'] as custname from customers emit changes;
+-------+----------+
|CUSTID |CUSTNAME  |
+-------+----------+
|23     |Thomas    |
|24     |Harvey    |
|25     |Mike      |
|26     |Benny     |
|27     |Louis     |
```
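Only custids 23 to 27 appear because ksqlDB push queries start reading from the latest offset by default (auto.offset.reset = latest). To replay the ten existing rows too, run `SET 'auto.offset.reset' = 'earliest';` in the ksqlDB CLI before the SELECT. The toy Python model below, with a plain list standing in for the topic, shows the difference between the two settings; it's a simplification, not how ksqlDB is implemented.

```python
def start_offset(log_length, reset):
    """Offset a consumer with no committed position starts reading from."""
    if reset == "earliest":
        return 0
    if reset == "latest":
        return log_length
    raise ValueError(f"unsupported auto.offset.reset value: {reset}")


def push_query(log, reset, new_records):
    """Start reading, then return what is seen once new records are appended."""
    offset = start_offset(len(log), reset)
    log = log + new_records  # e.g. rows inserted into MySQL while the query runs
    return log[offset:]
```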
4. Create a topic for external data
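Next, create a stream over a separate topic for incoming external data, such as transaction details that carry a custid. If the transactions topic doesn't exist yet, add PARTITIONS=1 (or your preferred partition count) to the WITH clause so that ksqlDB creates it.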
```
ksql> CREATE STREAM TRANSACTIONS (CUSTID STRING, AMOUNT STRING) WITH (KAFKA_TOPIC='transactions', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
```
Now run a query against the stream and, at the same time, produce some sample data into the topic from another terminal.
```
$ bin/kafka-console-producer --bootstrap-server localhost:9092 --topic transactions
>{"custid":"8","amount":"4:00"}
>{"custid":"5","amount":"4:00"}
```
```
ksql> select * from transactions emit changes;
+-------+-------+
|CUSTID |AMOUNT |
+-------+-------+
|8      |4:00   |
|5      |4:00   |
```
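Because the stream declares both CUSTID and AMOUNT as STRING, the sample messages send both values as JSON strings. If you'd rather script the test data than type it, here's a hypothetical Python helper that prints one message per line; its output could be piped into the console producer, e.g. `python3 transactions.py | bin/kafka-console-producer --bootstrap-server localhost:9092 --topic transactions` (transactions.py being a made-up file name).

```python
import json


def transaction_line(custid, amount):
    """One console-producer line matching the TRANSACTIONS stream."""
    # Both values are sent as JSON strings to match CUSTID STRING, AMOUNT STRING
    return json.dumps({"custid": str(custid), "amount": str(amount)})


if __name__ == "__main__":
    for custid, amount in [(8, "4:00"), (5, "4:00")]:
        print(transaction_line(custid, amount))
```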
5. Create enriched data
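Now that you have two streams, you can join them on custid to produce enriched records containing custid, custname and the transaction amount.
Produce a sample record to check that the join is working. The output below assumes a customer with custid 32 (Jack) had already been inserted into MySQL; that row isn't among the inserts shown earlier.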
```
sethlahaul@ls:/apps/Confluent/confluent-6.2.0$ bin/kafka-console-producer --bootstrap-server localhost:9092 --topic transactions
>{"custid":"32","amount":"4:00"}
```
```
ksql> SELECT
>   CU.`payload`['custid'] CUSTID,
>   CU.`payload`['custname'] CUSTNAME,
>   TR.AMOUNT AMOUNT
> FROM CUSTOMERS CU
> INNER JOIN TRANSACTIONS TR WITHIN 7 DAYS ON ((CU.`payload`['custid'] = TR.CUSTID))
> EMIT CHANGES;
+------+--------+------+
|CUSTID|CUSTNAME|AMOUNT|
+------+--------+------+
|32    |Jack    |4:00  |
```
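WITHIN 7 DAYS makes this a windowed stream-stream join: a transaction only matches a customer event with the same custid whose timestamp falls within seven days before or after it. The nested-loop Python sketch below illustrates those semantics on in-memory lists (the inclusive boundary is my assumption); ksqlDB itself keeps windowed state stores rather than scanning lists. One consequence: a customer row loaded more than seven days before a transaction would not join, so a ksqlDB table with a stream-table join may suit this kind of lookup better.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(days=7)


def windowed_inner_join(customers, transactions, window=WINDOW):
    """Inner-join two event lists on custid when timestamps are within `window`.

    customers: (timestamp, custid, custname) tuples
    transactions: (timestamp, custid, amount) tuples
    Returns (custid, custname, amount) rows, like the SELECT above.
    """
    out = []
    for t_ts, t_id, amount in transactions:
        for c_ts, c_id, name in customers:
            # Symmetric window: the customer event may be before or after
            if c_id == t_id and abs(t_ts - c_ts) <= window:
                out.append((t_id, name, amount))
    return out
```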
So, that’s it for this article. Stay tuned for more.
Thank you for reading and happy event streaming.