Pulsar concepts and architecture

A high-level overview of Pulsar's moving pieces

Pulsar is a multi-tenant, high-performance solution for server-to-server messaging originally developed by Yahoo and now under the stewardship of the Apache Software Foundation.

Pulsar’s key features include:

Producers, consumers, topics, and subscriptions

Pulsar is built on the publish-subscribe pattern, aka pub-sub. In this pattern, producers publish messages to topics. Consumers can then subscribe to those topics, process incoming messages, and send an acknowledgement when processing is complete.

Once a subscription has been created, all messages will be retained by Pulsar, even if the consumer gets disconnected. Retained messages will be discarded only when a consumer acknowledges that they’ve been successfully processed.

Producers

A producer is a process that attaches to a topic and publishes messages to a Pulsar broker for processing.

Send modes

Producers can send messages to brokers either synchronously (sync) or asynchronously (async).

Mode | Description
Sync send | The producer waits for an acknowledgement from the broker after sending each message. If the acknowledgement isn’t received, the producer considers the send operation a failure.
Async send | The producer puts the message in a blocking queue and returns immediately. The client library then sends the message to the broker in the background. If the queue is full (the maximum size is configurable), the producer either blocks or fails immediately when calling the API, depending on the arguments passed to the producer.
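The two behaviors on a full queue can be sketched with a plain bounded queue from the Java standard library. This is only an illustration of the idea, not the Pulsar client's code; the class PendingSendQueue and its method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a producer's pending-message queue (not Pulsar code).
class PendingSendQueue {
    private final BlockingQueue<String> pending;

    PendingSendQueue(int maxPendingMessages) {
        this.pending = new ArrayBlockingQueue<>(maxPendingMessages);
    }

    // Fail-fast variant: returns false immediately when the queue is full.
    boolean trySend(String message) {
        return pending.offer(message);
    }

    // Blocking variant: waits until the background sender frees a slot.
    void sendBlocking(String message) throws InterruptedException {
        pending.put(message);
    }

    // Called by the background sender; returns null if nothing is queued.
    String nextToTransmit() {
        return pending.poll();
    }

    int size() {
        return pending.size();
    }
}
```

With a capacity of 2, a third trySend call returns false until nextToTransmit has drained at least one message.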

Compression

Messages published by producers can be compressed in transit to save bandwidth. Pulsar currently supports two types of compression: LZ4 and ZLIB.

Batching

If batching is enabled, the producer will accumulate and send a batch of messages in a single request. The batch size is bounded by a maximum number of messages and a maximum publish latency.
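The interaction of the two limits can be sketched as follows. This is a simplified model, not the client's implementation: BatchAccumulator is a hypothetical class, time is passed in explicitly, and a real producer would also flush from a timer rather than only when a new message arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batch accumulator; a sketch, not the Pulsar client's implementation.
class BatchAccumulator {
    private final int maxMessages;
    private final long maxDelayMillis;
    private final List<String> current = new ArrayList<>();
    private long firstMessageAtMillis;

    BatchAccumulator(int maxMessages, long maxDelayMillis) {
        this.maxMessages = maxMessages;
        this.maxDelayMillis = maxDelayMillis;
    }

    // Adds a message. Returns the batch to send when either limit is reached,
    // or null when the message was only buffered.
    List<String> add(String message, long nowMillis) {
        if (current.isEmpty()) {
            firstMessageAtMillis = nowMillis;
        }
        current.add(message);
        boolean full = current.size() >= maxMessages;
        boolean waitedTooLong = nowMillis - firstMessageAtMillis >= maxDelayMillis;
        return (full || waitedTooLong) ? flush() : null;
    }

    // Hands over the buffered messages and starts a new, empty batch.
    List<String> flush() {
        List<String> batch = new ArrayList<>(current);
        current.clear();
        return batch;
    }
}
```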

Consumers

A consumer is a process that attaches to a topic via a subscription and then receives messages.

Receive modes

Messages can be received from brokers either synchronously (sync) or asynchronously (async).

Mode | Description
Sync receive | A sync receive blocks until a message is available.
Async receive | An async receive returns immediately with a future value (a CompletableFuture in Java, for example) that completes once a new message is available.
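As a rough illustration of the async contract, the sketch below hands out a CompletableFuture that completes when a message arrives. AsyncReceiveQueue is a hypothetical class built only on the Java standard library; it is not how the Pulsar client is implemented.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical receiver queue illustrating sync vs. async receive (not Pulsar code).
class AsyncReceiveQueue {
    private final Queue<String> messages = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> waiters = new ArrayDeque<>();

    // Async receive: returns at once; the future completes when a message is available.
    synchronized CompletableFuture<String> receiveAsync() {
        String ready = messages.poll();
        if (ready != null) {
            return CompletableFuture.completedFuture(ready);
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        waiters.add(future);
        return future;
    }

    // Called when a message arrives: complete the oldest waiter, or buffer it.
    synchronized void onMessage(String message) {
        CompletableFuture<String> waiter = waiters.poll();
        if (waiter != null) {
            waiter.complete(message);
        } else {
            messages.add(message);
        }
    }
}
```

In this model a sync receive is simply receiveAsync().join(), which blocks the caller until the future completes.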

Acknowledgement

When a consumer has successfully processed a message, it needs to send an acknowledgement to the broker so that the broker can discard the message (otherwise it stores the message).

Messages can be acknowledged either one by one or cumulatively. With cumulative acknowledgement, the consumer only needs to acknowledge the last message it received. All messages in the stream up to (and including) the provided message will not be re-delivered to that consumer.

Cumulative acknowledgement cannot be used with shared subscription mode, because shared mode involves multiple consumers having access to the same subscription.
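The difference between the two acknowledgement styles can be sketched with a sorted set of outstanding messages. This is an illustrative model only: AckTracker is a hypothetical class, and message IDs are simplified to increasing sequence numbers.

```java
import java.util.TreeSet;

// Hypothetical tracker of unacknowledged messages for a single consumer.
class AckTracker {
    private final TreeSet<Long> unacked = new TreeSet<>();

    void delivered(long sequence) {
        unacked.add(sequence);
    }

    // Individual acknowledgement: removes exactly one message.
    void ack(long sequence) {
        unacked.remove(sequence);
    }

    // Cumulative acknowledgement: removes every message up to and including sequence.
    void ackCumulative(long sequence) {
        unacked.headSet(sequence, true).clear();
    }

    int pending() {
        return unacked.size();
    }
}
```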

Listeners

Client libraries can provide their own listener implementations for consumers. The Java client, for example, provides a MessageListener interface. In this interface, the received method is called whenever a new message is received.

Topics

As in other pub-sub systems, topics in Pulsar are named channels for transmitting messages from producers to consumers. Topic names are URLs that have a well-defined structure:

persistent://property/cluster/namespace/topic

Topic name component | Description
persistent | Identifies the type of topic. Pulsar supports two kinds of topics: persistent and non-persistent. In a persistent topic, all messages are durably persisted on disk (on multiple disks unless the broker is standalone), whereas a non-persistent topic does not persist messages to disk.
property | The topic’s tenant within the instance. Tenants are essential to multi-tenancy in Pulsar and can be spread across clusters.
cluster | Where the topic is located. Typically there will be one cluster for each geographical region or data center.
namespace | The administrative unit of the topic, which acts as a grouping mechanism for related topics. Most topic configuration is performed at the namespace level. Each property (tenant) can have multiple namespaces.
topic | The final part of the name. Topic names are freeform and have no special meaning in a Pulsar instance.
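To make the structure concrete, here is a minimal sketch that splits a topic name into the components listed above. The TopicName class shown here is hypothetical and written for illustration; it is not the parser used by Pulsar itself and performs only basic validation.

```java
// Hypothetical parser for names of the form domain://property/cluster/namespace/topic.
class TopicName {
    final String domain;
    final String property;
    final String cluster;
    final String namespace;
    final String topic;

    private TopicName(String domain, String property, String cluster,
                      String namespace, String topic) {
        this.domain = domain;
        this.property = property;
        this.cluster = cluster;
        this.namespace = namespace;
        this.topic = topic;
    }

    static TopicName parse(String name) {
        int sep = name.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("Missing '://' in topic name: " + name);
        }
        String domain = name.substring(0, sep);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("Unknown topic type: " + domain);
        }
        // Limit the split to 4 parts so the final component keeps any remaining text.
        String[] parts = name.substring(sep + 3).split("/", 4);
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected property/cluster/namespace/topic: " + name);
        }
        return new TopicName(domain, parts[0], parts[1], parts[2], parts[3]);
    }
}
```

For example, parsing persistent://my-property/us-west/my-namespace/my-topic yields the property my-property, the cluster us-west, the namespace my-namespace, and the topic my-topic.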

No need to explicitly create new topics

Applications do not need to create topics explicitly. If a client attempts to write to or receive messages from a topic that does not yet exist, Pulsar automatically creates that topic under the namespace.

Namespace

A namespace is a logical grouping within a property. A property can create multiple namespaces via the admin API. For instance, a property with different applications can create a separate namespace for each application. A namespace allows the application to create and manage a hierarchy of topics. For example, my-property/my-cluster/my-property-app1 is a namespace for the application my-property-app1 in cluster my-cluster for my-property. An application can create any number of topics under the namespace.

Subscription modes

A subscription is a named configuration rule that determines how messages are delivered to consumers. There are three available subscription modes in Pulsar: exclusive, shared, and failover. These modes are illustrated in the figure below.

[Figure: Subscription modes]

Exclusive

In exclusive mode, only a single consumer is allowed to attach to the subscription. If a second consumer attempts to subscribe to the topic using the same subscription, that consumer receives an error.

In the diagram above, only Consumer-A is allowed to consume messages.

Exclusive mode is the default subscription mode.

Shared

In shared or round robin mode, multiple consumers can attach to the same subscription. Messages are delivered in a round robin distribution across consumers, and any given message is delivered to only one consumer. When a consumer disconnects, all the messages that were sent to it and not acknowledged will be rescheduled for sending to the remaining consumers.

In the diagram above, Consumer-B-1 and Consumer-B-2 are able to subscribe to the topic, but Consumer-C-1 and others could as well.

Limitations of shared mode

There are two important things to be aware of when using shared mode:

  1. Message ordering is not guaranteed.
  2. You cannot use cumulative acknowledgment with shared mode.
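Round-robin delivery and the rescheduling of unacknowledged messages can be sketched as below. This is a toy model under stated assumptions: SharedSubscriptionSketch is a hypothetical class, messages are plain strings, and if no consumer remains the sketch simply drops the pending messages, whereas a broker would keep them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a shared subscription (not broker code).
class SharedSubscriptionSketch {
    private final List<String> consumers = new ArrayList<>();
    private final Map<String, List<String>> unacked = new HashMap<>();
    private int next = 0;

    void connect(String consumer) {
        consumers.add(consumer);
        unacked.put(consumer, new ArrayList<>());
    }

    // Delivers the message to exactly one consumer, chosen in round-robin order.
    String dispatch(String message) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("No connected consumers");
        }
        String target = consumers.get(next % consumers.size());
        next = (next + 1) % consumers.size();
        unacked.get(target).add(message);
        return target;
    }

    void ack(String consumer, String message) {
        unacked.get(consumer).remove(message);
    }

    // Reschedules the departed consumer's unacknowledged messages to the others.
    void disconnect(String consumer) {
        consumers.remove(consumer);
        List<String> pending = unacked.remove(consumer);
        next = 0;
        if (!consumers.isEmpty()) {
            for (String message : pending) {
                dispatch(message);
            }
        }
    }

    List<String> unackedFor(String consumer) {
        return unacked.get(consumer);
    }
}
```

Because a redelivered message can arrive after newer ones, the sketch also shows why ordering is not guaranteed in this mode.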

Failover

In failover mode, multiple consumers can attach to the same subscription. The consumers will be lexically sorted by the consumer’s name and the first consumer will initially be the only one receiving messages. This consumer is called the master consumer.

When the master consumer disconnects, all (non-acked and subsequent) messages will be delivered to the next consumer in line.

In the diagram above, Consumer-C-1 is the master consumer, while Consumer-C-2 would be the next in line to receive messages if Consumer-C-1 disconnected.
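The selection rule can be sketched with a sorted set of consumer names. FailoverSubscriptionSketch is a hypothetical class for illustration; it models only the ordering rule, not message delivery.

```java
import java.util.TreeSet;

// Hypothetical model of master selection in failover mode (not broker code).
class FailoverSubscriptionSketch {
    // TreeSet keeps names in lexical (natural String) order.
    private final TreeSet<String> consumers = new TreeSet<>();

    void connect(String consumerName) {
        consumers.add(consumerName);
    }

    void disconnect(String consumerName) {
        consumers.remove(consumerName);
    }

    // The first name in sorted order is the master; null if nobody is connected.
    String master() {
        return consumers.isEmpty() ? null : consumers.first();
    }
}
```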

Partitioned topics

Normal topics can be served only by a single broker, which limits the topic’s maximum throughput. Partitioned topics are a special type of topic that can be handled by multiple brokers, which allows for much higher throughput.

Behind the scenes, a partitioned topic is actually implemented as N internal topics, where N is the number of partitions. When publishing messages to a partitioned topic, each message is routed to one of several brokers. The distribution of partitions across brokers is handled automatically by Pulsar.

The diagram below illustrates this:

[Figure: Partitioned topic]

Here, the topic T1 has five partitions (P0 through P4) split across three brokers. Because there are more partitions than brokers, two brokers handle two partitions apiece, while the third handles only one (again, Pulsar handles this distribution of partitions automatically).

Messages for this topic are broadcast to two consumers. The routing mode determines which partition each message is published to, while the subscription mode determines which messages go to which consumers.

Decisions about routing and subscription modes can be made separately in most cases. Throughput concerns should guide partitioning/routing decisions while subscription decisions should be guided by application semantics.

There is no difference between partitioned topics and normal topics in terms of how subscription modes work, as partitioning only determines what happens between when a message is published by a producer and processed and acknowledged by a consumer.

Partitioned topics need to be explicitly created via the admin API. The number of partitions can be specified when creating the topic.

Routing modes

When publishing to partitioned topics, you must specify a routing mode. The routing mode determines which partition—that is, which internal topic—each message should be published to.

There are three routing modes available by default:

Mode | Description | Ordering guarantee
Key hash | If a key property has been specified on the message, the partitioned producer will hash the key and assign the message to a particular partition. | Per-key-bucket ordering
Single default partition | If no key is provided, each producer’s messages will be routed to a dedicated partition, initially selected at random. | Per-producer ordering
Round robin distribution | If no key is provided, all messages will be routed to different partitions in round-robin fashion to achieve maximum throughput. | None

In addition to these default modes, you can also create a custom routing mode if you’re using the Java client by implementing the MessageRouter interface.
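The three default modes can be sketched as small partition choosers. This is an illustration only: PartitionRouters and its nested classes are hypothetical, they do not implement the MessageRouter interface, and String.hashCode is used as a stand-in for whatever hash function the client actually applies.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical partition choosers mirroring the three default routing modes.
class PartitionRouters {
    // Key hash: the same key always maps to the same partition.
    // String.hashCode is an assumption, not necessarily Pulsar's hash function.
    static int keyHash(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Single default partition: picked at random once, then reused by this producer.
    static class SinglePartition {
        private final int partition;

        SinglePartition(int numPartitions) {
            this.partition = ThreadLocalRandom.current().nextInt(numPartitions);
        }

        int choose() {
            return partition;
        }
    }

    // Round robin: each message goes to the next partition in turn.
    static class RoundRobin {
        private final AtomicInteger counter = new AtomicInteger();
        private final int numPartitions;

        RoundRobin(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        int choose() {
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
    }
}
```

The ordering guarantees in the table follow from these rules: a fixed mapping per key or per producer preserves order within that partition, while round robin spreads consecutive messages across partitions.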

Non-persistent topics

Notice

This feature is still experimental, and implementation details may change in future releases.

As the name suggests, a non-persistent topic does not persist messages to any durable storage, unlike a persistent topic, where messages are durably persisted on multiple disks.

With persistent delivery, messages are persisted to disk/database so that they survive a broker restart or subscriber failover. With non-persistent delivery, if a broker is killed or a subscriber is disconnected, the subscriber loses all in-transit messages. Clients may therefore see message loss with non-persistent topics.

  • In a non-persistent topic, as soon as the broker receives a published message, it delivers that message to all connected subscribers without persisting it to any storage. If a subscriber is disconnected from the broker, the broker cannot deliver those in-transit messages, and the subscriber will never be able to receive them again. The broker also drops a message for a consumer if the consumer does not have enough permits to consume it or if the consumer’s TCP channel is not writable. The consumer receiver queue size (to accommodate enough permits) and the TCP receive window size (to keep the channel writable) should therefore be configured properly to avoid message drops for that consumer.
  • The broker allows only a configured number of in-flight messages per client connection. If a producer tries to publish messages at a higher rate, the broker silently drops the new incoming messages without processing them or delivering them to subscribers. However, the broker acknowledges those dropped messages with a special message ID (msg-id: -1:-1) to signal the drop to the producer.

Performance

Non-persistent messaging is usually faster than persistent messaging because the broker does not persist messages and sends an ack back to the producer as soon as the message has been delivered to all connected subscribers. Producers therefore see comparatively low publish latency with non-persistent topics.

Client API

A topic name will look like:

non-persistent://my-property/us-west/my-namespace/my-topic

Producers and consumers connect to non-persistent topics in the same way as to persistent topics, except that the topic name must start with non-persistent.

Non-persistent topics support all three subscription modes (exclusive, shared, and failover), which are explained in detail in GettingStarted.

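Consumer API

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.create("pulsar://localhost:6650");

// Subscribe to a non-persistent topic with a named subscription
Consumer consumer = client.subscribe(
            "non-persistent://sample/standalone/ns1/my-topic",
            "my-subscription-name");

Producer API

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.create("pulsar://localhost:6650");

// Create a producer on a non-persistent topic
Producer producer = client.createProducer(
            "non-persistent://sample/standalone/ns1/my-topic");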

Broker configuration

Sometimes you may need to dedicate a few brokers in a cluster to serving only non-persistent topics.

The following broker configuration restricts a broker to owning only the configured types of topics:

# Prevent the broker from loading persistent topics
enablePersistentTopics=false
# Allow the broker to load non-persistent topics
enableNonPersistentTopics=true

Architecture overview

At the highest level, a Pulsar instance is composed of one or more Pulsar clusters. Clusters within an instance can replicate data amongst themselves.

In a Pulsar cluster:

  • One or more brokers handle and load balance incoming messages from producers, dispatch messages to consumers, communicate with global ZooKeeper to handle various coordination tasks, store messages in BookKeeper instances (aka bookies), rely on a cluster-specific ZooKeeper cluster for certain tasks, and more.
  • A BookKeeper cluster consisting of one or more bookies handles persistent storage of messages.
  • A ZooKeeper cluster specific to that cluster handles cluster-level configuration and coordination.

The diagram below provides an illustration of a Pulsar cluster:

[Figure: Architecture diagram]

At the broader instance level, an instance-wide ZooKeeper cluster called global ZooKeeper handles coordination tasks involving multiple clusters, for example geo-replication.

Brokers

The Pulsar message broker is a stateless component that’s primarily responsible for running two other components:

Messages are typically dispatched out of a managed ledger cache for the sake of performance, unless the backlog exceeds the cache size. If the backlog grows too large for the cache, the broker will start reading entries from BookKeeper.

Finally, to support geo-replication on global topics, the broker manages replicators that tail the entries published in the local region and republish them to the remote region using the Pulsar Java client library.

For a guide to managing Pulsar brokers, see the Clusters and brokers guide.

Clusters

A Pulsar instance consists of one or more Pulsar clusters. Clusters, in turn, consist of:

  • One or more Pulsar brokers
  • A ZooKeeper quorum used for cluster-level configuration and coordination
  • An ensemble of bookies used for persistent storage of messages

Clusters can replicate amongst themselves using geo-replication.

For a guide to managing Pulsar clusters, see the Clusters and brokers guide.

Global cluster

In any Pulsar instance, there is an instance-wide cluster called global that you can use to manage non-cluster-specific namespaces and topics. The global cluster is created for you automatically when you initialize metadata for the first cluster in your instance.

Global topic names have this basic structure (note the global cluster):

persistent://my-property/global/my-namespace/my-topic

Metadata store

Pulsar uses Apache ZooKeeper for metadata storage, cluster configuration, and coordination. In a Pulsar instance:

  • A global ZooKeeper quorum stores configuration for properties, namespaces, and other entities that need to be globally consistent.
  • Each cluster has its own local ZooKeeper ensemble that stores cluster-specific configuration and coordination such as ownership metadata, broker load reports, BookKeeper ledger metadata, and more.

When creating a new cluster

Persistent storage

Brokers and bookies

Pulsar provides guaranteed message delivery for applications. If a message successfully reaches a Pulsar broker, it will be delivered to its intended target.

This guarantee requires that non-acknowledged messages are stored in a durable manner until they can be delivered to and acknowledged by consumers. This mode of messaging is commonly called persistent messaging. In Pulsar, N copies of all messages are stored and synced on disk, for example 4 copies across two servers with mirrored RAID volumes on each server.

Pulsar uses a system called Apache BookKeeper for persistent message storage. BookKeeper is a distributed write-ahead log (WAL) system that provides a number of crucial advantages for Pulsar:

  • It enables Pulsar to utilize many independent logs, called ledgers. Multiple ledgers can be created for topics over time.
  • It offers very efficient storage for sequential data that handles entry replication.
  • It guarantees read consistency of ledgers in the presence of various system failures.
  • It offers even distribution of I/O across bookies.
  • It’s horizontally scalable in both capacity and throughput. Capacity can be immediately increased by adding more bookies to a cluster.
  • Bookies are designed to handle thousands of ledgers with concurrent reads and writes. By using multiple disk devices (one for the journal and another for general storage), bookies are able to isolate the effects of read operations from the latency of ongoing write operations.

In addition to message data, cursors are also persistently stored in BookKeeper. Cursors are subscription positions for consumers. BookKeeper enables Pulsar to store consumer position in a scalable fashion.

Topics backed by this persistent storage carry the persistent prefix in their names. Here’s an example:

persistent://my-property/global/my-namespace/my-topic

Non-persistent topics, which are described above and are still experimental, do not store messages in BookKeeper.

Ledgers

A ledger is an append-only data structure with a single writer that is assigned to multiple BookKeeper storage nodes, or bookies. Ledger entries are replicated to multiple bookies. Ledgers themselves have very simple semantics:

  • A Pulsar broker can create a ledger, append entries to the ledger, and close the ledger.
  • After the ledger has been closed—either explicitly or because the writer process crashed—it can then be opened only in read-only mode.
  • Finally, when entries in the ledger are no longer needed, the whole ledger can be deleted from the system (across all bookies).

Ledger read consistency

The main strength of BookKeeper is that it guarantees read consistency in ledgers in the presence of failures. Since the ledger can only be written to by a single process, that process is free to append entries very efficiently, without needing to obtain consensus. After a failure, the ledger will go through a recovery process that will finalize the state of the ledger and establish which entry was last committed to the log. After that point, all readers of the ledger are guaranteed to see the exact same content.

Managed ledgers

Given that BookKeeper ledgers provide a single log abstraction, a library called the managed ledger was developed on top of the ledger to represent the storage layer for a single topic. A managed ledger represents the abstraction of a stream of messages with a single writer that keeps appending at the end of the stream and multiple cursors that are consuming the stream, each with its own associated position.

Internally, a single managed ledger uses multiple BookKeeper ledgers to store the data. There are two reasons to have multiple ledgers:

  1. After a failure, a ledger is no longer writable and a new one needs to be created.
  2. A ledger can be deleted when all cursors have consumed the messages it contains. This allows for periodic rollover of ledgers.
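The second point can be sketched as a trimming rule: a closed ledger becomes deletable once the slowest cursor has moved past its last entry. This is a simplified model, not the managed ledger library: ManagedLedgerSketch is hypothetical, positions are reduced to a single increasing number, only closed ledgers are tracked, and with no cursors registered nothing is trimmed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of ledger rollover cleanup in a managed ledger.
class ManagedLedgerSketch {
    // Closed ledger ID -> position of the last entry stored in that ledger.
    private final TreeMap<Long, Long> ledgerLastPosition = new TreeMap<>();
    // Cursor name -> last position that cursor has consumed.
    private final Map<String, Long> cursors = new HashMap<>();

    void addClosedLedger(long ledgerId, long lastPosition) {
        ledgerLastPosition.put(ledgerId, lastPosition);
    }

    void updateCursor(String cursorName, long consumedUpTo) {
        cursors.put(cursorName, consumedUpTo);
    }

    // Deletes every closed ledger that all cursors have fully consumed
    // and returns the IDs of the deleted ledgers.
    List<Long> trimConsumedLedgers() {
        long slowest = cursors.values().stream()
                .mapToLong(Long::longValue).min().orElse(-1L);
        List<Long> deleted = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = ledgerLastPosition.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getValue() <= slowest) {
                deleted.add(entry.getKey());
                it.remove();
            }
        }
        return deleted;
    }
}
```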

Journal storage

In BookKeeper, journal files contain BookKeeper transaction logs. Before making an update to a ledger, a bookie needs to ensure that a transaction describing the update is written to persistent (non-volatile) storage. A new journal file is created once the bookie starts or the older journal file reaches the journal file size threshold (configured using the journalMaxSizeMB parameter).

Non-persistent storage

Non-persistent topics, described above, bypass BookKeeper storage. The durability mode is selected per topic by replacing the persistent prefix in the topic name with non-persistent. The feature is still experimental.

Message retention and expiry

By default, Pulsar message brokers:

  • immediately delete all messages that have been acknowledged by a consumer, and
  • persistently store all unacknowledged messages in a message backlog.

Pulsar has two features, however, that enable you to override this default behavior:

  • Message retention enables you to store messages that have been acknowledged by a consumer
  • Message expiry enables you to set a time to live (TTL) for messages that have not yet been acknowledged

All message retention and expiry is managed at the namespace level. For a how-to, see the Message retention and expiry admin documentation.

The diagram below illustrates both concepts:

With message retention, shown at the top, a retention policy applied to all topics in a namespace dictates that some messages are durably stored in Pulsar even though they’ve already been acknowledged. Acknowledged messages that are not covered by the retention policy are deleted. Without a retention policy, all of the acknowledged messages would be deleted.

With message expiry, shown at the bottom, some messages are deleted, even though they haven’t been acknowledged, because they’ve expired according to the TTL applied to the namespace (for example because a TTL of 5 minutes has been applied and the messages haven’t been acknowledged but are 10 minutes old).
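The combined effect of the two policies on a single message can be sketched as a small decision function. This is a simplified, time-only model: MessagePolicy is a hypothetical class, a zero retention period stands for the default of no retention, and a null TTL stands for no expiry.

```java
import java.time.Duration;

// Hypothetical decision rule combining retention and expiry (TTL).
class MessagePolicy {
    enum Action { KEEP_IN_BACKLOG, RETAIN, DELETE }

    static Action decide(boolean acknowledged, Duration age,
                         Duration retention, Duration ttl) {
        if (acknowledged) {
            // Retention applies only to messages that have been acknowledged.
            return age.compareTo(retention) < 0 ? Action.RETAIN : Action.DELETE;
        }
        // Expiry applies only to messages that are still unacknowledged.
        if (ttl != null && age.compareTo(ttl) > 0) {
            return Action.DELETE;
        }
        return Action.KEEP_IN_BACKLOG;
    }
}
```

With a TTL of 5 minutes, an unacknowledged message that is 10 minutes old is deleted, matching the example above; with no TTL it stays in the backlog.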

Replication

Pulsar enables messages to be produced and consumed in different geo-locations. For instance, your application may be publishing data in one region or market and you would like to process it for consumption in other regions or markets. Geo-replication in Pulsar enables you to do that.

Multi-tenancy

Pulsar was created from the ground up as a multi-tenant system. To support multi-tenancy, Pulsar has a concept of properties. Properties can be spread across clusters and can each have their own authentication and authorization scheme applied to them. They are also the administrative unit at which storage quotas, message TTL, and isolation policies can be managed.

The multi-tenant nature of Pulsar is reflected most visibly in topic URLs, which have this structure:

persistent://property/cluster/namespace/topic

As you can see, the property is the most basic unit of categorization for topics (and even more fundamental than the cluster).

Properties and namespaces

Pulsar was designed from the ground up to be a multi-tenant system. In Pulsar, tenants are identified by properties. Properties are the highest administrative unit within a Pulsar instance. Within properties

Properties

To each property in a Pulsar instance you can assign:

  • An authorization scheme
  • The set of clusters to which the property applies

Namespaces

Properties and namespaces are two key concepts of Pulsar to support multi-tenancy.

  • A property identifies a tenant. Pulsar is provisioned for a specified property with appropriate capacity allocated to the property.
  • A namespace is the administrative unit within a property. The configuration policies set on a namespace apply to all the topics created in that namespace. A property may create multiple namespaces via self-administration using the REST API and CLI tools. For instance, a property with different applications can create a separate namespace for each application.

Names for topics in the same namespace will look like this:

persistent://my-property/us-w/my-app1/my-topic-1
persistent://my-property/us-w/my-app1/my-topic-2
persistent://my-property/us-w/my-app1/my-topic-3

Authentication and Authorization

Pulsar supports a pluggable authentication mechanism, which is configured at the broker. It also supports authorization, which identifies a client and its access rights on topics and properties.

Client interface

Pulsar exposes a client API with language bindings for Java and C++. The client API optimizes and encapsulates Pulsar’s client-broker communication protocol and exposes a simple and intuitive API for use by applications.

Under the hood, the current official Pulsar client libraries support transparent reconnection and/or connection failover to brokers, queuing of messages until acknowledged by the broker, and heuristics such as connection retries with backoff.

Custom client libraries

If you’d like to create your own client library, we recommend consulting the documentation on Pulsar’s custom binary protocol.

Client setup phase

When an application wants to create a producer/consumer, the Pulsar client library will initiate a setup phase that is composed of two steps:

  1. The client will attempt to determine the owner of the topic by sending an HTTP lookup request to the broker. The request could reach any of the active brokers, which, by looking at the (cached) ZooKeeper metadata, will know which broker is serving the topic or, if no broker is serving it, will try to assign it to the least loaded broker.
  2. Once the client library has the broker address, it will create a TCP connection (or reuse an existing connection from the pool) and authenticate it. Within this connection, client and broker exchange binary commands from a custom protocol. At this point the client will send a command to create producer/consumer to the broker, which will comply after having validated the authorization policy.

Whenever the TCP connection breaks, the client will immediately re-initiate this setup phase and will keep trying with exponential backoff to re-establish the producer or consumer until the operation succeeds.
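The retry schedule can be sketched as a delay that doubles after each failed attempt up to a cap. ReconnectBackoff is a hypothetical class, and the initial and maximum delays used with it are assumed values for illustration, not the client's actual defaults.

```java
// Hypothetical exponential backoff schedule for reconnection attempts.
class ReconnectBackoff {
    private final long initialMillis;
    private final long maxMillis;
    private long nextMillis;

    ReconnectBackoff(long initialMillis, long maxMillis) {
        this.initialMillis = initialMillis;
        this.maxMillis = maxMillis;
        this.nextMillis = initialMillis;
    }

    // Returns the delay before the next attempt, then doubles it up to the cap.
    long nextDelayMillis() {
        long delay = nextMillis;
        nextMillis = Math.min(nextMillis * 2, maxMillis);
        return delay;
    }

    // Called once the producer or consumer has been re-established.
    void reset() {
        nextMillis = initialMillis;
    }
}
```

Starting at 100 ms with a 1000 ms cap, successive delays are 100, 200, 400, 800, 1000, 1000, and so on until reset is called.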

Pulsar proxy

One way for Pulsar clients to interact with a Pulsar cluster is by connecting to Pulsar message brokers directly. In some cases, however, this kind of direct connection is either infeasible or undesirable because the client doesn’t have direct access to broker addresses. If you’re running Pulsar in a cloud environment or on Kubernetes or an analogous platform, for example, then direct client connections to brokers are likely not possible.

The Pulsar proxy provides a solution to this problem by acting as a single gateway for all of the brokers in a cluster. If you run the Pulsar proxy (which, again, is optional), all client connections with the Pulsar cluster will flow through the proxy rather than communicating with brokers.

For the sake of performance and fault tolerance, you can run as many instances of the Pulsar proxy as you’d like.

Architecturally, the Pulsar proxy gets all the information it requires from ZooKeeper. When starting the proxy on a machine, you only need to provide ZooKeeper connection strings for the cluster-specific and global ZooKeeper clusters. Here’s an example:

$ bin/pulsar proxy \
  --zookeeper-servers zk-0,zk-1,zk-2 \
  --global-zookeeper-servers zk-0,zk-1,zk-2

Pulsar proxy docs

For documentation on using the Pulsar proxy, see the Pulsar proxy admin documentation.

Some important things to know about the Pulsar proxy:

  • Connecting clients don’t need to provide any specific configuration to use the Pulsar proxy. You won’t need to update the client configuration for existing applications beyond updating the IP used for the service URL (for example if you’re running a load balancer over the Pulsar proxy).
  • TLS encryption and authentication are supported by the Pulsar proxy.

Service discovery

Clients connecting to Pulsar brokers need to be able to communicate with an entire Pulsar instance using a single URL. Pulsar provides a built-in service discovery mechanism that you can set up using the instructions in the Deploying a Pulsar instance guide.

You can use your own service discovery system if you’d like. If you use your own system, there is just one requirement: when a client performs an HTTP request to an endpoint, such as http://pulsar.us-west.example.com:8080, the client needs to be redirected to some active broker in the desired cluster, whether via DNS, an HTTP or IP redirect, or some other means.

Reader interface

In Pulsar, the “standard” consumer interface involves using consumers to listen on topics, process incoming messages, and finally acknowledge those messages when they’ve been processed. Whenever a consumer connects to a topic, it automatically begins reading from the earliest un-acked message onward because the topic’s cursor is automatically managed by Pulsar.

The reader interface for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic—rather than a consumer—you need to specify which message the reader begins reading from when it connects to a topic. When connecting to a topic, the reader interface enables you to begin with:

  • The earliest available message in the topic
  • The latest available message in the topic
  • Some other message between the earliest and the latest. If you select this option, you’ll need to explicitly provide a message ID. Your application will be responsible for “knowing” this message ID in advance, perhaps fetching it from a persistent data store or cache.

The reader interface is helpful for use cases like using Pulsar to provide effectively-once processing semantics for a stream processing system. For this use case, it’s essential that the stream processing system be able to “rewind” topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to “manually position” themselves within a topic.

[Figure: The Pulsar consumer and reader interfaces]

Non-partitioned topics only

The reader interface for Pulsar cannot currently be used with partitioned topics.

Here’s a Java example that begins reading from the earliest available message on a topic:

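import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;

// Connect to a broker (this service URL assumes a local standalone broker)
PulsarClient pulsarClient = PulsarClient.create("pulsar://localhost:6650");

String topic = "persistent://sample/standalone/ns1/reader-api-test";
MessageId id = MessageId.earliest;

// Create a reader that starts at the given message and reads onward
Reader reader = pulsarClient.createReader(topic, id, new ReaderConfiguration());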

while (true) {
    Message message = reader.readNext();

    // Process the message
}

To create a reader that will read from the latest available message:

MessageId id = MessageId.latest;
Reader reader = pulsarClient.createReader(topic, id, new ReaderConfiguration());

To create a reader that will read from some message between earliest and latest:

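// loadSavedMessageId() is a hypothetical helper that returns a previously
// stored message ID (for example, the output of MessageId#toByteArray)
byte[] msgIdBytes = loadSavedMessageId();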
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.createReader(topic, id, new ReaderConfiguration());