Apache Pulsar is a cloud-native, distributed messaging and streaming platform. It was originally created at Yahoo and is now a top-level Apache Software Foundation project.
Like Kafka, Pulsar can be used to build message-processing systems. A Pulsar instance consists of one or more Pulsar clusters. Each Pulsar cluster consists of three components:
- Brokers – Brokers handle and load-balance incoming messages from producers and dispatch them to consumers.
- BookKeeper – A BookKeeper cluster in Pulsar handles persistent storage of messages.
- ZooKeeper – A ZooKeeper cluster handles coordination tasks between Pulsar clusters.

For more detail, see the architecture overview in the official Pulsar documentation.
#1. Set up Apache Pulsar on Docker
To set up Pulsar on Docker, use the command below.
$ docker run -it \
    -p 6650:6650 \
    -p 8080:8080 \
    --mount source=pulsardata,target=/pulsar/data \
    --mount source=pulsarconf,target=/pulsar/conf \
    apachepulsar/pulsar:2.7.0 \
    bin/pulsar standalone
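This pulls the Pulsar 2.7.0 image if it is not already present and starts Pulsar in standalone mode inside the container. To confirm that the container is running, list your containers from another terminal.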
$ docker ps -a
CONTAINER ID   IMAGE                       COMMAND                  CREATED          STATUS          PORTS                                            NAMES
3b027303f6e6   apachepulsar/pulsar:2.7.0   "bin/pulsar standalo…"   11 minutes ago   Up 11 minutes   0.0.0.0:6650->6650/tcp, 0.0.0.0:8080->8080/tcp   gallant_mahavira
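As an extra check that is not part of the original setup, the two ports published by the docker run command (6650 for the Pulsar binary protocol and 8080 for the HTTP admin API) can be probed from the host. The sketch below assumes Pulsar is reachable on localhost; the helper name port_open is made up for illustration.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Ports come from the docker run command; "localhost" is an assumption.
    for name, port in (("broker", 6650), ("admin API", 8080)):
        state = "open" if port_open("localhost", port) else "not reachable"
        print(f"{name} port {port}: {state}")
```

An open port only shows that something is listening; producing and consuming a message, as done next, is the real end-to-end test.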
To test that your Pulsar setup works, produce and consume a message inside the container. To open a bash shell in the running container, use the command below.
docker exec -it <container id> /bin/bash
In Apache Pulsar, when you produce a message to or consume from a topic that does not exist yet, the topic is created automatically by default.
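The short name my-topic used in the commands that follow is shorthand. Pulsar topic names have the form domain://tenant/namespace/topic, and a bare name is resolved against the persistent domain, the public tenant and the default namespace. That is why the stats URL later in this tutorial contains persistent/public/default/my-topic. As a rough illustration, the expansion looks like this; the function name full_topic_name is hypothetical and it only handles bare names.

```python
def full_topic_name(name: str, tenant: str = "public",
                    namespace: str = "default",
                    domain: str = "persistent") -> str:
    """Expand a bare topic name to domain://tenant/namespace/topic.

    Names that already include a domain are returned unchanged.
    """
    if "://" in name:
        return name
    return f"{domain}://{tenant}/{namespace}/{name}"


print(full_topic_name("my-topic"))  # persistent://public/default/my-topic
```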
Produce message –
$:/pulsar/bin # ./pulsar-client produce my-topic --messages "hello-pulsar"
14:38:32.461 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x44ad724c, L:/127.0.0.1:38004 - R:localhost/127.0.0.1:6650]] Connected to server
14:38:32.825 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {
"topicName" : "my-topic",
"producerName" : null,
"sendTimeoutMs" : 30000,
"blockIfQueueFull" : false,
"maxPendingMessages" : 1000,
"maxPendingMessagesAcrossPartitions" : 50000,
"messageRoutingMode" : "RoundRobinPartition",
"hashingScheme" : "JavaStringHash",
"cryptoFailureAction" : "FAIL",
"batchingMaxPublishDelayMicros" : 1000,
"batchingPartitionSwitchFrequencyByPublishDelay" : 10,
"batchingMaxMessages" : 1000,
"batchingMaxBytes" : 131072,
"batchingEnabled" : true,
"chunkingEnabled" : false,
"compressionType" : "NONE",
"initialSequenceId" : null,
"autoUpdatePartitions" : true,
"autoUpdatePartitionsIntervalSeconds" : 60,
"multiSchema" : true,
"properties" : { }
}
14:38:32.848 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {
"serviceUrl" : "pulsar://localhost:6650/",
"authPluginClassName" : null,
"operationTimeoutMs" : 30000,
"statsIntervalSeconds" : 60,
"numIoThreads" : 1,
"numListenerThreads" : 1,
"connectionsPerBroker" : 1,
"useTcpNoDelay" : true,
"useTls" : false,
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"concurrentLookupRequest" : 5000,
"maxLookupRequest" : 50000,
"maxLookupRedirects" : 20,
"maxNumberOfRejectedRequestPerConnection" : 50,
"keepAliveIntervalSeconds" : 30,
"connectionTimeoutMs" : 10000,
"requestTimeoutMs" : 60000,
"initialBackoffIntervalNanos" : 100000000,
"maxBackoffIntervalNanos" : 60000000000,
"listenerName" : null,
"useKeyStoreTls" : false,
"sslProvider" : null,
"tlsTrustStoreType" : "JKS",
"tlsTrustStorePath" : "",
"tlsTrustStorePassword" : "",
"tlsCiphers" : [ ],
"tlsProtocols" : [ ],
"proxyServiceUrl" : null,
"proxyProtocol" : null,
"enableTransaction" : false
}
14:38:32.883 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [null] Creating producer on cnx [id: 0x44ad724c, L:/127.0.0.1:38004 - R:localhost/127.0.0.1:6650]
14:38:32.933 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-0-1] Created producer on cnx [id: 0x44ad724c, L:/127.0.0.1:38004 - R:localhost/127.0.0.1:6650]
14:38:33.082 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
14:38:33.135 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
14:38:33.188 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-0-1] Closed Producer
14:38:33.205 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x44ad724c, L:/127.0.0.1:38004 ! R:localhost/127.0.0.1:6650] Disconnected
14:38:33.255 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
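Start the consumer in a second shell before you produce the message. The subscription begins at the Latest position, so the consumer only receives messages published after it has subscribed, which is why the log below shows it subscribing at 14:35 and receiving the message at 14:38.
Consume message –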
$:/pulsar/bin # ./pulsar-client consume my-topic -s "first-subscription"
14:35:48.230 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xc02866e9, L:/127.0.0.1:38002 - R:localhost/127.0.0.1:6650]] Connected to server
14:35:48.587 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
"topicNames" : [ "my-topic" ],
"topicsPattern" : null,
"subscriptionName" : "first-subscription",
"subscriptionType" : "Exclusive",
"subscriptionMode" : "Durable",
"receiverQueueSize" : 1000,
"acknowledgementsGroupTimeMicros" : 100000,
"negativeAckRedeliveryDelayMicros" : 60000000,
"maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
"consumerName" : null,
"ackTimeoutMillis" : 0,
"tickDurationMillis" : 1000,
"priorityLevel" : 0,
"maxPendingChuckedMessage" : 10,
"autoAckOldestChunkedMessageOnQueueFull" : false,
"expireTimeOfIncompleteChunkedMessageMillis" : 60000,
"cryptoFailureAction" : "FAIL",
"properties" : { },
"readCompacted" : false,
"subscriptionInitialPosition" : "Latest",
"patternAutoDiscoveryPeriod" : 60,
"regexSubscriptionMode" : "PersistentOnly",
"deadLetterPolicy" : null,
"retryEnable" : false,
"autoUpdatePartitions" : true,
"autoUpdatePartitionsIntervalSeconds" : 60,
"replicateSubscriptionState" : false,
"resetIncludeHead" : false,
"keySharedPolicy" : null,
"batchIndexAckEnabled" : false
}
14:35:48.604 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
"serviceUrl" : "pulsar://localhost:6650/",
"authPluginClassName" : null,
"operationTimeoutMs" : 30000,
"statsIntervalSeconds" : 60,
"numIoThreads" : 1,
"numListenerThreads" : 1,
"connectionsPerBroker" : 1,
"useTcpNoDelay" : true,
"useTls" : false,
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"concurrentLookupRequest" : 5000,
"maxLookupRequest" : 50000,
"maxLookupRedirects" : 20,
"maxNumberOfRejectedRequestPerConnection" : 50,
"keepAliveIntervalSeconds" : 30,
"connectionTimeoutMs" : 10000,
"requestTimeoutMs" : 60000,
"initialBackoffIntervalNanos" : 100000000,
"maxBackoffIntervalNanos" : 60000000000,
"listenerName" : null,
"useKeyStoreTls" : false,
"sslProvider" : null,
"tlsTrustStoreType" : "JKS",
"tlsTrustStorePath" : "",
"tlsTrustStorePassword" : "",
"tlsCiphers" : [ ],
"tlsProtocols" : [ ],
"proxyServiceUrl" : null,
"proxyProtocol" : null,
"enableTransaction" : false
}
14:35:48.655 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic][first-subscription] Subscribing to topic on cnx [id: 0xc02866e9, L:/127.0.0.1:38002 - R:localhost/127.0.0.1:6650], consumerId 0
14:35:48.774 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic][first-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
14:38:33.237 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
----- got message -----
key:[null], properties:[], content:hello-pulsar
14:38:33.322 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
14:38:33.373 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic] [first-subscription] Closed consumer
14:38:33.385 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0xc02866e9, L:/127.0.0.1:38002 ! R:localhost/127.0.0.1:6650] Disconnected
14:38:33.420 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully consumed
To inspect the stats of a particular topic, use the command below. The output includes the number of messages processed and the subscription details.
root@sethlahaul:~# curl -s http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | python -m json.tool
{
    "averageMsgSize": 0.0,
    "backlogSize": 0,
    "bytesInCounter": 126,
    "bytesOutCounter": 0,
    "deduplicationStatus": "Disabled",
    "msgChunkPublished": false,
    "msgInCounter": 2,
    "msgOutCounter": 0,
    "msgRateIn": 0.0,
    "msgRateOut": 0.0,
    "msgThroughputIn": 0.0,
    "msgThroughputOut": 0.0,
    "publishers": [],
    "replication": {},
    "storageSize": 126,
    "subscriptions": {
        "first-subscription": {
            "blockedSubscriptionOnUnackedMsgs": false,
            "bytesOutCounter": 0,
            "chuckedMessageRate": 0,
            "consumers": [],
            "consumersAfterMarkDeletePosition": {},
            "isDurable": true,
            "isReplicated": false,
            "lastAckedTimestamp": 0,
            "lastConsumedFlowTimestamp": 1615559748782,
            "lastConsumedTimestamp": 0,
            "lastExpireTimestamp": 0,
            "msgBacklog": 0,
            "msgBacklogNoDelayed": 0,
            "msgDelayed": 0,
            "msgOutCounter": 0,
            "msgRateExpired": 0.0,
            "msgRateOut": 0.0,
            "msgRateRedeliver": 0.0,
            "msgThroughputOut": 0.0,
            "type": "Exclusive",
            "unackedMessages": 0
        }
    }
}
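If you would rather read these numbers from a script than from curl, the same endpoint can be queried with the Python standard library. The following is a minimal sketch, assuming the admin API is on http://localhost:8080 as in the curl command above. The function names stats_url, summarize and fetch_stats are made up for this example, and the sample input is a trimmed copy of the output shown above.

```python
import json
from urllib.request import urlopen


def stats_url(topic: str, tenant: str = "public", namespace: str = "default",
              admin: str = "http://localhost:8080") -> str:
    """Build the admin REST URL for a persistent topic's stats."""
    return f"{admin}/admin/v2/persistent/{tenant}/{namespace}/{topic}/stats"


def summarize(stats: dict) -> dict:
    """Pick the message counters and per-subscription backlog from the stats."""
    return {
        "msg_in": stats["msgInCounter"],
        "msg_out": stats["msgOutCounter"],
        "storage_bytes": stats["storageSize"],
        "backlog": {
            name: sub["msgBacklog"]
            for name, sub in stats.get("subscriptions", {}).items()
        },
    }


def fetch_stats(topic: str) -> dict:
    """Fetch and decode the stats JSON (needs a running Pulsar on localhost)."""
    with urlopen(stats_url(topic), timeout=5) as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Trimmed copy of the stats output shown above, used as sample input.
    sample = {
        "msgInCounter": 2,
        "msgOutCounter": 0,
        "storageSize": 126,
        "subscriptions": {"first-subscription": {"msgBacklog": 0}},
    }
    print(stats_url("my-topic"))
    print(summarize(sample))
```

With the standalone container running, summarize(fetch_stats("my-topic")) returns the same summary for the live topic.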
So, that’s it for this tutorial. Stay tuned for more.
