Skip to main content

Manage topics

Important

This page only shows some frequently used operations.

  • For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin doc.

  • For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.

  • For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.

Pulsar has persistent and non-persistent topics. Persistent topic is a logical endpoint for publishing and consuming messages. The topic name structure for persistent topics is:


persistent://tenant/namespace/topic

Non-persistent topics are used in applications that only consume real-time published messages and do not need persistent guarantee. In this way, it reduces message-publish latency by removing overhead of persisting messages. The topic name structure for non-persistent topics is:


non-persistent://tenant/namespace/topic

Manage topic resources​

Whether it is persistent or non-persistent topic, you can obtain the topic resources through pulsar-admin tool, REST API and Java.

note

In REST API, :schema stands for persistent or non-persistent. :tenant, :namespace, :x are variables, replace them with the real tenant, namespace, and x names when using them. Take GET /admin/v2/:schema/:tenant/:namespace/getList as an example, to get the list of persistent topics in REST API, use https://pulsar.apache.org/admin/v2/persistent/my-tenant/my-namespace. To get the list of non-persistent topics in REST API, use https://pulsar.apache.org/admin/v2/non-persistent/my-tenant/my-namespace.

List of topics​

You can get the list of topics under a given namespace in the following ways.


$ pulsar-admin topics list \
my-tenant/my-namespace

Grant permission​

You can grant permissions on a client role to perform specific actions on a given topic in the following ways.


$ pulsar-admin topics grant-permission \
--actions produce,consume --role application1 \
persistent://test-tenant/ns1/tp1 \

Get permission​

You can fetch permission in the following ways.


$ pulsar-admin topics permissions \
persistent://test-tenant/ns1/tp1 \

{
"application1": [
"consume",
"produce"
]
}

Revoke permission​

You can revoke a permission granted on a client role in the following ways.


$ pulsar-admin topics revoke-permission \
--role application1 \
persistent://test-tenant/ns1/tp1 \

{
"application1": [
"consume",
"produce"
]
}

Delete topic​

You can delete a topic in the following ways. You cannot delete a topic if any active subscription or producers is connected to the topic.


$ pulsar-admin topics delete \
persistent://test-tenant/ns1/tp1 \

Unload topic​

You can unload a topic in the following ways.


$ pulsar-admin topics unload \
persistent://test-tenant/ns1/tp1 \

Get stats​

