Advanced features
You can use the following advanced features with transactions in Pulsar.
Ack batch messages
If you want to acknowledge batch messages with transactions, set acknowledgmentAtBatchIndexLevelEnabled to true in the broker.conf or standalone.conf file.
acknowledgmentAtBatchIndexLevelEnabled=true
The following example enables batch index acknowledgment in the consumer builder, so that batch messages can be acknowledged in transactions.
Consumer<byte[]> consumer = pulsarClient
        .newConsumer()
        .topic(transferTopic)
        .subscriptionName("transaction-sub")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionType(SubscriptionType.Shared)
        .enableBatchIndexAcknowledgment(true) // enable batch index acknowledgment
        .subscribe();
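Batch index acknowledgment means that acknowledgment state is tracked per message inside a batch instead of for the batch as a whole. The sketch below models that idea in plain Java with a bit set. It is a conceptual illustration only, not Pulsar's implementation, and BatchAckTracker, ack, and pendingCount are hypothetical names introduced for this example.

```java
import java.util.BitSet;

// Conceptual model only; this is NOT how Pulsar implements the feature.
// BatchAckTracker is a hypothetical name used for illustration.
class BatchAckTracker {
    // A set bit means "this index in the batch is not yet acknowledged".
    private final BitSet pending;

    BatchAckTracker(int batchSize) {
        pending = new BitSet(batchSize);
        pending.set(0, batchSize); // every message starts unacknowledged
    }

    // Acknowledge one message by its index within the batch.
    // Returns true once every message in the batch has been acknowledged.
    boolean ack(int batchIndex) {
        pending.clear(batchIndex);
        return pending.isEmpty();
    }

    // Number of messages in the batch that are still unacknowledged.
    int pendingCount() {
        return pending.cardinality();
    }

    public static void main(String[] args) {
        BatchAckTracker tracker = new BatchAckTracker(3);
        System.out.println(tracker.ack(0));         // false: two messages remain
        System.out.println(tracker.pendingCount()); // 2
        tracker.ack(2);
        System.out.println(tracker.ack(1));         // true: whole batch acknowledged
    }
}
```

In this model, only the indexes that are still pending would need to be delivered again, which is the practical benefit of tracking acknowledgments at the batch index level.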
Enable authentication
If you want to enable authentication with transactions, follow the steps below.
1. Grant "consume" permission to the persistent://pulsar/system/transaction_coordinator_assign topic.
2. Configure authentication in a Pulsar client.
Select transaction isolation level
To increase flexibility, Pulsar transactions support two isolation levels:
- READ_COMMITTED (default): Of the transactional messages, the consumer can consume only those that have been committed.
- READ_UNCOMMITTED: The consumer can consume all messages, even transactional messages that have been aborted.
Different scenarios can use different subscriptions with different isolation levels, because not all subscriptions on the same topic require transaction guarantees. Some need low latency without the exactly-once guarantee (for example, a real-time monitoring system), while others require the exactly-once guarantee (for example, business processing systems). You can choose the isolation level that fits each scenario.
Note that this is a subscription-level configuration, and all consumers under the same subscription must be configured with the same isolation level.
In this example, the consumer builder uses the READ_UNCOMMITTED isolation level.
Consumer<String> consumer = client
        .newConsumer(Schema.STRING)
        .topic("persistent://my-tenant/my-namespace/my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_UNCOMMITTED) // Adding the isolation level configuration
        .subscribe();
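To make the difference between the two isolation levels concrete, here is a small plain-Java model of which messages each level exposes. It is a simplified sketch, not Pulsar's dispatch logic: it ignores ordering and timing, and the TxnState, Msg, sampleLog, and visible names (including the OPEN state for a transaction that is still in progress) are assumptions made for this example.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of the two isolation levels; NOT Pulsar's dispatch logic.
class IsolationLevelSketch {
    // NONE marks a non-transactional message; OPEN is a hypothetical
    // state for a transaction that has not finished yet.
    enum TxnState { NONE, COMMITTED, ABORTED, OPEN }

    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    static final class Msg {
        final String payload;
        final TxnState state;

        Msg(String payload, TxnState state) {
            this.payload = payload;
            this.state = state;
        }
    }

    // A small sample topic with one message in each state.
    static List<Msg> sampleLog() {
        return List.of(
                new Msg("plain", TxnState.NONE),
                new Msg("committed", TxnState.COMMITTED),
                new Msg("aborted", TxnState.ABORTED),
                new Msg("open", TxnState.OPEN));
    }

    // Payloads a consumer would see under the given isolation level.
    static List<String> visible(List<Msg> log, IsolationLevel level) {
        return log.stream()
                .filter(m -> level == IsolationLevel.READ_UNCOMMITTED
                        || m.state == TxnState.NONE
                        || m.state == TxnState.COMMITTED)
                .map(m -> m.payload)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [plain, committed]
        System.out.println(visible(sampleLog(), IsolationLevel.READ_COMMITTED));
        // prints [plain, committed, aborted, open]
        System.out.println(visible(sampleLog(), IsolationLevel.READ_UNCOMMITTED));
    }
}
```

A monitoring subscription that tolerates seeing aborted data would correspond to the second call, while a business processing subscription would correspond to the first.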
Guarantee exactly-once semantics
If you want to guarantee exactly-once semantics with transactions, you can enable message deduplication at the broker, namespace, or topic level.
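For the broker-level option, a minimal sketch of the configuration fragment is shown here. It assumes the brokerDeduplicationEnabled setting in broker.conf (or standalone.conf); namespace-level and topic-level deduplication are set as policies through the admin tools and are not shown.

```properties
# Enable message deduplication by default for all namespaces on this broker.
brokerDeduplicationEnabled=true
```

A namespace-level or topic-level policy, where set, would apply to that scope instead of this broker-wide default.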
Related topics
- To get up and running quickly, see Pulsar transactions - Get started.