Morgan Stanley uses Apache Kafka to publish market data internally to its clients. The firm has developed a new C++ API for Kafka, modern-cpp-kafka, and has open-sourced it, so a modern C++ API for Apache Kafka is now available for developers to use.
You can also check out the article on the Confluent blog for more details.
Getting started
The easiest way to get started is to download the code from the GitHub repository. One thing to note is that two different producer APIs have been developed for Kafka: synchronous and asynchronous.
For example, below is the synchronous Kafka producer.
```cpp
// https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_sync_producer.cc
#include "kafka/KafkaProducer.h"

#include <iostream>
#include <string>

int main(int argc, char **argv)
{
    if (argc != 3) {
        std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
        return 1;
    }

    std::string brokers = argv[1];
    kafka::Topic topic  = argv[2];

    // Create configuration object
    kafka::Properties props({
        {"bootstrap.servers",  brokers},
        {"enable.idempotence", "true"},
    });

    // Create a producer instance.
    kafka::KafkaSyncProducer producer(props);

    // Read messages from stdin and produce to the broker.
    std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

    for (std::string line; std::getline(std::cin, line);) {
        // The ProducerRecord doesn't own `line`, it is just a thin wrapper
        auto record = kafka::ProducerRecord(topic, kafka::NullKey, kafka::Value(line.c_str(), line.size()));

        // Send the message.
        try {
            kafka::Producer::RecordMetadata metadata = producer.send(record);
            std::cout << "% Message delivered: " << metadata.toString() << std::endl;
        } catch (const kafka::KafkaException& e) {
            std::cerr << "% Message delivery failed: " << e.error().message() << std::endl;
        }

        if (line.empty()) break;
    }

    // producer.close(); // No explicit close is needed, RAII will take care of it
}
```
The synchronous producer waits for each delivery acknowledgement before sending the next message, so it never has more than one message in flight. On a slow network, that round-trip per message can become a throughput bottleneck.
Below is the send loop for the asynchronous Kafka producer (the setup code is the same as in the synchronous example).
```cpp
    // Create a producer instance.
    kafka::KafkaAsyncProducer producer(props);

    // Read messages from stdin and produce to the broker.
    std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

    for (std::string line; std::getline(std::cin, line);) {
        // The ProducerRecord doesn't own `line`, it is just a thin wrapper
        auto record = kafka::ProducerRecord(topic, kafka::NullKey, kafka::Value(line.c_str(), line.size()));

        // Send the message.
        producer.send(record,
                      // The delivery report handler
                      [](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                          if (!ec)
                              std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                          else
                              std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
                      },
                      // The memory block given by record.value() will be copied
                      kafka::KafkaProducer::SendOption::ToCopyRecordValue);

        if (line.empty()) break;
    }
```
Like the producers above, below is the code for a consumer (here, the auto-commit variant).
```cpp
// https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_auto_commit_consumer.cc
#include "kafka/KafkaConsumer.h"

#include <iostream>
#include <string>

int main(int argc, char **argv)
{
    if (argc != 3) {
        std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
        return 1;
    }

    std::string brokers = argv[1];
    kafka::Topic topic  = argv[2];

    // Create configuration object
    kafka::Properties props({
        {"bootstrap.servers", brokers},
    });

    // Create a consumer instance.
    kafka::KafkaAutoCommitConsumer consumer(props);

    // Subscribe to topics
    consumer.subscribe({topic});

    // Read messages from the topic.
    std::cout << "% Reading messages from topic: " << topic << std::endl;
    while (true) {
        auto records = consumer.poll(std::chrono::milliseconds(100));
        for (const auto& record: records) {
            // In this example, quit on empty message
            if (record.value().size() == 0) return 0;

            if (!record.error()) {
                std::cout << "% Got a new message..." << std::endl;
                std::cout << "    Topic    : " << record.topic() << std::endl;
                std::cout << "    Partition: " << record.partition() << std::endl;
                std::cout << "    Offset   : " << record.offset() << std::endl;
                std::cout << "    Timestamp: " << record.timestamp().toString() << std::endl;
                std::cout << "    Headers  : " << kafka::toString(record.headers()) << std::endl;
                std::cout << "    Key      [" << record.key().toString() << "]" << std::endl;
                std::cout << "    Value    [" << record.value().toString() << "]" << std::endl;
            } else {
                // Errors are typically informational, thus no special handling is required
                std::cerr << record.toString() << std::endl;
            }
        }
    }

    // consumer.close(); // No explicit close is needed, RAII will take care of it
}
```
Below is a summary of the Kafka client types used:
KafkaProducer :
- ProducerRecord: The “message type” for a KafkaProducer to send, constructed with Topic, Partition, Key, Value, and Headers.
- Producer::Callback: The callback method used to provide asynchronous handling of request completion. This method will be called when the record sent to the server has been acknowledged.
- KafkaAsyncProducer: Publishes records to the Kafka cluster asynchronously. Each send operation requires a per-message Producer::Callback.
- KafkaSyncProducer: Publishes records to the Kafka cluster synchronously. The send operation does not return until the delivery is completed.
- Producer::RecordMetadata: The metadata for a record that has been acknowledged by the server. It contains Topic, Partition, Offset, KeySize, ValueSize, Timestamp, and PersistedStatus. A KafkaAsyncProducer passes this metadata as an input parameter of the Producer::Callback; a KafkaSyncProducer returns it from its synchronous send method.
KafkaConsumer :
- ConsumerRecord: The message type returned by a KafkaConsumer instance. It contains Topic, Partition, Offset, Key, Value, Timestamp, and Headers.
- KafkaAutoCommitConsumer: Automatically commits previously polled offsets on each poll operation.
- KafkaManualCommitConsumer: Provides manual commitAsync and commitSync methods to acknowledge messages.
The modern-cpp-kafka project is actively maintained, so contributions and bug reports are welcome on the GitHub repository.