You can check the following statistics of a given non-partitioned topic.

  • msgRateIn: The sum of all local and replication publishers' publish rates (msg/s).

  • msgThroughputIn: The sum of all local and replication publishers' publish rates (bytes/s).

  • msgRateOut: The sum of all local and replication consumers' dispatch rates(msg/s).

  • msgThroughputOut: The sum of all local and replication consumers' dispatch rates (bytes/s).

  • averageMsgSize: The average size (in bytes) of messages published within the last interval.

  • storageSize: The sum of the ledgers' storage size for this topic. The space used to store the messages for the topic.

  • bytesInCounter: Total bytes published to the topic.

  • msgInCounter: Total messages published to the topic.

  • bytesOutCounter: Total bytes delivered to consumers.

  • msgOutCounter: Total messages delivered to consumers.

  • msgChunkPublished: Topic has chunked message published on it.

  • backlogSize: Estimated total unconsumed or backlog size (in bytes).

  • offloadedStorageSize: Space used to store the offloaded messages for the topic (in bytes).

  • waitingPublishers: The number of publishers waiting in a queue in exclusive access mode.

  • deduplicationStatus: The status of message deduplication for the topic.

  • topicEpoch: The topic epoch or empty if not set.

  • nonContiguousDeletedMessagesRanges: The number of non-contiguous deleted messages ranges.

  • nonContiguousDeletedMessagesRangesSerializedSize: The serialized size of non-contiguous deleted messages ranges.

  • publishers: The list of all local publishers into the topic. The list ranges from zero to thousands.

    • accessMode: The type of access to the topic that the producer requires.

    • msgRateIn: The total rate of messages (msg/s) published by this publisher.

    • msgThroughputIn: The total throughput (bytes/s) of the messages published by this publisher.

    • averageMsgSize: The average message size in bytes from this publisher within the last interval.

    • chunkedMessageRate: The total rate of chunked messages published by this publisher.

    • producerId: The internal identifier for this producer on this topic.

    • producerName: The internal identifier for this producer, generated by the client library.

    • address: The IP address and source port for the connection of this producer.

    • connectedSince: The timestamp when this producer is created or reconnected last time.

    • clientVersion: The client library version of this producer.

    • metadata: Metadata (key/value strings) associated with this publisher.

  • subscriptions: The list of all local subscriptions to the topic.

    • my-subscription: The name of this subscription. It is defined by the client.

      • msgRateOut: The total rate of messages (msg/s) delivered on this subscription.

      • msgThroughputOut: The total throughput (bytes/s) delivered on this subscription.

      • msgBacklog: The number of messages in the subscription backlog.

      • type: The subscription type.

      • msgRateExpired: The rate at which messages were discarded instead of dispatched from this subscription due to TTL.

      • lastExpireTimestamp: The timestamp of the last message expire execution.

      • lastConsumedFlowTimestamp: The timestamp of the last flow command received.

      • lastConsumedTimestamp: The latest timestamp of all the consumed timestamp of the consumers.

      • lastAckedTimestamp: The latest timestamp of all the acked timestamp of the consumers.

      • bytesOutCounter: Total bytes delivered to consumer.

      • msgOutCounter: Total messages delivered to consumer.

      • msgRateRedeliver: Total rate of messages redelivered on this subscription (msg/s).

      • chunkedMessageRate: Chunked message dispatch rate.

      • backlogSize: Size of backlog for this subscription (in bytes).

      • msgBacklogNoDelayed: Number of messages in the subscription backlog that do not contain the delay messages.

      • blockedSubscriptionOnUnackedMsgs: Flag to verify if a subscription is blocked due to reaching threshold of unacked messages.

      • msgDelayed: Number of delayed messages currently being tracked.

      • unackedMessages: Number of unacknowledged messages for the subscription, where an unacknowledged message is one that has been sent to a consumer but not yet acknowledged. This field is only meaningful when using a subscription that tracks individual message acknowledgement.

      • activeConsumerName: The name of the consumer that is active for single active consumer subscriptions. For example, failover or exclusive.

      • totalMsgExpired: Total messages expired on this subscription.

      • lastMarkDeleteAdvancedTimestamp: Last MarkDelete position advanced timestamp.

      • durable: Whether the subscription is durable or ephemeral (for example, from a reader).

      • replicated: Mark that the subscription state is kept in sync across different regions.

      • allowOutOfOrderDelivery: Whether out of order delivery is allowed on the Key_Shared subscription.

      • keySharedMode: Whether the Key_Shared subscription mode is AUTO_SPLIT or STICKY.

      • consumersAfterMarkDeletePosition: This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription.

      • nonContiguousDeletedMessagesRanges: The number of non-contiguous deleted messages ranges.

      • nonContiguousDeletedMessagesRangesSerializedSize: The serialized size of non-contiguous deleted messages ranges.

      • consumers: The list of connected consumers for this subscription.

        • msgRateOut: The total rate of messages (msg/s) delivered to the consumer.

        • msgThroughputOut: The total throughput (bytes/s) delivered to the consumer.

        • consumerName: The internal identifier for this consumer, generated by the client library.

        • availablePermits: The number of messages that the consumer has space for in the client library's listen queue. 0 means the client library's queue is full and receive() isn't being called. A non-zero value means this consumer is ready for dispatched messages.

        • unackedMessages: The number of unacknowledged messages for the consumer, where an unacknowledged message is one that has been sent to the consumer but not yet acknowledged. This field is only meaningful when using a subscription that tracks individual message acknowledgement.

        • blockedConsumerOnUnackedMsgs: The flag used to verify if the consumer is blocked due to reaching threshold of the unacknowledged messages.

        • lastConsumedTimestamp: The timestamp when the consumer reads a message the last time.

        • lastAckedTimestamp: The timestamp when the consumer acknowledges a message the last time.

        • address: The IP address and source port for the connection of this consumer.

        • connectedSince: The timestamp when this consumer is created or reconnected last time.

        • clientVersion: The client library version of this consumer.

        • bytesOutCounter: Total bytes delivered to consumer.

        • msgOutCounter: Total messages delivered to consumer.

        • msgRateRedeliver: Total rate of messages redelivered by this consumer (msg/s).

        • chunkedMessageRate: The total rate of chunked messages delivered to this consumer.

        • avgMessagesPerEntry: Number of average messages per entry for the consumer consumed.

        • readPositionWhenJoining: The read position of the cursor when the consumer joining.

        • keyHashRanges: Hash ranges assigned to this consumer if is Key_Shared sub mode.

        • metadata: Metadata (key/value strings) associated with this consumer.

  • replication: This section gives the stats for cross-colo replication of this topic

    • msgRateIn: The total rate (msg/s) of messages received from the remote cluster.

    • msgThroughputIn: The total throughput (bytes/s) received from the remote cluster.

    • msgRateOut: The total rate of messages (msg/s) delivered to the replication-subscriber.

    • msgThroughputOut: The total throughput (bytes/s) delivered to the replication-subscriber.

    • msgRateExpired: The total rate of messages (msg/s) expired.

    • replicationBacklog: The number of messages pending to be replicated to remote cluster.

    • connected: Whether the outbound replicator is connected.

    • replicationDelayInSeconds: How long the oldest message has been waiting to be sent through the connection, if connected is true.

    • inboundConnection: The IP and port of the broker in the remote cluster's publisher connection to this broker.

    • inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.

    • outboundConnection: The address of the outbound replication connection.

    • outboundConnectedSince: The timestamp of establishing outbound connection.

