Apache Pulsar is a cloud-native, distributed messaging and streaming platform. It was originally created at Yahoo and is now a top-level Apache Software Foundation project.
Like Kafka, Pulsar can be used to build message-processing applications. A Pulsar instance consists of one or more Pulsar clusters. Each Pulsar cluster consists of 3 components:
- Brokers – One or more brokers handle and load-balance incoming messages from producers and dispatch them to consumers.
- BookKeeper – A BookKeeper cluster in Pulsar handles persistent storage of messages.
- ZooKeeper – A ZooKeeper cluster handles coordination tasks between Pulsar clusters.
Check out the architecture of Pulsar here.
#1. Set Up Apache Pulsar on Docker
To set up Pulsar on Docker, run the command below.
$ docker run -it \
    -p 6650:6650 \
    -p 8080:8080 \
    --mount source=pulsardata,target=/pulsar/data \
    --mount source=pulsarconf,target=/pulsar/conf \
    apachepulsar/pulsar:2.7.0 \
    bin/pulsar standalone
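This pulls the Pulsar 2.7.0 image if it is not already present and starts Pulsar in standalone mode inside a container. Port 6650 serves the Pulsar binary protocol and port 8080 serves the HTTP admin API. You can confirm that the container is running by listing your containers.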
$ docker ps -a
CONTAINER ID   IMAGE                       COMMAND                  CREATED          STATUS          PORTS                                            NAMES
3b027303f6e6   apachepulsar/pulsar:2.7.0   "bin/pulsar standalo…"   11 minutes ago   Up 11 minutes   0.0.0.0:6650->6650/tcp, 0.0.0.0:8080->8080/tcp   gallant_mahavira
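The standalone broker usually needs a little time after the container starts before it accepts requests. As a minimal sketch, assuming the admin API is reachable on the mapped port 8080, you could poll it until it answers. The function name wait_for_pulsar, the attempt count, and the 2-second delay are illustrative choices, not part of Pulsar.

```shell
# Poll the admin REST API until the standalone broker answers.
# The URL, attempt count, and 2-second delay are illustrative defaults.
wait_for_pulsar() {
  admin_url="${1:-http://localhost:8080}"
  attempts="${2:-30}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # -f: treat HTTP errors as failures, -s: suppress progress output
    if curl -sf "${admin_url}/admin/v2/clusters" > /dev/null; then
      echo "Pulsar is up after ${i} attempt(s)"
      return 0
    fi
    sleep 2
    i=$((i + 1))
  done
  echo "Pulsar did not respond at ${admin_url}" >&2
  return 1
}

# Usage: wait_for_pulsar http://localhost:8080 30
```

The function returns a non-zero status if the broker never responds, so it can gate later steps in a script.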
To test that your Pulsar setup works, you can produce and consume a message from inside the container. To open a bash shell in the running container, use the command below, replacing <container id> with the ID shown by docker ps.
docker exec -it <container id> /bin/bash
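If you would rather not copy the container ID by hand, the following sketch looks it up from the image name. It assumes the container was started from apachepulsar/pulsar:2.7.0 as above; the helper name pulsar_container_id is hypothetical.

```shell
# Print the ID of the first running container created from the Pulsar image.
# The default image tag matches the docker run command used in this tutorial.
pulsar_container_id() {
  image="${1:-apachepulsar/pulsar:2.7.0}"
  docker ps --filter "ancestor=${image}" --format '{{.ID}}' | head -n 1
}

# Usage: docker exec -it "$(pulsar_container_id)" /bin/bash
```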
In Apache Pulsar, when you produce a message to or consume a message from a topic that does not exist yet, the broker creates the topic automatically by default.
Produce a message:
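The short name my-topic used in this tutorial is shorthand for a fully qualified topic in the public tenant and default namespace, which is why the stats URL further down contains persistent/public/default/my-topic. The sketch below illustrates that mapping. The helper full_topic_name is hypothetical and deliberately simplified: it handles only bare names and names that are already fully qualified.

```shell
# Expand a short topic name to its fully qualified form.
# Simplified sketch: handles only bare names and already-qualified names.
full_topic_name() {
  case "$1" in
    *://*) printf '%s\n' "$1" ;;                              # already fully qualified
    *)     printf 'persistent://public/default/%s\n' "$1" ;;  # bare name: default tenant and namespace
  esac
}

full_topic_name my-topic
# prints persistent://public/default/my-topic
```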
$:/pulsar/bin # ./pulsar-client produce my-topic --messages "hello-pulsar"
14:38:32.461 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x44ad724c, L:/127.0.0.1:38004 - R:localhost/127.0.0.1:6650]] Connected to server
14:38:32.825 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: { "topicName" : "my-topic", "producerName" : null, "sendTimeoutMs" : 30000, "blockIfQueueFull" : false, "maxPendingMessages" : 1000, "maxPendingMessagesAcrossPartitions" : 50000, "messageRoutingMode" : "RoundRobinPartition", "hashingScheme" : "JavaStringHash", "cryptoFailureAction" : "FAIL", "batchingMaxPublishDelayMicros" : 1000, "batchingPartitionSwitchFrequencyByPublishDelay" : 10, "batchingMaxMessages" : 1000, "batchingMaxBytes" : 131072, "batchingEnabled" : true, "chunkingEnabled" : false, "compressionType" : "NONE", "initialSequenceId" : null, "autoUpdatePartitions" : true, "autoUpdatePartitionsIntervalSeconds" : 60, "multiSchema" : true, "properties" : { } }
14:38:32.848 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: { "serviceUrl" : "pulsar://localhost:6650/", "authPluginClassName" : null, "operationTimeoutMs" : 30000, "statsIntervalSeconds" : 60, "numIoThreads" : 1, "numListenerThreads" : 1, "connectionsPerBroker" : 1, "useTcpNoDelay" : true, "useTls" : false, "tlsTrustCertsFilePath" : "", "tlsAllowInsecureConnection" : false, "tlsHostnameVerificationEnable" : false, "concurrentLookupRequest" : 5000, "maxLookupRequest" : 50000, "maxLookupRedirects" : 20, "maxNumberOfRejectedRequestPerConnection" : 50, "keepAliveIntervalSeconds" : 30, "connectionTimeoutMs" : 10000, "requestTimeoutMs" : 60000, "initialBackoffIntervalNanos" : 100000000, "maxBackoffIntervalNanos" : 60000000000, "listenerName" : null, "useKeyStoreTls" : false, "sslProvider" : null, "tlsTrustStoreType" : "JKS", "tlsTrustStorePath" : "", "tlsTrustStorePassword" : "", "tlsCiphers" : [ ], "tlsProtocols" : [ ], "proxyServiceUrl" : null, "proxyProtocol" : null, "enableTransaction" : false }
14:38:32.883 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [null] Creating producer on cnx [id: 0x44ad724c, L:/127.0.0.1:38004 - R:localhost/127.0.0.1:6650]
14:38:32.933 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-0-1] Created producer on cnx [id: 0x44ad724c, L:/127.0.0.1:38004 - R:localhost/127.0.0.1:6650]
14:38:33.082 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
14:38:33.135 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
14:38:33.188 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-0-1] Closed Producer
14:38:33.205 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x44ad724c, L:/127.0.0.1:38004 ! R:localhost/127.0.0.1:6650] Disconnected
14:38:33.255 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
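Consume a message. As the timestamps in the log below show, the consumer was started in a second terminal before the message was produced: the subscription begins at the latest position ("subscriptionInitialPosition" : "Latest"), so it only receives messages published after it subscribes.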
$:/pulsar/bin # ./pulsar-client consume my-topic -s "first-subscription"
14:35:48.230 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xc02866e9, L:/127.0.0.1:38002 - R:localhost/127.0.0.1:6650]] Connected to server
14:35:48.587 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: { "topicNames" : [ "my-topic" ], "topicsPattern" : null, "subscriptionName" : "first-subscription", "subscriptionType" : "Exclusive", "subscriptionMode" : "Durable", "receiverQueueSize" : 1000, "acknowledgementsGroupTimeMicros" : 100000, "negativeAckRedeliveryDelayMicros" : 60000000, "maxTotalReceiverQueueSizeAcrossPartitions" : 50000, "consumerName" : null, "ackTimeoutMillis" : 0, "tickDurationMillis" : 1000, "priorityLevel" : 0, "maxPendingChuckedMessage" : 10, "autoAckOldestChunkedMessageOnQueueFull" : false, "expireTimeOfIncompleteChunkedMessageMillis" : 60000, "cryptoFailureAction" : "FAIL", "properties" : { }, "readCompacted" : false, "subscriptionInitialPosition" : "Latest", "patternAutoDiscoveryPeriod" : 60, "regexSubscriptionMode" : "PersistentOnly", "deadLetterPolicy" : null, "retryEnable" : false, "autoUpdatePartitions" : true, "autoUpdatePartitionsIntervalSeconds" : 60, "replicateSubscriptionState" : false, "resetIncludeHead" : false, "keySharedPolicy" : null, "batchIndexAckEnabled" : false }
14:35:48.604 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: { "serviceUrl" : "pulsar://localhost:6650/", "authPluginClassName" : null, "operationTimeoutMs" : 30000, "statsIntervalSeconds" : 60, "numIoThreads" : 1, "numListenerThreads" : 1, "connectionsPerBroker" : 1, "useTcpNoDelay" : true, "useTls" : false, "tlsTrustCertsFilePath" : "", "tlsAllowInsecureConnection" : false, "tlsHostnameVerificationEnable" : false, "concurrentLookupRequest" : 5000, "maxLookupRequest" : 50000, "maxLookupRedirects" : 20, "maxNumberOfRejectedRequestPerConnection" : 50, "keepAliveIntervalSeconds" : 30, "connectionTimeoutMs" : 10000, "requestTimeoutMs" : 60000, "initialBackoffIntervalNanos" : 100000000, "maxBackoffIntervalNanos" : 60000000000, "listenerName" : null, "useKeyStoreTls" : false, "sslProvider" : null, "tlsTrustStoreType" : "JKS", "tlsTrustStorePath" : "", "tlsTrustStorePassword" : "", "tlsCiphers" : [ ], "tlsProtocols" : [ ], "proxyServiceUrl" : null, "proxyProtocol" : null, "enableTransaction" : false }
14:35:48.655 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic][first-subscription] Subscribing to topic on cnx [id: 0xc02866e9, L:/127.0.0.1:38002 - R:localhost/127.0.0.1:6650], consumerId 0
14:35:48.774 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic][first-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
14:38:33.237 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
----- got message -----
key:[null], properties:[], content:hello-pulsar
14:38:33.322 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
14:38:33.373 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [my-topic] [first-subscription] Closed consumer
14:38:33.385 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0xc02866e9, L:/127.0.0.1:38002 ! R:localhost/127.0.0.1:6650] Disconnected
14:38:33.420 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully consumed
To inspect the stats of a particular topic, query the admin API on port 8080 with the command below. The response includes message and byte counters for the topic as well as the details of each subscription.
root@sethlahaul:~# curl -s http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | python -m json.tool
{
    "averageMsgSize": 0.0,
    "backlogSize": 0,
    "bytesInCounter": 126,
    "bytesOutCounter": 0,
    "deduplicationStatus": "Disabled",
    "msgChunkPublished": false,
    "msgInCounter": 2,
    "msgOutCounter": 0,
    "msgRateIn": 0.0,
    "msgRateOut": 0.0,
    "msgThroughputIn": 0.0,
    "msgThroughputOut": 0.0,
    "publishers": [],
    "replication": {},
    "storageSize": 126,
    "subscriptions": {
        "first-subscription": {
            "blockedSubscriptionOnUnackedMsgs": false,
            "bytesOutCounter": 0,
            "chuckedMessageRate": 0,
            "consumers": [],
            "consumersAfterMarkDeletePosition": {},
            "isDurable": true,
            "isReplicated": false,
            "lastAckedTimestamp": 0,
            "lastConsumedFlowTimestamp": 1615559748782,
            "lastConsumedTimestamp": 0,
            "lastExpireTimestamp": 0,
            "msgBacklog": 0,
            "msgBacklogNoDelayed": 0,
            "msgDelayed": 0,
            "msgOutCounter": 0,
            "msgRateExpired": 0.0,
            "msgRateOut": 0.0,
            "msgRateRedeliver": 0.0,
            "msgThroughputOut": 0.0,
            "type": "Exclusive",
            "unackedMessages": 0
        }
    }
}
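If you only need a single counter rather than the whole document, a quick text filter is often enough. The sketch below is a rough illustration and not a JSON parser: the helper stat_field is hypothetical, it only works for numeric fields, and it returns the first occurrence of the field name, which for a name such as msgOutCounter is the topic-level value in the output above. A proper JSON tool is the safer choice for anything beyond a quick check.

```shell
# Print the first numeric value found for a given field name in JSON read from stdin.
# Text-based shortcut, not a JSON parser: numeric fields only, first match wins.
stat_field() {
  grep -o "\"$1\"[[:space:]]*:[[:space:]]*[0-9.]*" | head -n 1 | sed 's/.*:[[:space:]]*//'
}

# Usage:
#   curl -s http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | stat_field msgInCounter
```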
So, that’s it for this tutorial. Stay tuned for more.