Partitioned topics
Expand message throughput by distributing load within topics
Pulsar version 2.0
The documentation that you’re reading is for the 2.0 release of Apache Pulsar. For more information on Pulsar 2.0, see this guide.
By default, Pulsar topics are served by a single broker. Using only a single broker, however, limits a topic’s maximum throughput. Partitioned topics are a special type of topic that can span multiple brokers and thus allow for much higher throughput. For an explanation of how partitioned topics work, see the Concepts section below.
You can publish to partitioned topics using Pulsar’s client libraries and you can create and manage partitioned topics using Pulsar’s admin API.
Publishing to partitioned topics
When publishing to partitioned topics, the only difference from non-partitioned topics is that you need to specify a routing mode when you create a new producer. Examples for Java are below.
Java
Publishing messages to partitioned topics in the Java client works much like publishing to normal topics. The difference is that you need to specify either one of the currently available message routers or a custom router.
Routing mode
You can specify the routing mode in the ProducerConfiguration object that you use to configure your producer. You have three options:
SinglePartition
RoundRobinPartition
CustomPartition
Here’s an example:
String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-property/my-cluster/my-namespace/my-topic";
PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
ProducerConfiguration config = new ProducerConfiguration();
config.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.SinglePartition);
Producer producer = client.createProducer(topic, config);
producer.send("Partitioned topic message".getBytes());
Custom message router
To use a custom message router, you need to provide an implementation of the MessageRouter interface, which has just one choosePartition method:
public interface MessageRouter extends Serializable {
    int choosePartition(Message msg);
}
Here’s a (not very useful!) router that routes every message to partition 10:
public class AlwaysTenRouter implements MessageRouter {
    @Override
    public int choosePartition(Message msg) {
        return 10;
    }
}
With that implementation in hand, you can send messages to partitioned topics as below:
String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-property/my-cluster/my-namespace/my-topic";
PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
ProducerConfiguration config = new ProducerConfiguration();
config.setMessageRouter(new AlwaysTenRouter());
Producer producer = client.createProducer(topic, config);
producer.send("Partitioned topic message".getBytes());
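A router that returns a fixed index such as 10 only works when the topic has at least eleven partitions. The sketch below shows one way a custom router might stay within range: it reserves partition 0 for keys with an `urgent-` prefix and hashes every other key across the remaining partitions. `KeyedMessage` and `SimpleRouter` are hypothetical stand-ins defined here so that the example compiles on its own; they are not Pulsar client classes, and the `urgent-` convention is an assumption made purely for illustration.

```java
// Hypothetical stand-in for the client's message type; only the key matters here.
final class KeyedMessage {
    final String key; // null when the producer set no key

    KeyedMessage(String key) {
        this.key = key;
    }
}

// Hypothetical stand-in that mirrors the single-method shape of MessageRouter.
interface SimpleRouter {
    int choosePartition(KeyedMessage msg);
}

// Sends "urgent-" keys to partition 0 and spreads all other keys over 1..N-1.
final class PriorityRouter implements SimpleRouter {
    private final int numPartitions;

    PriorityRouter(int numPartitions) {
        if (numPartitions < 2) {
            throw new IllegalArgumentException("need at least 2 partitions");
        }
        this.numPartitions = numPartitions;
    }

    @Override
    public int choosePartition(KeyedMessage msg) {
        if (msg.key != null && msg.key.startsWith("urgent-")) {
            return 0;
        }
        int hash = (msg.key == null) ? 0 : msg.key.hashCode();
        // floorMod never returns a negative value, so the index stays in 1..N-1.
        return 1 + Math.floorMod(hash, numPartitions - 1);
    }
}
```

The partition count is passed to the constructor because the one-argument choosePartition method shown above does not receive it.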
Pulsar admin setup
Each of Pulsar’s three admin interfaces (the pulsar-admin CLI tool, the Java admin API, and the REST API) requires some special setup if you have authentication enabled in your Pulsar instance.
pulsar-admin
If you have authentication enabled, you will need to provide an auth configuration to use the pulsar-admin tool. By default, the configuration for the pulsar-admin tool is found in the conf/client.conf file. Here are the available parameters:
Name | Description | Default |
---|---|---|
webServiceUrl | The web URL for the cluster. | http://localhost:8080/ |
brokerServiceUrl | The Pulsar protocol URL for the cluster. | pulsar://localhost:6650/ |
authPlugin | The authentication plugin. | |
authParams | The authentication parameters for the cluster, as a comma-separated string. | |
useTls | Whether or not TLS authentication will be enforced in the cluster. | false |
tlsAllowInsecureConnection | | |
tlsTrustCertsFilePath | | |
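As a rough illustration of how the defaults in the table behave, the sketch below reads key=value settings with java.util.Properties and falls back to the listed defaults for anything that is not set. It assumes that conf/client.conf uses a plain key=value layout; `ClientConfSketch` is a hypothetical helper, not part of Pulsar, and it reads from an in-memory string rather than a real file.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper; not part of the pulsar-admin tool.
final class ClientConfSketch {

    // Loads key=value settings, falling back to the defaults listed in the table.
    static Properties load(Reader source) {
        Properties defaults = new Properties();
        defaults.setProperty("webServiceUrl", "http://localhost:8080/");
        defaults.setProperty("brokerServiceUrl", "pulsar://localhost:6650/");
        defaults.setProperty("useTls", "false");

        Properties conf = new Properties(defaults);
        try {
            conf.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return conf;
    }

    public static void main(String[] args) {
        // Only useTls is overridden; the URLs come from the defaults.
        Properties conf = load(new StringReader("useTls=true\n"));
        System.out.println(conf.getProperty("useTls"));
        System.out.println(conf.getProperty("webServiceUrl"));
    }
}
```

Parameters without a default in the table, such as authPlugin, simply resolve to null in this sketch.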
REST API
You can find documentation for the REST API exposed by Pulsar brokers in this reference document.
Java admin client
To use the Java admin API, instantiate a PulsarAdmin object, specifying a URL for a Pulsar broker and a ClientConfiguration. Here’s a minimal example using localhost:
URL url = new URL("http://localhost:8080");
String authPluginClassName = "com.org.MyAuthPluginClass"; // Fully qualified name of the auth plugin class, if authentication is enabled
String authParams = "param1=value1"; // Auth parameters, if the auth plugin class requires them
boolean useTls = false;
boolean tlsAllowInsecureConnection = false;
String tlsTrustCertsFilePath = null;
ClientConfiguration config = new ClientConfiguration();
config.setAuthentication(authPluginClassName, authParams);
config.setUseTls(useTls);
config.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
config.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
PulsarAdmin admin = new PulsarAdmin(url, config);
Managing partitioned topics
You can use Pulsar’s admin API to create and manage partitioned topics.
In all of the instructions and commands below, the topic name structure is persistent://property/cluster/namespace/topic.
Create
Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic you need to provide a name for the topic as well as the desired number of partitions.
Global partitioned topics
If you’d like to create a global partitioned topic, you need to create a partitioned topic using the instructions here and specify global as the cluster in the topic name.
pulsar-admin
You can create partitioned topics using the create-partitioned-topic command, specifying the topic name as an argument and the number of partitions using the -p or --partitions flag. Here’s an example:
$ bin/pulsar-admin persistent create-partitioned-topic \
persistent://my-property/my-cluster/my-namespace/my-topic \
--partitions 4
REST API
PUT
/admin/persistent/:property/:cluster/:namespace/:destination/partitions
Java
String topicName = "persistent://my-property/my-cluster/my-namespace/my-topic";
int numPartitions = 4;
admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);
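To show how a topic name lines up with the REST path above, here is a small sketch that turns a name of the form persistent://property/cluster/namespace/topic into the corresponding /partitions path. `PartitionsPath` is a hypothetical helper written for this page, not a Pulsar API, and it assumes a four-segment topic name.

```java
// Hypothetical helper; not part of the Pulsar admin client.
final class PartitionsPath {
    private static final String SCHEME = "persistent://";

    // Maps persistent://property/cluster/namespace/topic to the admin REST path.
    static String forTopic(String topicName) {
        if (!topicName.startsWith(SCHEME)) {
            throw new IllegalArgumentException(
                    "expected a persistent:// topic name: " + topicName);
        }
        String[] parts = topicName.substring(SCHEME.length()).split("/");
        if (parts.length != 4) {
            throw new IllegalArgumentException(
                    "expected property/cluster/namespace/topic: " + topicName);
        }
        return "/admin/persistent/" + String.join("/", parts) + "/partitions";
    }

    public static void main(String[] args) {
        System.out.println(
                forTopic("persistent://my-property/my-cluster/my-namespace/my-topic"));
    }
}
```

The same path is used with different HTTP methods by the other partitioned-topic operations on this page.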
Get metadata
Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:
Field | Meaning |
---|---|
partitions | The number of partitions into which the topic is divided |
pulsar-admin
You can see the number of partitions in a partitioned topic using the get-partitioned-topic-metadata subcommand. Here’s an example:
$ pulsar-admin persistent get-partitioned-topic-metadata \
persistent://my-property/my-cluster/my-namespace/my-topic
{
"partitions": 4
}
REST API
GET
/admin/persistent/:property/:cluster/:namespace/:destination/partitions
Java
String topicName = "persistent://my-property/my-cluster/my-namespace/my-topic";
admin.persistentTopics().getPartitionedTopicMetadata(topicName);
Update
You can update the number of partitions on an existing partitioned topic if the topic is non-global. The new number of partitions must be greater than the existing number.
Decreasing the number of partitions would require deleting the topic, which is not supported in Pulsar.
Producers and consumers that were created before the update can’t see the newly created partitions; the application must recreate them so that they connect to the new partitions as well. Until every producer in the application has been recreated, partition ordering at producers can be violated.
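The rules and the ordering caveat above can be sketched in a few lines. The check below rejects updates on global topics and any request that does not increase the partition count. The `partitionFor` method then shows why ordering can break: it models key-based routing as String.hashCode() modulo the partition count, which is an assumption for illustration and may differ from the hash the client really uses. `PartitionUpdateCheck` is a hypothetical helper, not a Pulsar class.

```java
// Hypothetical helper illustrating the update rules; not a Pulsar class.
final class PartitionUpdateCheck {

    // Throws if the requested update would violate the rules described above.
    static void validate(int current, int requested, boolean globalTopic) {
        if (globalTopic) {
            throw new IllegalStateException("global partitioned topics cannot be updated");
        }
        if (requested <= current) {
            throw new IllegalArgumentException(
                    "new partition count " + requested + " must exceed " + current);
        }
    }

    // Assumed routing model: key hash modulo the partition count the producer knows about.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        validate(4, 8, false); // accepted: 8 > 4 and the topic is not global
        // A producer created before the update still routes over 4 partitions,
        // while a producer created afterwards routes over 8.
        System.out.println(partitionFor("e", 4));
        System.out.println(partitionFor("e", 8));
    }
}
```

Under this model, an old producer and a new producer can send the same key to different partitions, which is why ordering is only restored once all producers have been recreated.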
pulsar-admin
Partitioned topics can be updated using the update-partitioned-topic command.
$ pulsar-admin persistent update-partitioned-topic \
persistent://my-property/my-cluster/my-namespace/my-topic \
--partitions 8
REST API
POST
/admin/persistent/:property/:cluster/:namespace/:destination/partitions
Java
admin.persistentTopics().updatePartitionedTopic(persistentTopic, numPartitions);
Delete
pulsar-admin
Partitioned topics can be deleted using the delete-partitioned-topic command, specifying the topic by name:
$ bin/pulsar-admin persistent delete-partitioned-topic \
persistent://my-property/my-cluster/my-namespace/my-topic
REST API
DELETE
/admin/persistent/:property/:cluster/:namespace/:destination/partitions
Java
admin.persistentTopics().deletePartitionedTopic(persistentTopic);
List
This operation lists the persistent topics that exist under a given namespace.
pulsar-admin
$ pulsar-admin persistent list prop-1/cluster-1/namespace
persistent://property/cluster/namespace/topic
persistent://property/cluster/namespace/topic
REST API
GET
/admin/persistent/:property/:cluster/:namespace
Java
admin.persistentTopics().getList(namespace);
Stats
It shows current statistics of a given partitioned topic. Here’s an example payload:
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}
The following stats are available:
Stat | Description |
---|---|
msgRateIn | The sum of all local and replication publishers’ publish rates in messages per second |
msgThroughputIn | Same as msgRateIn but in bytes per second instead of messages per second |
msgRateOut | The sum of all local and replication consumers’ dispatch rates in messages per second |
msgThroughputOut | Same as msgRateOut but in bytes per second instead of messages per second |
averageMsgSize | Average message size, in bytes, from this publisher within the last interval |
storageSize | The sum of the ledgers’ storage size for this topic |
publishers | The list of all local publishers into the topic. There can be anywhere from zero to thousands. |
producerId | Internal identifier for this producer on this topic |
producerName | Internal identifier for this producer, generated by the client library |
address | IP address and source port for the connection of this producer |
connectedSince | Timestamp this producer was created or last reconnected |
subscriptions | The list of all local subscriptions to the topic |
my-subscription | The name of this subscription (client defined) |
msgBacklog | The count of messages in backlog for this subscription |
type | This subscription type |
msgRateExpired | The rate at which messages were discarded instead of dispatched from this subscription due to TTL |
consumers | The list of connected consumers for this subscription |
consumerName | Internal identifier for this consumer, generated by the client library |
availablePermits | The number of messages this consumer has space for in the client library’s listen queue. A value of 0 means the client library’s queue is full and receive() isn’t being called. A nonzero value means this consumer is ready to be dispatched messages. |
replication | This section gives the stats for cross-colo replication of this topic |
replicationBacklog | The outbound replication backlog in messages |
connected | Whether the outbound replicator is connected |
replicationDelayInSeconds | How long the oldest message has been waiting to be sent through the connection, if connected is true |
inboundConnection | The IP and port of the broker in the remote cluster’s publisher connection to this broker |
inboundConnectedSince | The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute. |
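One way these fields might be used together: the example payload above shows a subscription with a msgBacklog of 116632 and an empty consumers list, meaning that messages are accumulating with nobody attached to drain them. The sketch below flags such subscriptions. `SubscriptionSnapshot` and `BacklogCheck` are hypothetical types that hold values already extracted from the stats JSON; they are not Pulsar classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical holder for three values taken from one entry under "subscriptions".
final class SubscriptionSnapshot {
    final String name;
    final long msgBacklog;
    final int consumerCount;

    SubscriptionSnapshot(String name, long msgBacklog, int consumerCount) {
        this.name = name;
        this.msgBacklog = msgBacklog;
        this.consumerCount = consumerCount;
    }
}

// Hypothetical check; not part of the Pulsar admin API.
final class BacklogCheck {

    // Returns the names of subscriptions that have a backlog but no connected consumers.
    static List<String> withoutConsumers(List<SubscriptionSnapshot> subscriptions) {
        List<String> flagged = new ArrayList<>();
        for (SubscriptionSnapshot s : subscriptions) {
            if (s.msgBacklog > 0 && s.consumerCount == 0) {
                flagged.add(s.name);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        List<SubscriptionSnapshot> subs = Arrays.asList(
                new SubscriptionSnapshot("my-topic_subscription", 116632L, 0));
        System.out.println(withoutConsumers(subs));
    }
}
```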
pulsar-admin
$ pulsar-admin persistent partitioned-stats \
persistent://test-property/cl1/ns1/tp1 \
--per-partition
REST API
GET
/admin/persistent/:property/:cluster/:namespace/:destination/partitioned-stats
Java
admin.persistentTopics().getPartitionedStats(persistentTopic, true); // true requests per-partition stats, like --per-partition above
Internal stats
It shows detailed statistics of a topic.
Stat | Description |
---|---|
entriesAddedCounter | Messages published since this broker loaded this topic |
numberOfEntries | Total number of messages being tracked |
totalSize | Total storage size in bytes of all messages |
currentLedgerEntries | Count of messages written to the ledger currently open for writing |
currentLedgerSize | Size in bytes of messages written to ledger currently open for writing |
lastLedgerCreatedTimestamp | Time when last ledger was created |
lastLedgerCreationFailureTimestamp | Time when the last ledger creation failed |
waitingCursorsCount | How many cursors are caught up and waiting for a new message to be published |
pendingAddEntriesCount | How many messages have (asynchronous) write requests that are still awaiting completion |
lastConfirmedEntry | The ledgerid:entryid of the last message successfully written. If the entryid is -1, then the ledger has been opened or is currently being opened but has no entries written yet. |
state | The state of the cursor ledger. Open means we have a cursor ledger for saving updates of the markDeletePosition. |
ledgers | The ordered list of all ledgers for this topic holding its messages |
cursors | The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats. |
markDeletePosition | The ack position: the last message the subscriber acknowledged receiving |
readPosition | The latest position from which the subscriber reads messages |
waitingReadOp | This is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published. |
pendingReadOps | The counter for how many outstanding read requests to the BookKeepers we have in progress |
messagesConsumedCounter | Number of messages this cursor has acked since this broker loaded this topic |
cursorLedger | The ledger being used to persistently store the current markDeletePosition |
cursorLedgerLastEntry | The last entryid used to persistently store the current markDeletePosition |
individuallyDeletedMessages | If Acks are being done out of order, shows the ranges of messages Acked between the markDeletePosition and the read-position |
lastLedgerSwitchTimestamp | The last time the cursor ledger was rolled over |
{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}
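Positions such as markDeletePosition and readPosition are written as ledgerid:entryid, so they are easy to compare once parsed. The sketch below parses the two positions from the example payload and counts the entries that lie strictly between them. It assumes that readPosition names the next entry to be read and that both positions are in the same ledger; it also ignores individuallyDeletedMessages, so the result is an upper bound on unacknowledged entries. `EntryPosition` and `CursorGap` are hypothetical helpers, not Pulsar classes.

```java
// Hypothetical value type for a "ledgerid:entryid" position string.
final class EntryPosition {
    final long ledgerId;
    final long entryId;

    private EntryPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    // Parses text such as "324711539:3133".
    static EntryPosition parse(String text) {
        int colon = text.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("expected ledgerid:entryid, got " + text);
        }
        return new EntryPosition(
                Long.parseLong(text.substring(0, colon)),
                Long.parseLong(text.substring(colon + 1)));
    }
}

// Hypothetical helper; not part of the Pulsar admin API.
final class CursorGap {

    // Counts entries after markDelete and before read, for positions in one ledger.
    static long entriesBetween(EntryPosition markDelete, EntryPosition read) {
        if (markDelete.ledgerId != read.ledgerId) {
            throw new IllegalArgumentException("positions are in different ledgers");
        }
        return Math.max(0L, read.entryId - markDelete.entryId - 1);
    }

    public static void main(String[] args) {
        EntryPosition markDelete = EntryPosition.parse("324711539:3133");
        EntryPosition read = EntryPosition.parse("324711539:3233");
        System.out.println(entriesBetween(markDelete, read));
    }
}
```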
pulsar-admin
$ pulsar-admin persistent stats-internal \
persistent://test-property/cl1/ns1/tp1
REST API
GET
/admin/persistent/:property/:cluster/:namespace/:destination/internalStats
Java
admin.persistentTopics().getInternalStats(persistentTopic);
Concepts
Normal topics can be served only by a single broker, which limits the topic’s maximum throughput. Partitioned topics are a special type of topic that can be handled by multiple brokers, which allows for much higher throughput.
Behind the scenes, a partitioned topic is actually implemented as N internal topics, where N is the number of partitions. When publishing messages to a partitioned topic, each message is routed to one of several brokers. The distribution of partitions across brokers is handled automatically by Pulsar.
The diagram below illustrates this:
Here, the topic Topic1 has five partitions (P0 through P4) split across three brokers. Because there are more partitions than brokers, two brokers handle two partitions apiece, while the third handles only one (again, Pulsar handles this distribution of partitions automatically).
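The two-two-one split described above is what any even spread of five partitions over three brokers produces. The sketch below uses a simple round-robin placement to show the arithmetic. It is an illustration only: Pulsar’s own placement logic is not described on this page and is not modeled here, and `PartitionSpread` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of an even spread; not Pulsar's placement algorithm.
final class PartitionSpread {

    // Returns, for each broker index, the list of partition indexes placed on it.
    static List<List<Integer>> assign(int numPartitions, int numBrokers) {
        if (numBrokers <= 0) {
            throw new IllegalArgumentException("need at least one broker");
        }
        List<List<Integer>> brokers = new ArrayList<>();
        for (int b = 0; b < numBrokers; b++) {
            brokers.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            brokers.get(p % numBrokers).add(p); // partition p goes to broker p mod B
        }
        return brokers;
    }

    public static void main(String[] args) {
        // Five partitions over three brokers: two brokers get two, one gets one.
        System.out.println(assign(5, 3));
    }
}
```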
Messages for this topic are broadcast to two consumers. The routing mode determines which partition each message is published to, while the subscription mode determines which messages go to which consumers.
Decisions about routing and subscription modes can be made separately in most cases. In general, throughput concerns should guide partitioning/routing decisions while subscription decisions should be guided by application semantics.
There is no difference between partitioned topics and normal topics in terms of how subscription modes work, as partitioning only determines what happens between when a message is published by a producer and processed and acknowledged by a consumer.
Partitioned topics need to be explicitly created via the admin API. The number of partitions can be specified when creating the topic.
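Since a partitioned topic is implemented as N internal topics, it can help to see what those internal topics might be called. The sketch below derives one name per partition by appending a `-partition-<index>` suffix to the topic name. That suffix is not stated on this page and should be treated as an assumption about the naming convention; `PartitionNames` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the "-partition-" suffix is an assumed naming convention.
final class PartitionNames {

    // Returns one internal topic name per partition, indexed from 0.
    static List<String> internalTopics(String topicName, int numPartitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            names.add(topicName + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        for (String name : internalTopics(
                "persistent://my-property/my-cluster/my-namespace/my-topic", 4)) {
            System.out.println(name);
        }
    }
}
```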
Routing modes
When publishing to partitioned topics, you must specify a routing mode. The routing mode determines which partition—that is, which internal topic—each message should be published to.
There are three routing modes available by default:
Mode | Description | Ordering guarantee |
---|---|---|
Key hash | If a key property has been specified on the message, the partitioned producer will hash the key and assign it to a particular partition. | Per-key-bucket ordering |
Single default partition | If no key is provided, each producer’s messages will be routed to a dedicated partition, initially selected at random | Per-producer ordering |
Round robin distribution | If no key is provided, all messages will be routed to different partitions in round-robin fashion to achieve maximum throughput. | None |
In addition to these default modes, you can also create a custom routing mode if you’re using the Java client by implementing the MessageRouter interface.
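The three default modes in the table can be compared side by side with a small model. The sketch below is not the client’s implementation: `RoutingSketch` is a hypothetical class, and the use of String.hashCode() for key hashing is an assumption made for illustration. It does show the ordering consequences, because a given key always maps to the same partition, a producer in single-partition mode always uses the same partition, and round robin cycles through all of them.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the three default routing modes for one producer.
final class RoutingSketch {
    private final int numPartitions;
    private final int singlePartition; // chosen at random once, then fixed
    private final AtomicInteger counter = new AtomicInteger();

    RoutingSketch(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
        this.singlePartition = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    // Key hash: the same key always lands on the same partition.
    int keyHash(String key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Single default partition: every keyless message from this producer goes here.
    int single() {
        return singlePartition;
    }

    // Round robin: keyless messages cycle through 0, 1, ..., N-1, 0, ...
    int roundRobin() {
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }
}
```

Round robin spreads load most evenly but gives up ordering, which matches the "None" entry in the table.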