Apache Pulsar is a cloud-native, distributed messaging and streaming platform. It was originally created at Yahoo and is now a top-level Apache Software Foundation Project.
In one of my previous posts, I wrote about how you can set up Pulsar on Docker.
Here’s how you can create an Apache Pulsar Producer and Consumer using the Pulsar Java Client.
1. Set Up a Maven Project for Apache Pulsar
The first thing that you need to do is create a Maven project. Once done, you can add the Pulsar dependency.
<!-- in your <properties> block -->
<pulsar.version>2.8.0</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
You will also need to run Pulsar on Docker before going ahead with the producer and consumer.
$ docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.8.0 \
  bin/pulsar standalone
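Before running the Java examples, it can help to confirm that the container is actually listening on the two published ports: 6650 (the binary protocol port the client connects to) and 8080 (the HTTP port). The following is a minimal sketch using only the JDK; the class name `BrokerPortCheck`, the method `isPortOpen`, and the 2-second timeout are assumptions made for illustration and are not part of Pulsar.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports published by the docker run command above
        System.out.println("6650 open: " + isPortOpen("localhost", 6650, 2000));
        System.out.println("8080 open: " + isPortOpen("localhost", 8080, 2000));
    }
}
```

An open port only shows that something is listening; the standalone broker may still need a few more seconds before it accepts producers and consumers.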
2. Create a Pulsar Producer
To create an Apache Pulsar producer in Java, you need to use the Pulsar client. The code below creates a producer and sends one message to a topic.
package com.pulsar.demo.pulsardemo;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class PulsarProducer {
    public static void main(String[] args) {
        // try-with-resources closes the producer first, then the client,
        // even if send() throws.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl("pulsar://localhost:6650")
                     .build();
             Producer<String> stringProducer = client.newProducer(Schema.STRING)
                     .topic("my-topic")
                     .create()) {
            // send() blocks until the broker acknowledges the message
            stringProducer.send("My message");
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}
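The producer uses the short topic name "my-topic". Pulsar treats a short name as shorthand for a persistent topic in the default tenant ("public") and the default namespace ("default"). The sketch below illustrates that expansion; `TopicNames` and `toFullName` are hypothetical names written for this article, not the client's own API, and only the short and tenant/namespace/topic forms are handled.

```java
public class TopicNames {

    // Hypothetical helper for illustration only; the real client
    // performs this expansion internally.
    public static String toFullName(String topic) {
        if (topic.contains("://")) {
            return topic; // already fully qualified
        }
        String[] parts = topic.split("/");
        if (parts.length == 1) {
            // Short name: default tenant "public", default namespace "default"
            return "persistent://public/default/" + parts[0];
        }
        if (parts.length == 3) {
            // tenant/namespace/topic without the scheme
            return "persistent://" + topic;
        }
        throw new IllegalArgumentException("Unsupported topic name: " + topic);
    }

    public static void main(String[] args) {
        // prints "persistent://public/default/my-topic"
        System.out.println(toFullName("my-topic"));
    }
}
```

This matters when a producer and a consumer use different spellings of the same topic: "my-topic" and "persistent://public/default/my-topic" refer to the same topic.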
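To confirm that the message arrives, start a consumer inside the Docker container before you run the producer. The subscription starts at the latest position, so it only receives messages published after it subscribes.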
$ docker exec -it <container id> /bin/bash
$ cd bin/
$ ./pulsar-client consume my-topic -s "first-subscription"
Warning: Nashorn engine is planned to be removed from a future JDK release
14:25:25.732 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x09a98942, L:/127.0.0.1:54216 - R:localhost/127.0.0.1:6650]] Connected to server
14:25:26.312 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
"topicNames" : [ "my-topic" ],
"topicsPattern" : null,
"subscriptionName" : "first-subscription",
"subscriptionType" : "Exclusive",
"subscriptionMode" : "Durable",
"receiverQueueSize" : 1000,
"acknowledgementsGroupTimeMicros" : 100000,
"negativeAckRedeliveryDelayMicros" : 60000000,
"maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
"consumerName" : null,
"ackTimeoutMillis" : 0,
"tickDurationMillis" : 1000,
"priorityLevel" : 0,
"maxPendingChunkedMessage" : 10,
"autoAckOldestChunkedMessageOnQueueFull" : false,
"expireTimeOfIncompleteChunkedMessageMillis" : 60000,
"cryptoFailureAction" : "FAIL",
"properties" : { },
"readCompacted" : false,
"subscriptionInitialPosition" : "Latest",
"patternAutoDiscoveryPeriod" : 60,
"regexSubscriptionMode" : "PersistentOnly",
"deadLetterPolicy" : null,
"retryEnable" : false,
"autoUpdatePartitions" : true,
"autoUpdatePartitionsIntervalSeconds" : 60,
"replicateSubscriptionState" : false,
"resetIncludeHead" : false,
"keySharedPolicy" : null,
"batchIndexAckEnabled" : false,
"ackReceiptEnabled" : false,
"poolMessages" : true,
"maxPendingChuckedMessage" : 10
}
14:25:26.354 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
"serviceUrl" : "pulsar://localhost:6650/",
"authPluginClassName" : null,
"authParams" : null,
"authParamMap" : null,
"operationTimeoutMs" : 30000,
"statsIntervalSeconds" : 60,
"numIoThreads" : 1,
"numListenerThreads" : 1,
"connectionsPerBroker" : 1,
"useTcpNoDelay" : true,
"useTls" : false,
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"concurrentLookupRequest" : 5000,
"maxLookupRequest" : 50000,
"maxLookupRedirects" : 20,
"maxNumberOfRejectedRequestPerConnection" : 50,
"keepAliveIntervalSeconds" : 30,
"connectionTimeoutMs" : 10000,
"requestTimeoutMs" : 60000,
"initialBackoffIntervalNanos" : 100000000,
"maxBackoffIntervalNanos" : 60000000000,
"enableBusyWait" : false,
"listenerName" : null,
"useKeyStoreTls" : false,
"sslProvider" : null,
"tlsTrustStoreType" : "JKS",
"tlsTrustStorePath" : "",
"tlsTrustStorePassword" : "",
"tlsCiphers" : [ ],
"tlsProtocols" : [ ],
"memoryLimitBytes" : 0,
"proxyServiceUrl" : null,
"proxyProtocol" : null,
"enableTransaction" : false
}
14:25:26.467 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic][first-subscription] Subscribing to topic on cnx [id: 0x09a98942, L:/127.0.0.1:54216 - R:localhost/127.0.0.1:6650], consumerId 0
14:25:27.075 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic][first-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
14:31:27.780 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
----- got message -----
key:[null], properties:[], content:My message
14:31:27.887 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
14:31:28.067 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic] [first-subscription] Closed consumer
14:31:28.090 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x09a98942, L:/127.0.0.1:54216 ! R:localhost/127.0.0.1:6650] Disconnected
14:31:30.150 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully consumed
3. Create a Pulsar Consumer
You can then use the code below to create a Pulsar consumer.
package com.pulsar.demo.pulsardemo;

import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class PulsarConsumer {
    public static void main(String[] args) {
        // try-with-resources closes the consumer and the client if receive() fails
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl("pulsar://localhost:6650")
                     .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic("my-topic")
                     .subscriptionName("my-subscription")
                     .subscribe()) {
            // Runs until the process is stopped or receive() throws
            while (true) {
                // Blocks until a message is available
                Message<byte[]> msg = consumer.receive();
                try {
                    System.out.println("Message received: "
                            + new String(msg.getData(), StandardCharsets.UTF_8));
                    consumer.acknowledge(msg);
                } catch (Exception e) {
                    // Ask the broker to redeliver the message later
                    consumer.negativeAcknowledge(msg);
                }
            }
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}
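The consumer above receives the payload as raw bytes and turns it into a String. A producer created with Schema.STRING, like the one in section 2, encodes text as UTF-8, so it is safest to decode with that charset explicitly rather than rely on the platform default. Here is a small sketch of the decoding step on its own; `PayloadDecoding` and `decode` are hypothetical names used only for this illustration.

```java
import java.nio.charset.StandardCharsets;

public class PayloadDecoding {

    // Decodes a message payload as UTF-8, independent of the JVM's default charset.
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulates the bytes a String producer would put on the wire
        byte[] payload = "hello-pulsar".getBytes(StandardCharsets.UTF_8);
        System.out.println("Message received: " + decode(payload));
    }
}
```

For plain ASCII messages such as "hello-pulsar" the default charset usually gives the same result, but non-ASCII text can be garbled on machines whose default charset is not UTF-8.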
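In this case, the message comes from the Docker container. Start the consumer from your IDE first, because a new subscription starts at the latest position by default and would miss messages published before it existed. Then produce a message from inside the container.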
$ ./pulsar-client produce my-topic --messages "hello-pulsar"
Warning: Nashorn engine is planned to be removed from a future JDK release
18:42:26.393 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x1efcd31a, L:/127.0.0.1:55114 - R:localhost/127.0.0.1:6650]] Connected to server
18:42:26.937 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {
"topicName" : "my-topic",
"producerName" : null,
"sendTimeoutMs" : 30000,
"blockIfQueueFull" : false,
"maxPendingMessages" : 1000,
"maxPendingMessagesAcrossPartitions" : 50000,
"messageRoutingMode" : "RoundRobinPartition",
"hashingScheme" : "JavaStringHash",
"cryptoFailureAction" : "FAIL",
"batchingMaxPublishDelayMicros" : 1000,
"batchingPartitionSwitchFrequencyByPublishDelay" : 10,
"batchingMaxMessages" : 1000,
"batchingMaxBytes" : 131072,
"batchingEnabled" : true,
"chunkingEnabled" : false,
"compressionType" : "NONE",
"initialSequenceId" : null,
"autoUpdatePartitions" : true,
"autoUpdatePartitionsIntervalSeconds" : 60,
"multiSchema" : true,
"accessMode" : "Shared",
"properties" : { }
}
18:42:26.985 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {
"serviceUrl" : "pulsar://localhost:6650/",
"authPluginClassName" : null,
"authParams" : null,
"authParamMap" : null,
"operationTimeoutMs" : 30000,
"statsIntervalSeconds" : 60,
"numIoThreads" : 1,
"numListenerThreads" : 1,
"connectionsPerBroker" : 1,
"useTcpNoDelay" : true,
"useTls" : false,
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"concurrentLookupRequest" : 5000,
"maxLookupRequest" : 50000,
"maxLookupRedirects" : 20,
"maxNumberOfRejectedRequestPerConnection" : 50,
"keepAliveIntervalSeconds" : 30,
"connectionTimeoutMs" : 10000,
"requestTimeoutMs" : 60000,
"initialBackoffIntervalNanos" : 100000000,
"maxBackoffIntervalNanos" : 60000000000,
"enableBusyWait" : false,
"listenerName" : null,
"useKeyStoreTls" : false,
"sslProvider" : null,
"tlsTrustStoreType" : "JKS",
"tlsTrustStorePath" : "",
"tlsTrustStorePassword" : "",
"tlsCiphers" : [ ],
"tlsProtocols" : [ ],
"memoryLimitBytes" : 0,
"proxyServiceUrl" : null,
"proxyProtocol" : null,
"enableTransaction" : false
}
18:42:27.013 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [null] Creating producer on cnx [id: 0x1efcd31a, L:/127.0.0.1:55114 - R:localhost/127.0.0.1:6650]
18:42:27.099 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-3-2] Created producer on cnx [id: 0x1efcd31a, L:/127.0.0.1:55114 - R:localhost/127.0.0.1:6650]
18:42:27.275 [main] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
18:42:27.394 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
18:42:27.431 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-3-2] Closed Producer
18:42:27.474 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x1efcd31a, L:/127.0.0.1:55114 ! R:localhost/127.0.0.1:6650] Disconnected
18:42:29.538 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced

So, that’s it for this article. Thank you for reading and happy coding.