The following is an example of a topic status.


{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 504,
"msgInCounter" : 9,
"bytesOutCounter" : 2296,
"msgOutCounter" : 41,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 504,
"backlogSize" : 0,
"offloadedStorageSize" : 0,
"publishers" : [ {
"accessMode" : "Shared",
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"chunkedMessageRate" : 0.0,
"producerId" : 0,
"metadata" : { },
"address" : "/127.0.0.1:65402",
"connectedSince" : "2021-06-09T17:22:55.913+08:00",
"clientVersion" : "2.9.0-SNAPSHOT",
"producerName" : "standalone-1-0"
} ],
"waitingPublishers" : 0,
"subscriptions" : {
"sub-demo" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 2296,
"msgOutCounter" : 41,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 0,
"backlogSize" : 0,
"msgBacklogNoDelayed" : 0,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Exclusive",
"activeConsumerName" : "20b81",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1623230565356,
"lastConsumedTimestamp" : 1623230583946,
"lastAckedTimestamp" : 1623230584033,
"lastMarkDeleteAdvancedTimestamp" : 1623230584033,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 2296,
"msgOutCounter" : 41,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "20b81",
"availablePermits" : 959,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 314,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1623230584033,
"lastConsumedTimestamp" : 1623230583946,
"metadata" : { },
"address" : "/127.0.0.1:65172",
"connectedSince" : "2021-06-09T17:22:45.353+08:00",
"clientVersion" : "2.9.0-SNAPSHOT"
} ],
"allowOutOfOrderDelivery": false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0
}

To get the status of a topic, you can use the following ways.


$ pulsar-admin topics stats \
persistent://test-tenant/ns1/tp1 \

Get internal stats​

