Work with consumer
After setting up your client, you can start working with consumers.
Subscribe to topics
Pulsar has various subscription types to match different scenarios. A topic can have multiple subscriptions with different subscription types.
A subscription is identified by its subscription name, and a subscription name can have only one subscription type at a time. To change the subscription type, first stop all consumers of this subscription.
Each subscription type distributes messages to consumers differently. This section describes the differences between subscription types and how to use them.
To better describe their differences, assume you have a topic named "my-topic", and the producer has published 10 messages.
- Java
- Python
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(false)
.create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();
producer = client.create_producer('my-topic', batching_enabled=False)
# 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
producer.send(b'message-1-1', partition_key='key-1')
producer.send(b'message-1-2', partition_key='key-1')
producer.send(b'message-1-3', partition_key='key-1')
producer.send(b'message-2-1', partition_key='key-2')
producer.send(b'message-2-2', partition_key='key-2')
producer.send(b'message-2-3', partition_key='key-2')
producer.send(b'message-3-1', partition_key='key-3')
producer.send(b'message-3-2', partition_key='key-3')
producer.send(b'message-4-1', partition_key='key-4')
producer.send(b'message-4-2', partition_key='key-4')
Exclusive
Create a new consumer and subscribe with the Exclusive subscription type.
- Java
- Python
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
consumer = client.subscribe('my-topic', 'my-subscription',
consumer_type=pulsar.ConsumerType.Exclusive)
Only the first consumer is allowed to attach to the subscription; other consumers receive an error. The first consumer receives all 10 messages, and the consuming order is the same as the producing order.
If the topic is partitioned, the first consumer subscribes to all partitions of the topic; other consumers are not assigned any partitions and receive an error.
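The Exclusive behavior can be illustrated without a broker. The following is a minimal sketch in plain Python, not Pulsar client code: the `ExclusiveSubscription` class and the `ConsumerBusy` exception are hypothetical names introduced here only to model "one consumer per subscription".

```python
# The 10 example messages: 3 for key-1, 3 for key-2, 2 for key-3, 2 for key-4.
MESSAGES = [(f"key-{k}", f"message-{k}-{i}")
            for k, count in ((1, 3), (2, 3), (3, 2), (4, 2))
            for i in range(1, count + 1)]


class ConsumerBusy(Exception):
    """Hypothetical error: the subscription already has a consumer."""


class ExclusiveSubscription:
    """Toy model of an Exclusive subscription: at most one attached consumer."""

    def __init__(self):
        self.consumer = None
        self.delivered = []

    def attach(self, name):
        # Any consumer after the first one is rejected.
        if self.consumer is not None:
            raise ConsumerBusy(f"{name} rejected: {self.consumer} is attached")
        self.consumer = name

    def dispatch(self, messages):
        # Every message goes to the single consumer, in publish order.
        for message in messages:
            self.delivered.append((self.consumer, message))
```

In this model, attaching `consumer1` succeeds, attaching `consumer2` raises the error, and `dispatch(MESSAGES)` hands all 10 messages to `consumer1` in order.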
Failover
Create new consumers and subscribe with the Failover subscription type.
- Java
- Python
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
// consumer1 is the active consumer, consumer2 is the standby consumer.
// consumer1 receives 5 messages and then crashes, consumer2 takes over as the active consumer.
consumer1 = client.subscribe('my-topic', 'my-subscription',
consumer_type=pulsar.ConsumerType.Failover)
consumer2 = client.subscribe('my-topic', 'my-subscription',
consumer_type=pulsar.ConsumerType.Failover)
Multiple consumers can attach to the same subscription, but only the first consumer is active; the others are on standby. When the active consumer is disconnected, messages are dispatched to one of the standby consumers, which then becomes the active consumer.
If the first active consumer is disconnected after receiving 5 messages, the standby consumer becomes the active consumer. consumer1 receives:
("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-2", "message-2-1")
("key-2", "message-2-2")
consumer2 receives:
("key-2", "message-2-3")
("key-3", "message-3-1")
("key-3", "message-3-2")
("key-4", "message-4-1")
("key-4", "message-4-2")
If the topic is partitioned, each partition has only one active consumer. Messages of one partition are distributed to only one consumer, while messages of different partitions can be distributed to different consumers.
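The failover scenario above can be reproduced with a small simulation. This is a sketch in plain Python rather than Pulsar client code: `failover_dispatch` and its `disconnect_after` parameter are hypothetical, and the model assumes the first consumer acknowledged everything it received before disconnecting.

```python
# The 10 example messages: 3 for key-1, 3 for key-2, 2 for key-3, 2 for key-4.
MESSAGES = [(f"key-{k}", f"message-{k}-{i}")
            for k, count in ((1, 3), (2, 3), (3, 2), (4, 2))
            for i in range(1, count + 1)]


def failover_dispatch(messages, consumers, disconnect_after):
    """Deliver messages to the active consumer and fail over on disconnect.

    disconnect_after maps a consumer name to the number of messages it
    receives before it disconnects (a made-up knob for this simulation).
    """
    received = {name: [] for name in consumers}
    standby = list(consumers)
    active = standby.pop(0)  # the first consumer starts as the active one
    for message in messages:
        # Promote standby consumers until one is still connected.
        while active is not None and \
                len(received[active]) >= disconnect_after.get(active, float("inf")):
            active = standby.pop(0) if standby else None
        if active is None:
            break  # nobody is left; remaining messages stay in the backlog
        received[active].append(message)
    return received


result = failover_dispatch(MESSAGES, ["consumer1", "consumer2"], {"consumer1": 5})
```

Here `result["consumer1"]` holds the first five messages and `result["consumer2"]` the last five, matching the lists above.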
Shared
Create new consumers and subscribe with the Shared subscription type.
- Java
- Python
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
// Both consumer1 and consumer2 are active consumers.
consumer1 = client.subscribe('my-topic', 'my-subscription',
consumer_type=pulsar.ConsumerType.Shared)
consumer2 = client.subscribe('my-topic', 'my-subscription',
consumer_type=pulsar.ConsumerType.Shared)
In the Shared subscription type, multiple consumers can attach to the same subscription, and messages are delivered across consumers in a round-robin distribution.
If the broker dispatches only one message at a time, consumer1 receives the following messages.
("key-1", "message-1-1")
("key-1", "message-1-3")
("key-2", "message-2-2")
("key-3", "message-3-1")
("key-4", "message-4-1")
consumer2 receives the following messages.
("key-1", "message-1-2")
("key-2", "message-2-1")
("key-2", "message-2-3")
("key-3", "message-3-2")
("key-4", "message-4-2")
Unlike the Exclusive and Failover subscription types, the Shared subscription type is more flexible but cannot guarantee message ordering.
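The round-robin distribution above can be checked with a short simulation. This is a sketch in plain Python under the same assumption as the text (the broker dispatches one message at a time); `round_robin_dispatch` is a hypothetical helper, not part of the Pulsar client.

```python
from itertools import cycle

# The 10 example messages: 3 for key-1, 3 for key-2, 2 for key-3, 2 for key-4.
MESSAGES = [(f"key-{k}", f"message-{k}-{i}")
            for k, count in ((1, 3), (2, 3), (3, 2), (4, 2))
            for i in range(1, count + 1)]


def round_robin_dispatch(messages, consumers):
    """Hand out messages one at a time, cycling through the consumers."""
    received = {name: [] for name in consumers}
    for message, name in zip(messages, cycle(consumers)):
        received[name].append(message)
    return received


result = round_robin_dispatch(MESSAGES, ["consumer1", "consumer2"])
for name, messages in result.items():
    print(name, messages)
```

Note that the three `key-1` messages end up split between the two consumers, which is why this subscription type cannot preserve per-key ordering.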
Key_shared
The Key_Shared subscription type is available since the 2.4.0 release. Create new consumers and subscribe with the Key_Shared subscription type.
- Java
- Python
Consumer consumer1 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
// Both consumer1 and consumer2 are active consumers.
consumer1 = client.subscribe('my-topic', 'my-subscription',
consumer_type=pulsar.ConsumerType.KeyShared)
consumer2 = client.subscribe('my-topic', 'my-subscription',
consumer_type=pulsar.ConsumerType.KeyShared)
As in the Shared subscription type, multiple consumers can attach to the same subscription in the Key_Shared subscription type. Unlike Shared, however, messages with the same key are delivered to only one consumer, in order. By default, you cannot know in advance which keys are assigned to which consumer, but at any given time a key is assigned to only one consumer. The following is one possible distribution of messages between the two consumers.
consumer1 receives the following messages.
("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-3", "message-3-1")
("key-3", "message-3-2")
consumer2 receives the following messages.
("key-2", "message-2-1")
("key-2", "message-2-2")
("key-2", "message-2-3")
("key-4", "message-4-1")
("key-4", "message-4-2")
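The key-to-consumer property can be sketched as follows. This is a plain Python model, not Pulsar's implementation: it assumes a CRC32 hash of the key taken modulo the number of consumers, whereas Pulsar uses its own hashing and hash-range assignment, so the actual split of keys may differ from both this sketch and the lists above. `key_shared_dispatch` is a hypothetical helper.

```python
import zlib

# The 10 example messages: 3 for key-1, 3 for key-2, 2 for key-3, 2 for key-4.
MESSAGES = [(f"key-{k}", f"message-{k}-{i}")
            for k, count in ((1, 3), (2, 3), (3, 2), (4, 2))
            for i in range(1, count + 1)]


def key_shared_dispatch(messages, consumers):
    """Route each message to the consumer selected by a hash of its key."""
    received = {name: [] for name in consumers}
    for key, value in messages:
        # Assumption for illustration: CRC32 modulo the consumer count.
        index = zlib.crc32(key.encode("utf-8")) % len(consumers)
        received[consumers[index]].append((key, value))
    return received


result = key_shared_dispatch(MESSAGES, ["consumer1", "consumer2"])
for name, messages in result.items():
    print(name, sorted({key for key, _ in messages}))
```

Whatever the hash function, the point is that the same key always maps to the same consumer, so the messages of one key arrive at a single consumer in publish order.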
If batching is enabled on the producer side, messages with different keys are added to the same batch by default. The broker dispatches a whole batch to a single consumer, so the default batching mechanism may break the message distribution guarantee of the Key_Shared subscription type. To avoid this, the producer needs to use the KeyBasedBatcher.
- Java
- Python
Producer producer = client.newProducer()
.topic("my-topic")
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
producer = client.create_producer('my-topic',
batching_type=pulsar.BatchingType.KeyBased)
Or the producer can disable batching.
- Java
- Python
Producer producer = client.newProducer()
.topic("my-topic")
.enableBatching(false)
.create();
producer = client.create_producer('my-topic', batching_enabled=False)
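To see why default batching conflicts with Key_Shared, the following sketch compares the two grouping strategies in plain Python. Both functions are hypothetical simplifications: real batchers also apply size and time limits, which are ignored here apart from a made-up `max_batch_size`.

```python
# The 10 example messages: 3 for key-1, 3 for key-2, 2 for key-3, 2 for key-4.
MESSAGES = [(f"key-{k}", f"message-{k}-{i}")
            for k, count in ((1, 3), (2, 3), (3, 2), (4, 2))
            for i in range(1, count + 1)]


def default_batches(messages, max_batch_size):
    """Group messages in publish order, ignoring their keys."""
    return [messages[i:i + max_batch_size]
            for i in range(0, len(messages), max_batch_size)]


def key_based_batches(messages):
    """Group messages into one batch per key, keeping per-key order."""
    batches = {}
    for key, value in messages:
        batches.setdefault(key, []).append((key, value))
    return list(batches.values())


def keys_in(batch):
    """Return the set of distinct keys found in a batch."""
    return {key for key, _ in batch}


mixed = default_batches(MESSAGES, max_batch_size=4)
per_key = key_based_batches(MESSAGES)
print([sorted(keys_in(b)) for b in mixed])
print([sorted(keys_in(b)) for b in per_key])
```

With a batch size of 4, the first default batch mixes `key-1` and `key-2`, so delivering it to one consumer would hand that consumer a key it may not own. The key-based grouping never mixes keys within a batch.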
By default, messages without a key are dispatched to a single consumer in order.
Subscribe to multi-topics
In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously using multi-topic subscriptions. To use multi-topic subscriptions you can supply either a regular expression (regex) or a List of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.
The following are some examples.
- Java
- Go
- Python
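import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RegexSubscriptionMode;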
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("public/default/.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();
// Subscribe to a subset of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("public/default/foo.*");
Consumer someTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
In the above example, the consumer subscribes to the persistent topics that match the topic name pattern. If you want the consumer to subscribe to all persistent and non-persistent topics that match the topic name pattern, set subscriptionTopicsMode to RegexSubscriptionMode.AllTopics.
Pattern pattern = Pattern.compile("public/default/.*");
pulsarClient.newConsumer()
.subscriptionName("my-sub")
.topicsPattern(pattern)
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.subscribe();
By default, the subscriptionTopicsMode of the consumer is PersistentOnly. Available options of subscriptionTopicsMode are PersistentOnly, NonPersistentOnly, and AllTopics.
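The effect of the three modes can be sketched with Python's `re` module. This is an illustrative model only: `filter_topics` and the topic names are hypothetical, and the sketch assumes the pattern is matched against the whole topic name with the `persistent://` or `non-persistent://` prefix removed.

```python
import re


def filter_topics(topics, pattern, mode="PersistentOnly"):
    """Return the topics selected by a regex pattern and a subscription mode."""
    domains = {
        "PersistentOnly": {"persistent"},
        "NonPersistentOnly": {"non-persistent"},
        "AllTopics": {"persistent", "non-persistent"},
    }[mode]
    regex = re.compile(pattern)
    matched = []
    for topic in topics:
        # Split "persistent://tenant/namespace/name" into domain and short name.
        domain, _, short_name = topic.partition("://")
        if domain in domains and regex.fullmatch(short_name):
            matched.append(topic)
    return matched


# Made-up topic names for the example.
TOPICS = [
    "persistent://public/default/foo-1",
    "persistent://public/default/bar-1",
    "non-persistent://public/default/foo-2",
    "persistent://public/other/foo-3",
]

print(filter_topics(TOPICS, "public/default/.*"))
print(filter_topics(TOPICS, "public/default/foo.*", mode="AllTopics"))
```

With the default mode, only the two persistent topics in `public/default` are selected. With `AllTopics` and the `foo.*` pattern, both `foo-1` and the non-persistent `foo-2` are selected, while the topic in `public/other` never matches.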
You can also subscribe to an explicit list of topics (across namespaces if you wish):
List<String> topics = Arrays.asList(
"topic-1",
"topic-2",
"topic-3"
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();
// Alternatively:
Consumer multiTopicConsumer = consumerBuilder
.topic(
"topic-1",
"topic-2",
"topic-3"
)
.subscribe();
You can also subscribe to multiple topics asynchronously using the subscribeAsync method rather than the synchronous subscribe method. The following is an example.
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
consumerBuilder
.topics(topics)
.subscribeAsync()
.thenAccept(this::receiveMessageFromConsumer);
private void receiveMessageFromConsumer(Object consumer) {
((Consumer)consumer).receiveAsync().thenAccept(message -> {
// Do something with the received message
receiveMessageFromConsumer(consumer);
});
}
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
topics := []string{"topic-1", "topic-2"}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
// Setting the `Topics` field creates a multi-topic consumer
Topics: topics,
SubscriptionName: "multi-topic-sub",
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
import re
# The pattern is a regular expression: "topic-.*" matches topic names that start with "topic-"
consumer = client.subscribe(re.compile('persistent://public/default/topic-.*'), 'my-subscription')
while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
client.close()
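Because topic patterns are regular expressions rather than shell-style globs, a trailing `-*` means "zero or more hyphens", not "anything after the hyphen". The following sketch uses only Python's `re` module to show the difference. It assumes the whole topic name must match the pattern, and the topic name is a made-up example.

```python
import re

# Made-up topic name for the example.
topic = "persistent://public/default/topic-1"

# Glob-style habit: "-*" only allows repeated hyphens after "topic".
glob_style = re.compile(r"persistent://public/default/topic-*")
# Regex style: ".*" allows any characters after "topic-".
regex_style = re.compile(r"persistent://public/default/topic-.*")

print(bool(glob_style.fullmatch(topic)))   # False
print(bool(regex_style.fullmatch(topic)))  # True
```

Under a full match, the glob-style pattern accepts names such as `topic` or `topic--` but rejects `topic-1`.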
Unsubscribe from topics
This example shows how a consumer unsubscribes from a topic.
- Java
- C#
- Python
consumer.unsubscribe();
await consumer.Unsubscribe();
consumer.unsubscribe()
Once a consumer unsubscribes from a topic, it is disposed and can no longer be used.
Receive messages
This example shows how a consumer receives messages from a topic.
- Java
- C#
- Python
Message message = consumer.receive();
await foreach (var message in consumer.Messages())
{
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
msg = consumer.receive()
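A received message is normally processed and then acknowledged, or negatively acknowledged if processing fails. The following sketch wraps that pattern in a helper that relies only on the `receive`, `acknowledge`, and `negative_acknowledge` methods of the Python consumer. The `consume` function and the in-memory `StubConsumer` are hypothetical names used here for a dry run without a broker.

```python
def consume(consumer, handler, max_messages):
    """Receive up to max_messages and acknowledge those the handler accepts."""
    processed = 0
    for _ in range(max_messages):
        msg = consumer.receive()
        try:
            handler(msg)
        except Exception:
            # Processing failed: ask for the message to be redelivered later.
            consumer.negative_acknowledge(msg)
        else:
            # Processing succeeded: mark the message as consumed.
            consumer.acknowledge(msg)
            processed += 1
    return processed


class StubConsumer:
    """Hypothetical in-memory stand-in for a consumer, for dry runs only."""

    def __init__(self, messages):
        self._pending = list(messages)
        self.acked = []
        self.nacked = []

    def receive(self):
        return self._pending.pop(0)

    def acknowledge(self, msg):
        self.acked.append(msg)

    def negative_acknowledge(self, msg):
        self.nacked.append(msg)


def reject_bad(msg):
    """Example handler that fails on one specific payload."""
    if msg == b"bad":
        raise ValueError("cannot process message")


stub = StubConsumer([b"ok-1", b"bad", b"ok-2"])
processed = consume(stub, reject_bad, max_messages=3)
```

With a real consumer, the same helper would be called as `consume(consumer, handler, n)`. Unlike a broker, the stub does not redeliver negatively acknowledged messages.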