Apache Pulsar is a cloud-native, distributed messaging and streaming platform. It was originally created at Yahoo and is now a top-level Apache Software Foundation Project.
In one of my previous posts, I wrote about how you can set up Pulsar on Docker.
Here’s how you can create an Apache Pulsar Producer and Consumer using the Pulsar Java Client.
1. Set Up a Maven Project for Apache Pulsar
The first thing that you need to do is create a Maven project. Once done, you can add the Pulsar dependency.
    <!-- in your <properties> block -->
    <pulsar.version>2.8.0</pulsar.version>

    <!-- in your <dependencies> block -->
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>${pulsar.version}</version>
    </dependency>
You will also need to run Pulsar on Docker before going ahead with the producer and consumer.
    $ docker run -it \
        -p 6650:6650 \
        -p 8080:8080 \
        --mount source=pulsardata,target=/pulsar/data \
        --mount source=pulsarconf,target=/pulsar/conf \
        apachepulsar/pulsar:2.8.0 \
        bin/pulsar standalone
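Before moving on, it can help to confirm that the port published by the container is actually accepting connections. The following is a minimal sketch using only the JDK. The class name `BrokerCheck` and its methods are hypothetical helpers, not part of the Pulsar client, and a successful TCP connection only shows that the port is open, not that the broker has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical helper, not part of the Pulsar client library.
public class BrokerCheck {

    // Extracts host and port from a service URL such as pulsar://localhost:6650.
    static InetSocketAddress brokerAddress(String serviceUrl) {
        URI uri = URI.create(serviceUrl);
        return new InetSocketAddress(uri.getHost(), uri.getPort());
    }

    // Returns true if a TCP connection to the broker port succeeds within timeoutMs.
    static boolean isReachable(String serviceUrl, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(brokerAddress(serviceUrl), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String serviceUrl = "pulsar://localhost:6650";
        System.out.println(serviceUrl + " reachable: " + isReachable(serviceUrl, 2000));
    }
}
```

If this prints `false`, the container is probably not running or port 6650 is not published to the host.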
2. Create a Pulsar Producer
To create an Apache Pulsar producer in Java, you need to use the Pulsar client. The code below creates a producer and sends a message to a topic.

    package com.pulsar.demo.pulsardemo;

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.Schema;

    public class PulsarProducer {
        public static void main(String[] args) {
            // try-with-resources closes the producer first, then the client
            try (PulsarClient client = PulsarClient.builder()
                         .serviceUrl("pulsar://localhost:6650")
                         .build();
                 Producer<String> stringProducer = client.newProducer(Schema.STRING)
                         .topic("my-topic")
                         .create()) {
                // send() blocks until the broker acknowledges the message
                stringProducer.send("My message");
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }
        }
    }
To confirm that the message arrives, start a consumer inside the Docker container before you run the producer. The subscription's initial position is Latest (see the consumer config in the log), so it only receives messages published after it is created.

    $ docker exec -it <container id> /bin/bash
    $ cd bin/
    $ ./pulsar-client consume my-topic -s "first-subscription"
    Warning: Nashorn engine is planned to be removed from a future JDK release
    14:25:25.732 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x09a98942, L:/127.0.0.1:54216 - R:localhost/127.0.0.1:6650]] Connected to server
    14:25:26.312 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
      "topicNames" : [ "my-topic" ],
      "topicsPattern" : null,
      "subscriptionName" : "first-subscription",
      "subscriptionType" : "Exclusive",
      "subscriptionMode" : "Durable",
      "receiverQueueSize" : 1000,
      "acknowledgementsGroupTimeMicros" : 100000,
      "negativeAckRedeliveryDelayMicros" : 60000000,
      "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
      "consumerName" : null,
      "ackTimeoutMillis" : 0,
      "tickDurationMillis" : 1000,
      "priorityLevel" : 0,
      "maxPendingChunkedMessage" : 10,
      "autoAckOldestChunkedMessageOnQueueFull" : false,
      "expireTimeOfIncompleteChunkedMessageMillis" : 60000,
      "cryptoFailureAction" : "FAIL",
      "properties" : { },
      "readCompacted" : false,
      "subscriptionInitialPosition" : "Latest",
      "patternAutoDiscoveryPeriod" : 60,
      "regexSubscriptionMode" : "PersistentOnly",
      "deadLetterPolicy" : null,
      "retryEnable" : false,
      "autoUpdatePartitions" : true,
      "autoUpdatePartitionsIntervalSeconds" : 60,
      "replicateSubscriptionState" : false,
      "resetIncludeHead" : false,
      "keySharedPolicy" : null,
      "batchIndexAckEnabled" : false,
      "ackReceiptEnabled" : false,
      "poolMessages" : true,
      "maxPendingChuckedMessage" : 10
    }
    14:25:26.354 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
      "serviceUrl" : "pulsar://localhost:6650/",
      "authPluginClassName" : null,
      "authParams" : null,
      "authParamMap" : null,
      "operationTimeoutMs" : 30000,
      "statsIntervalSeconds" : 60,
      "numIoThreads" : 1,
      "numListenerThreads" : 1,
      "connectionsPerBroker" : 1,
      "useTcpNoDelay" : true,
      "useTls" : false,
      "tlsTrustCertsFilePath" : "",
      "tlsAllowInsecureConnection" : false,
      "tlsHostnameVerificationEnable" : false,
      "concurrentLookupRequest" : 5000,
      "maxLookupRequest" : 50000,
      "maxLookupRedirects" : 20,
      "maxNumberOfRejectedRequestPerConnection" : 50,
      "keepAliveIntervalSeconds" : 30,
      "connectionTimeoutMs" : 10000,
      "requestTimeoutMs" : 60000,
      "initialBackoffIntervalNanos" : 100000000,
      "maxBackoffIntervalNanos" : 60000000000,
      "enableBusyWait" : false,
      "listenerName" : null,
      "useKeyStoreTls" : false,
      "sslProvider" : null,
      "tlsTrustStoreType" : "JKS",
      "tlsTrustStorePath" : "",
      "tlsTrustStorePassword" : "",
      "tlsCiphers" : [ ],
      "tlsProtocols" : [ ],
      "memoryLimitBytes" : 0,
      "proxyServiceUrl" : null,
      "proxyProtocol" : null,
      "enableTransaction" : false
    }
    14:25:26.467 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic][first-subscription] Subscribing to topic on cnx [id: 0x09a98942, L:/127.0.0.1:54216 - R:localhost/127.0.0.1:6650], consumerId 0
    14:25:27.075 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic][first-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
    14:31:27.780 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
    ----- got message -----
    key:[null], properties:[], content:My message
    14:31:27.887 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
    14:31:28.067 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic] [first-subscription] Closed consumer
    14:31:28.090 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x09a98942, L:/127.0.0.1:54216 ! R:localhost/127.0.0.1:6650] Disconnected
    14:31:30.150 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully consumed
3. Create a Pulsar Consumer
You can then use the code below to create a Pulsar consumer.

    package com.pulsar.demo.pulsardemo;

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;

    public class PulsarConsumer {
        public static void main(String[] args) {
            try {
                PulsarClient client = PulsarClient.builder()
                        .serviceUrl("pulsar://localhost:6650")
                        .build();

                // Without an explicit schema, the consumer receives raw bytes
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic("my-topic")
                        .subscriptionName("my-subscription")
                        .subscribe();

                // Block waiting for messages until the process is stopped
                while (true) {
                    Message<byte[]> msg = consumer.receive();
                    try {
                        System.out.println("Message received: " + new String(msg.getData()));
                        // Acknowledge so the broker does not redeliver this message
                        consumer.acknowledge(msg);
                    } catch (Exception e) {
                        // Ask the broker to redeliver the message later
                        consumer.negativeAcknowledge(msg);
                    }
                }
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }
        }
    }
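One detail worth noting in the consumer above: `new String(msg.getData())` decodes the payload with the JVM's default charset, which is not guaranteed to be UTF-8 on every platform or JDK version, while the producer's `Schema.STRING` encodes strings as UTF-8. A minimal sketch of decoding with an explicit charset follows. The class `PayloadDecoder` and its `decode` method are hypothetical names used for illustration, and the byte array in `main` stands in for what `msg.getData()` would return.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Pulsar client library.
public class PayloadDecoder {

    // Decodes a message payload as UTF-8 regardless of the JVM default charset.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for msg.getData() from the consumer loop
        byte[] payload = "hello-pulsar".getBytes(StandardCharsets.UTF_8);
        System.out.println("Message received: " + decode(payload));
    }
}
```

In the consumer loop, this would amount to replacing `new String(msg.getData())` with `decode(msg.getData())`.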
To test the consumer, run it from your IDE first so that the subscription exists, then produce a message from inside the Docker container.

    $ ./pulsar-client produce my-topic --messages "hello-pulsar"
    Warning: Nashorn engine is planned to be removed from a future JDK release
    18:42:26.393 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x1efcd31a, L:/127.0.0.1:55114 - R:localhost/127.0.0.1:6650]] Connected to server
    18:42:26.937 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {
      "topicName" : "my-topic",
      "producerName" : null,
      "sendTimeoutMs" : 30000,
      "blockIfQueueFull" : false,
      "maxPendingMessages" : 1000,
      "maxPendingMessagesAcrossPartitions" : 50000,
      "messageRoutingMode" : "RoundRobinPartition",
      "hashingScheme" : "JavaStringHash",
      "cryptoFailureAction" : "FAIL",
      "batchingMaxPublishDelayMicros" : 1000,
      "batchingPartitionSwitchFrequencyByPublishDelay" : 10,
      "batchingMaxMessages" : 1000,
      "batchingMaxBytes" : 131072,
      "batchingEnabled" : true,
      "chunkingEnabled" : false,
      "compressionType" : "NONE",
      "initialSequenceId" : null,
      "autoUpdatePartitions" : true,
      "autoUpdatePartitionsIntervalSeconds" : 60,
      "multiSchema" : true,
      "accessMode" : "Shared",
      "properties" : { }
    }
    18:42:26.985 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {
      "serviceUrl" : "pulsar://localhost:6650/",
      "authPluginClassName" : null,
      "authParams" : null,
      "authParamMap" : null,
      "operationTimeoutMs" : 30000,
      "statsIntervalSeconds" : 60,
      "numIoThreads" : 1,
      "numListenerThreads" : 1,
      "connectionsPerBroker" : 1,
      "useTcpNoDelay" : true,
      "useTls" : false,
      "tlsTrustCertsFilePath" : "",
      "tlsAllowInsecureConnection" : false,
      "tlsHostnameVerificationEnable" : false,
      "concurrentLookupRequest" : 5000,
      "maxLookupRequest" : 50000,
      "maxLookupRedirects" : 20,
      "maxNumberOfRejectedRequestPerConnection" : 50,
      "keepAliveIntervalSeconds" : 30,
      "connectionTimeoutMs" : 10000,
      "requestTimeoutMs" : 60000,
      "initialBackoffIntervalNanos" : 100000000,
      "maxBackoffIntervalNanos" : 60000000000,
      "enableBusyWait" : false,
      "listenerName" : null,
      "useKeyStoreTls" : false,
      "sslProvider" : null,
      "tlsTrustStoreType" : "JKS",
      "tlsTrustStorePath" : "",
      "tlsTrustStorePassword" : "",
      "tlsCiphers" : [ ],
      "tlsProtocols" : [ ],
      "memoryLimitBytes" : 0,
      "proxyServiceUrl" : null,
      "proxyProtocol" : null,
      "enableTransaction" : false
    }
    18:42:27.013 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [null] Creating producer on cnx [id: 0x1efcd31a, L:/127.0.0.1:55114 - R:localhost/127.0.0.1:6650]
    18:42:27.099 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-3-2] Created producer on cnx [id: 0x1efcd31a, L:/127.0.0.1:55114 - R:localhost/127.0.0.1:6650]
    18:42:27.275 [main] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
    18:42:27.394 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
    18:42:27.431 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-3-2] Closed Producer
    18:42:27.474 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x1efcd31a, L:/127.0.0.1:55114 ! R:localhost/127.0.0.1:6650] Disconnected
    18:42:29.538 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
So, that’s it for this article. Thank you for reading and happy coding.