You can get the detailed statistics of a topic.

  • entriesAddedCounter: Messages published since this broker loaded this topic.

  • numberOfEntries: The total number of messages being tracked.

  • totalSize: The total storage size in bytes of all messages.

  • currentLedgerEntries: The count of messages written to the ledger that is currently open for writing.

  • currentLedgerSize: The size in bytes of messages written to the ledger that is currently open for writing.

  • lastLedgerCreatedTimestamp: The time when the last ledger is created.

  • lastLedgerCreationFailureTimestamp: The time when the last ledger failed.

  • waitingCursorsCount: The number of cursors that are "caught up" and waiting for a new message to be published.

  • pendingAddEntriesCount: The number of messages that complete (asynchronous) write requests.

  • lastConfirmedEntry: The ledgerid:entryid of the last message that is written successfully. If the entryid is -1, then the ledger is open, yet no entries are written.

  • state: The state of this ledger for writing. The state LedgerOpened means that a ledger is open for saving published messages.

  • ledgers: The ordered list of all ledgers for this topic holding messages.

    • ledgerId: The ID of this ledger.

    • entries: The total number of entries that belong to this ledger.

    • size: The size of messages written to this ledger (in bytes).

    • offloaded: Whether this ledger is offloaded.

    • metadata: The ledger metadata.

  • schemaLedgers: The ordered list of all ledgers for this topic schema.

    • ledgerId: The ID of this ledger.

    • entries: The total number of entries that belong to this ledger.

    • size: The size of messages written to this ledger (in bytes).

    • offloaded: Whether this ledger is offloaded.

    • metadata: The ledger metadata.

  • compactedLedger: The ledgers holding un-acked messages after topic compaction.

    • ledgerId: The ID of this ledger.

    • entries: The total number of entries that belong to this ledger.

    • size: The size of messages written to this ledger (in bytes).

    • offloaded: Whether this ledger is offloaded. The value is false for the compacted topic ledger.

  • cursors: The list of all cursors on this topic. Each subscription in the topic stats has a cursor.

    • markDeletePosition: All messages before the markDeletePosition are acknowledged by the subscriber.

    • readPosition: The latest position of subscriber for reading message.

    • waitingReadOp: This is true when the subscription has read the latest message published to the topic and is waiting for new messages to be published.

    • pendingReadOps: The counter for how many outstanding read requests to the BookKeepers in progress.

    • messagesConsumedCounter: The number of messages this cursor has acked since this broker loaded this topic.

    • cursorLedger: The ledger being used to persistently store the current markDeletePosition.

    • cursorLedgerLastEntry: The last entryid used to persistently store the current markDeletePosition.

    • individuallyDeletedMessages: If acknowledges are being done out of order, the ranges of messages acknowledged between the markDeletePosition and the read-position shows.

    • lastLedgerSwitchTimestamp: The last time the cursor ledger is rolled over.

    • state: The state of the cursor ledger: Open means you have a cursor ledger for saving updates of the markDeletePosition.

The following is an example of the detailed statistics of a topic.


{
"entriesAddedCounter":0,
"numberOfEntries":0,
"totalSize":0,
"currentLedgerEntries":0,
"currentLedgerSize":0,
"lastLedgerCreatedTimestamp":"2021-01-22T21:12:14.868+08:00",
"lastLedgerCreationFailureTimestamp":null,
"waitingCursorsCount":0,
"pendingAddEntriesCount":0,
"lastConfirmedEntry":"3:-1",
"state":"LedgerOpened",
"ledgers":[
{
"ledgerId":3,
"entries":0,
"size":0,
"offloaded":false,
"metadata":null
}
],
"cursors":{
"test":{
"markDeletePosition":"3:-1",
"readPosition":"3:-1",
"waitingReadOp":false,
"pendingReadOps":0,
"messagesConsumedCounter":0,
"cursorLedger":4,
"cursorLedgerLastEntry":1,
"individuallyDeletedMessages":"[]",
"lastLedgerSwitchTimestamp":"2021-01-22T21:12:14.966+08:00",
"state":"Open",
"numberOfEntriesSinceFirstNotAckedMessage":0,
"totalNonContiguousDeletedMessagesRange":0,
"properties":{

}
}
},
"schemaLedgers":[
{
"ledgerId":1,
"entries":11,
"size":10,
"offloaded":false,
"metadata":null
}
],
"compactedLedger":{
"ledgerId":-1,
"entries":-1,
"size":-1,
"offloaded":false,
"metadata":null
}
}

To get the internal status of a topic, you can use the following ways.


$ pulsar-admin topics stats-internal \
persistent://test-tenant/ns1/tp1 \

Peek messages​

You can peek a number of messages for a specific subscription of a given topic in the following ways.


$ pulsar-admin topics peek-messages \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1 \

Message ID: 315674752:0
Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }
msg-payload

Get message by ID​

You can fetch the message with the given ledger ID and entry ID in the following ways.


$ ./bin/pulsar-admin topics get-message-by-id \
persistent://public/default/my-topic \
-l 10 -e 0

