Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It lets you publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
Kafka is generally used for two broad classes of applications:
- Building real-time streaming data pipelines that reliably get data between systems or applications
- Building real-time streaming applications that transform or react to the streams of data
Kafka runs on both Linux and Windows. The first thing you need to do is download Kafka. At the time of writing, the latest version is Kafka 2.4.0.
Apache Kafka Installation & Configuration :
Once you have downloaded the archive, all you need to do is extract it. Inside the bin folder, you will find a separate windows folder that contains the batch (.bat) scripts for Windows.
In config/server.properties, make sure the log.dirs property points to a valid directory. This is where Kafka stores its topic data (the commit log), and knowing the location helps when you need to inspect it while troubleshooting.
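One Windows pitfall applies when setting that directory. The server.properties file is read as a Java properties file, and in that format a backslash is an escape character, so a path typed with single backslashes does not load as written. The sketch below uses only java.util.Properties to show the effect. The class name PropertiesPathCheck, the helper readProperty, and the path D:/Apache/Kafka/kafka-logs are assumptions made for illustration; log.dirs is the broker property that names the data directory.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class PropertiesPathCheck {

    // Parses properties-file text and returns the value stored under the given key.
    static String readProperty(String fileText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // Hypothetical paths. The Java string escapes below produce the text
        // log.dirs=D:\Apache\Kafka\kafka-logs as it would be typed in the file.
        String withBackslashes = "log.dirs=D:\\Apache\\Kafka\\kafka-logs";
        String withForwardSlashes = "log.dirs=D:/Apache/Kafka/kafka-logs";

        // Single backslashes are consumed as escape characters while loading.
        System.out.println(readProperty(withBackslashes, "log.dirs"));    // prints D:ApacheKafkakafka-logs
        // Forward slashes survive unchanged.
        System.out.println(readProperty(withForwardSlashes, "log.dirs")); // prints D:/Apache/Kafka/kafka-logs
    }
}
```

Writing the path with forward slashes, or doubling each backslash in the file, avoids the problem.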
Before you start Kafka, you need to start ZooKeeper. From bin\windows, run zookeeper-server-start.bat with config\zookeeper.properties as its argument.
Next, start Kafka by running kafka-server-start.bat from bin\windows with config\server.properties as its argument.
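Before moving on, it can help to confirm that the broker is actually listening. The following is a minimal sketch and not part of the Kafka tooling: it only attempts a plain TCP connection using the JDK, which shows that something accepts connections on the port, not that the broker is fully healthy. The class name BrokerPortCheck, the helper isListening, and the 2-second timeout are assumptions; localhost:9092 is the address used by the commands in this tutorial.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 is ZooKeeper's default client port; 9092 is the broker port used in this tutorial.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 2000));
        System.out.println("Kafka (9092): " + isListening("localhost", 9092, 2000));
    }
}
```

If either line prints false, check the corresponding terminal window for startup errors before creating a topic.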
Next, create a topic to publish messages to, then list the topics to confirm that it exists.
D:\Apache\Kafka\kafka_2.13-2.4.0\bin\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
D:\Apache\Kafka\kafka_2.13-2.4.0\bin\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
test
Once your topic is created, you can start working with the Producer and Consumer APIs. For a basic check, open two terminals and run the console producer (kafka-console-producer.bat) in one and the console consumer (kafka-console-consumer.bat) in the other. Any message you type in the producer window appears in the consumer window in real time.
You can also write a simple Java program to test the Kafka producer and consumer.
Kafka Producer :
Before testing the Java code for the producer and consumer, make sure the kafka-clients jar and its dependencies (such as slf4j-api) are on your build path. The libs folder of the Kafka download contains them, and any recent version should work.
To test the following Kafka producer code, run it in Eclipse while a consumer terminal is open.
package com.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class kafkaproducer {
    public static void main(String[] args) {
        // Properties for the producer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "test";
        // Record key; every message uses the same key
        int count = 1;

        // Create the producer
        Producer<Integer, String> producer = new KafkaProducer<>(props);

        // Send ten messages to the topic
        for (int i = 1; i <= 10; i++) {
            String message = "Kafka message : " + i;
            ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(topic, count, message);
            producer.send(producerRecord);
        }

        // Close the producer; close() blocks until pending sends complete
        producer.close();
    }
}
Kafka Consumer :
To test the following consumer code, run it in Eclipse and start a producer in a terminal. The messages should appear in the Eclipse console.
package com.kafka.consumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class kafkaconsumer {
    public static void main(String[] args) throws Exception {
        String topic = "test";
        String group = "test-consumer-group";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", group);
        // With no committed offset for the group, start from new messages only
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        // Runs until the process is stopped
        while (true) {
            // poll(Duration) replaces the deprecated poll(long)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
            // Commit offsets only after the batch has been processed
            consumer.commitAsync();
        }
    }
}
That’s it for this tutorial. Have you already used Apache Kafka? Please share your views with us in the comment section below.