Examine messages​

You can examine a specific message on a topic by position relative to the earliest or the latest message.


./bin/pulsar-admin topics examine-messages \
persistent://public/default/my-topic \
-i latest -m 1

Get message ID​

You can get message ID published at or just after the given datetime.


./bin/pulsar-admin topics get-message-id \
persistent://public/default/my-topic \
-d 2021-06-28T19:01:17Z

Skip messages​

You can skip a number of messages for a specific subscription of a given topic in the following ways.


$ pulsar-admin topics skip \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1 \

Skip all messages​

You can skip all the old messages for a specific subscription of a given topic.


$ pulsar-admin topics skip-all \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1 \

Reset cursor​

You can reset a subscription cursor position back to the position which is recorded X minutes before. It essentially calculates time and position of cursor at X minutes before and resets it at that position. You can reset the cursor in the following ways.


$ pulsar-admin topics reset-cursor \
--subscription my-subscription --time 10 \
persistent://test-tenant/ns1/tp1 \

Look up topic's owner broker​

You can locate the owner broker of the given topic in the following ways.


$ pulsar-admin topics lookup \
persistent://test-tenant/ns1/tp1 \

"pulsar://broker1.org.com:4480"

Get bundle​

You can get the range of the bundle that the given topic belongs to in the following ways.


$ pulsar-admin topics bundle-range \
persistent://test-tenant/ns1/tp1 \

"0x00000000_0xffffffff"

Get subscriptions​

You can check all subscription names for a given topic in the following ways.


$ pulsar-admin topics subscriptions \
persistent://test-tenant/ns1/tp1 \

my-subscription

Unsubscribe​

When a subscription does not process messages any more, you can unsubscribe it in the following ways.


$ pulsar-admin topics unsubscribe \
--subscription my-subscription \
persistent://test-tenant/ns1/tp1 \

Last Message Id​

You can get the last committed message ID for a persistent topic. It is available since 2.3.0 release.


pulsar-admin topics last-message-id topic-name

Configure deduplication snapshot interval​

Get deduplication snapshot interval​

To get the topic-level deduplication snapshot interval, use one of the following methods.


pulsar-admin topics get-deduplication-snapshot-interval options

Set deduplication snapshot interval​

To set the topic-level deduplication snapshot interval, use one of the following methods.

Prerequisite brokerDeduplicationEnabled must be set to true.


pulsar-admin topics set-deduplication-snapshot-interval options

Remove deduplication snapshot interval​

To remove the topic-level deduplication snapshot interval, use one of the following methods.


pulsar-admin topics remove-deduplication-snapshot-interval options

Configure inactive topic policies​

Get inactive topic policies​

To get the topic-level inactive topic policies, use one of the following methods.


pulsar-admin topics get-inactive-topic-policies options

Set inactive topic policies​

To set the topic-level inactive topic policies, use one of the following methods.


pulsar-admin topics set-inactive-topic-policies options

Remove inactive topic policies​

To remove the topic-level inactive topic policies, use one of the following methods.


pulsar-admin topics remove-inactive-topic-policies options

Configure offload policies​

Get offload policies​

To get the topic-level offload policies, use one of the following methods.


pulsar-admin topics get-offload-policies options

Set offload policies​

To set the topic-level offload policies, use one of the following methods.


pulsar-admin topics set-offload-policies options

Remove offload policies​

To remove the topic-level offload policies, use one of the following methods.


pulsar-admin topics remove-offload-policies options

Manage non-partitioned topics​

You can use Pulsar admin API to create, delete and check status of non-partitioned topics.

Create​

Non-partitioned topics must be explicitly created. When creating a new non-partitioned topic, you need to provide a name for the topic.

By default, 60 seconds after creation, topics are considered inactive and deleted automatically to avoid generating trash data. To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to a specific value.

For more information about the two parameters, see here.

You can create non-partitioned topics in the following ways.

When you create non-partitioned topics with the create command, you need to specify the topic name as an argument.


$ bin/pulsar-admin topics create \
persistent://my-tenant/my-namespace/my-topic

note

When you create a non-partitioned topic with the suffix '-partition-' followed by numeric value like 'xyz-topic-partition-x' for the topic name, if a partitioned topic with same suffix 'xyz-topic-partition-y' exists, then the numeric value(x) for the non-partitioned topic must be larger than the number of partitions(y) of the partitioned topic. Otherwise, you cannot create such a non-partitioned topic.

Delete​

You can delete non-partitioned topics in the following ways.


$ bin/pulsar-admin topics delete \
persistent://my-tenant/my-namespace/my-topic

List​

You can get the list of topics under a given namespace in the following ways.


$ pulsar-admin topics list tenant/namespace
persistent://tenant/namespace/topic1
persistent://tenant/namespace/topic2

Stats​

You can check the current statistics of a given topic. The following is an example. For description of each stats, refer to get stats.


{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers": []
}
},
"replication": {}
}

You can check the current statistics of a given topic and its connected producers and consumers in the following ways.


$ pulsar-admin topics stats \
persistent://test-tenant/namespace/topic \
--get-precise-backlog

Internal stats​

You can check the detailed statistics of a topic. The following is an example. For the description of each internal topic stats, see Pulsar statistics.

{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}

You can get the internal stats for the partitioned topic in the following ways.

$ pulsar-admin topics stats-internal \
persistent://test-tenant/namespace/topic

Manage partitioned topics​

You can use Pulsar admin API to create, update, delete and check status of partitioned topics.

Create​

Partitioned topics must be explicitly created. When creating a new partitioned topic, you need to provide a name and the number of partitions for the topic.

By default, 60 seconds after creation, topics are considered inactive and deleted automatically to avoid generating trash data. To disable this feature, set brokerDeleteInactiveTopicsEnabled to false. To change the frequency of checking inactive topics, set brokerDeleteInactiveTopicsFrequencySeconds to a specific value.

For more information about the two parameters, see here.

You can create partitioned topics in the following ways.

When you create partitioned topics with the create-partitioned-topic command, you need to specify the topic name as an argument and the number of partitions using the -p or --partitions flag.


$ bin/pulsar-admin topics create-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic \
--partitions 4

note

If a non-partitioned topic with the suffix '-partition-' followed by a numeric value like 'xyz-topic-partition-10', you can not create a partitioned topic with name 'xyz-topic', because the partitions of the partitioned topic could override the existing non-partitioned topic. To create such partitioned topic, you have to delete that non-partitioned topic first.

Create missed partitions​

When topic auto-creation is disabled, and you have a partitioned topic without any partitions, you can use the create-missed-partitions command to create partitions for the topic.

You can create missed partitions with the create-missed-partitions command and specify the topic name as an argument.


$ bin/pulsar-admin topics create-missed-partitions \
persistent://my-tenant/my-namespace/my-topic \

Get metadata​

Partitioned topics are associated with metadata, you can view it as a JSON object. The following metadata field is available.

FieldDescription
partitionsThe number of partitions into which the topic is divided.

You can check the number of partitions in a partitioned topic with the get-partitioned-topic-metadata subcommand.


$ pulsar-admin topics get-partitioned-topic-metadata \
persistent://my-tenant/my-namespace/my-topic
{
"partitions": 4
}

Update​

You can update the number of partitions for an existing partitioned topic if the topic is non-global. However, you can only add the partition number. Decrementing the number of partitions would delete the topic, which is not supported in Pulsar.

Producers and consumers can find the newly created partitions automatically.

You can update partitioned topics with the update-partitioned-topic command.


$ pulsar-admin topics update-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic \
--partitions 8

Delete​

You can delete partitioned topics with the delete-partitioned-topic command, REST API and Java.


$ bin/pulsar-admin topics delete-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic

List​

You can get the list of topics under a given namespace in the following ways.


$ pulsar-admin topics list tenant/namespace
persistent://tenant/namespace/topic1
persistent://tenant/namespace/topic2

Stats​

You can check the current statistics of a given partitioned topic. The following is an example. For description of each stats, refer to get stats.

Note that in the subscription JSON object, chuckedMessageRate is deprecated. Please use chunkedMessageRate. Both will be sent in the JSON for now.


{
"msgRateIn" : 999.992947159793,
"msgThroughputIn" : 1070918.4635439808,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 270318763,
"msgInCounter" : 252489,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 1070.926056966454,
"msgChunkPublished" : false,
"storageSize" : 270316646,
"backlogSize" : 200921133,
"publishers" : [ {
"msgRateIn" : 999.992947159793,
"msgThroughputIn" : 1070918.4635439808,
"averageMsgSize" : 1070.3333333333333,
"chunkedMessageRate" : 0.0,
"producerId" : 0
} ],
"subscriptions" : {
"test" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"chunkedMessageRate" : 0,
"msgBacklog" : 144318,
"msgBacklogNoDelayed" : 144318,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false
}
},
"replication" : { },
"metadata" : {
"partitions" : 3
},
"partitions" : { }
}

You can check the current statistics of a given partitioned topic and its connected producers and consumers in the following ways.


$ pulsar-admin topics partitioned-stats \
persistent://test-tenant/namespace/topic \
--per-partition

Internal stats​

You can check the detailed statistics of a partitioned topic. The following is an example. For description of each stats, refer to get internal stats.


{
"entriesAddedCounter": 20449518,
"numberOfEntries": 3233,
"totalSize": 331482,
"currentLedgerEntries": 3233,
"currentLedgerSize": 331482,
"lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 1,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "324711539:3232",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
}
],
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"readPosition": "324711539:3233",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 20449501,
"cursorLedger": 324702104,
"cursorLedgerLastEntry": 21,
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
"lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
"state": "Open"
}
}
}

You can get the internal stats for the partitioned topic in the following ways.

$ pulsar-admin topics partitioned-stats-internal \
persistent://test-tenant/namespace/topic

Get backlog size​

You can get backlog size of a single topic partition or a nonpartitioned topic given a message ID (in bytes).


$ pulsar-admin topics get-backlog-size \
-m 1:1 \
persistent://test-tenant/ns1/tp1-partition-0 \

Publish to partitioned topics​

By default, Pulsar topics are served by a single broker, which limits the maximum throughput of a topic. Partitioned topics can span multiple brokers and thus allow for higher throughput.

You can publish to partitioned topics using Pulsar client libraries. When publishing to partitioned topics, you must specify a routing mode. If you do not specify any routing mode when you create a new producer, the round robin routing mode is used.

Routing mode​

You can specify the routing mode in the ProducerConfiguration object that you use to configure your producer. The routing mode determines which partition(internal topic) that each message should be published to.

The following MessageRoutingMode options are available.

ModeDescription
RoundRobinPartitionIf no key is provided, the producer publishes messages across all partitions in round-robin policy to achieve the maximum throughput. Round-robin is not done per individual message, round-robin is set to the same boundary of batching delay to ensure that batching is effective. If a key is specified on the message, the partitioned producer hashes the key and assigns message to a particular partition. This is the default mode.
SinglePartitionIf no key is provided, the producer picks a single partition randomly and publishes all messages into that partition. If a key is specified on the message, the partitioned producer hashes the key and assigns message to a particular partition.
CustomPartitionUse custom message router implementation that is called to determine the partition for a particular message. You can create a custom routing mode by using the Java client and implementing the MessageRouter interface.

The following is an example:


String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-tenant/my-namespace/my-topic";

PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("Partitioned topic message".getBytes());

Custom message router​

To use a custom message router, you need to provide an implementation of the MessageRouter interface, which has just one choosePartition method:


public interface MessageRouter extends Serializable {
int choosePartition(Message msg);
}

The following router routes every message to partition 10:


public class AlwaysTenRouter implements MessageRouter {
public int choosePartition(Message msg) {
return 10;
}
}

With that implementation, you can send


String pulsarBrokerRootUrl = "pulsar://localhost:6650";
String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";

PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.messageRouter(new AlwaysTenRouter())
.create();
producer.send("Partitioned topic message".getBytes());

How to choose partitions when using a key​

If a message has a key, it supersedes the round robin routing policy. The following example illustrates how to choose the partition when using a key.


// If the message has a key, it supersedes the round robin routing policy
if (msg.hasKey()) {
return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
}

if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.
long currentMs = clock.millis();
return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
} else {
return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
}