No Matches
pulsar::ProducerConfiguration Class Reference

#include <ProducerConfiguration.h>

Public Types

enum  PartitionsRoutingMode { UseSinglePartition , RoundRobinDistribution , CustomPartition }
enum  HashingScheme { Murmur3_32Hash , BoostHash , JavaStringHash }
enum  BatchingType { DefaultBatching , KeyBasedBatching }
enum  ProducerAccessMode { Shared = 0 , Exclusive = 1 , WaitForExclusive = 2 , ExclusiveWithFencing = 3 }

Public Member Functions

 ProducerConfiguration (const ProducerConfiguration &)
ProducerConfigurationoperator= (const ProducerConfiguration &)
ProducerConfigurationsetProducerName (const std::string &producerName)
const std::string & getProducerName () const
ProducerConfigurationsetSchema (const SchemaInfo &schemaInfo)
const SchemaInfogetSchema () const
ProducerConfigurationsetSendTimeout (int sendTimeoutMs)
int getSendTimeout () const
ProducerConfigurationsetInitialSequenceId (int64_t initialSequenceId)
int64_t getInitialSequenceId () const
ProducerConfigurationsetCompressionType (CompressionType compressionType)
CompressionType getCompressionType () const
ProducerConfigurationsetMaxPendingMessages (int maxPendingMessages)
int getMaxPendingMessages () const
ProducerConfigurationsetMaxPendingMessagesAcrossPartitions (int maxPendingMessagesAcrossPartitions)
int getMaxPendingMessagesAcrossPartitions () const
ProducerConfigurationsetPartitionsRoutingMode (const PartitionsRoutingMode &mode)
PartitionsRoutingMode getPartitionsRoutingMode () const
ProducerConfigurationsetMessageRouter (const MessageRoutingPolicyPtr &router)
const MessageRoutingPolicyPtr & getMessageRouterPtr () const
ProducerConfigurationsetHashingScheme (const HashingScheme &scheme)
HashingScheme getHashingScheme () const
ProducerConfigurationsetLazyStartPartitionedProducers (bool)
bool getLazyStartPartitionedProducers () const
ProducerConfigurationsetBlockIfQueueFull (bool)
bool getBlockIfQueueFull () const
ProducerConfigurationsetBatchingEnabled (const bool &batchingEnabled)
const bool & getBatchingEnabled () const
ProducerConfigurationsetBatchingMaxMessages (const unsigned int &batchingMaxMessages)
const unsigned int & getBatchingMaxMessages () const
ProducerConfigurationsetBatchingMaxAllowedSizeInBytes (const unsigned long &batchingMaxAllowedSizeInBytes)
const unsigned long & getBatchingMaxAllowedSizeInBytes () const
ProducerConfigurationsetBatchingMaxPublishDelayMs (const unsigned long &batchingMaxPublishDelayMs)
const unsigned long & getBatchingMaxPublishDelayMs () const
ProducerConfigurationsetBatchingType (BatchingType batchingType)
BatchingType getBatchingType () const
const CryptoKeyReaderPtr getCryptoKeyReader () const
ProducerConfigurationsetCryptoKeyReader (CryptoKeyReaderPtr cryptoKeyReader)
ProducerCryptoFailureAction getCryptoFailureAction () const
ProducerConfigurationsetCryptoFailureAction (ProducerCryptoFailureAction action)
const std::set< std::string > & getEncryptionKeys () const
bool isEncryptionEnabled () const
ProducerConfigurationaddEncryptionKey (std::string key)
bool hasProperty (const std::string &name) const
const std::string & getProperty (const std::string &name) const
std::map< std::string, std::string > & getProperties () const
ProducerConfigurationsetProperty (const std::string &name, const std::string &value)
ProducerConfigurationsetProperties (const std::map< std::string, std::string > &properties)
ProducerConfigurationsetChunkingEnabled (bool chunkingEnabled)
bool isChunkingEnabled () const
ProducerConfigurationsetAccessMode (const ProducerAccessMode &accessMode)
ProducerAccessMode getAccessMode () const
ProducerConfigurationintercept (const std::vector< ProducerInterceptorPtr > &interceptors)
const std::vector< ProducerInterceptorPtr > & getInterceptors () const


class PulsarWrapper
class ConsumerImpl
class ProducerImpl

Detailed Description

Class that holds the configuration for a producer

Member Enumeration Documentation

◆ BatchingType


Default batching.

incoming single messages: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)

batched into single batch message: [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]


Key based batching.

incoming single messages: (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)

batched into single batch message: [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]

◆ ProducerAccessMode


By default multiple producers can publish on a topic.


Require exclusive access for producer. Fail immediately if there's already a producer connected.


Producer creation is pending until it can acquire exclusive access.


Acquire exclusive access for the producer. Any existing producer will be removed and invalidated immediately.

Member Function Documentation

◆ addEncryptionKey()

ProducerConfiguration & pulsar::ProducerConfiguration::addEncryptionKey ( std::string key)

Add public encryption key, used by producer to encrypt the data key.

At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted after compression. If batch messaging is enabled, the batched message is encrypted.

@key the encryption key to add

the ProducerConfiguration self

◆ getAccessMode()

ProducerAccessMode pulsar::ProducerConfiguration::getAccessMode ( ) const

Get the type of access mode that the producer requires on the topic.

◆ getBatchingEnabled()

const bool & pulsar::ProducerConfiguration::getBatchingEnabled ( ) const

Return the flag whether automatic message batching is enabled or not for the producer.

true if automatic message batching is enabled. Otherwise it returns false.
It is enabled by default.

◆ getBatchingMaxAllowedSizeInBytes()

const unsigned long & pulsar::ProducerConfiguration::getBatchingMaxAllowedSizeInBytes ( ) const

The getter associated with setBatchingMaxAllowedSizeInBytes().

◆ getBatchingMaxMessages()

const unsigned int & pulsar::ProducerConfiguration::getBatchingMaxMessages ( ) const

The getter associated with setBatchingMaxMessages().

◆ getBatchingMaxPublishDelayMs()

const unsigned long & pulsar::ProducerConfiguration::getBatchingMaxPublishDelayMs ( ) const

The getter associated with setBatchingMaxPublishDelayMs().

◆ getBatchingType()

BatchingType pulsar::ProducerConfiguration::getBatchingType ( ) const
batching type.
See also

◆ getBlockIfQueueFull()

bool pulsar::ProducerConfiguration::getBlockIfQueueFull ( ) const
whether Producer::send or Producer::sendAsync operations should block when the outgoing message queue is full. (Default: false)

◆ getCompressionType()

CompressionType pulsar::ProducerConfiguration::getCompressionType ( ) const

The getter associated with setCompressionType().

◆ getCryptoFailureAction()

ProducerCryptoFailureAction pulsar::ProducerConfiguration::getCryptoFailureAction ( ) const

The getter associated with setCryptoFailureAction().

◆ getCryptoKeyReader()

const CryptoKeyReaderPtr pulsar::ProducerConfiguration::getCryptoKeyReader ( ) const

The getter associated with setCryptoKeyReader().

◆ getEncryptionKeys()

const std::set< std::string > & pulsar::ProducerConfiguration::getEncryptionKeys ( ) const
all the encryption keys added

◆ getHashingScheme()

HashingScheme pulsar::ProducerConfiguration::getHashingScheme ( ) const

The getter associated with setHashingScheme().

◆ getInitialSequenceId()

int64_t pulsar::ProducerConfiguration::getInitialSequenceId ( ) const

The getter associated with setInitialSequenceId().

◆ getLazyStartPartitionedProducers()

bool pulsar::ProducerConfiguration::getLazyStartPartitionedProducers ( ) const

The getter associated with setLazyStartPartitionedProducers()

◆ getMaxPendingMessages()

int pulsar::ProducerConfiguration::getMaxPendingMessages ( ) const

The getter associated with setMaxPendingMessages().

◆ getMaxPendingMessagesAcrossPartitions()

int pulsar::ProducerConfiguration::getMaxPendingMessagesAcrossPartitions ( ) const
the maximum number of pending messages allowed across all the partitions

◆ getMessageRouterPtr()

const MessageRoutingPolicyPtr & pulsar::ProducerConfiguration::getMessageRouterPtr ( ) const

The getter associated with setMessageRouter().

◆ getPartitionsRoutingMode()

PartitionsRoutingMode pulsar::ProducerConfiguration::getPartitionsRoutingMode ( ) const

The getter associated with setPartitionsRoutingMode().

◆ getProducerName()

const std::string & pulsar::ProducerConfiguration::getProducerName ( ) const

The getter associated with setProducerName().

◆ getProperties()

std::map< std::string, std::string > & pulsar::ProducerConfiguration::getProperties ( ) const

Get all the properties attached to this producer.

◆ getProperty()

const std::string & pulsar::ProducerConfiguration::getProperty ( const std::string & name) const

Get the value of a specific property

namethe name of the property
the value of the property or null if the property was not defined

◆ getSchema()

const SchemaInfo & pulsar::ProducerConfiguration::getSchema ( ) const
the schema information declared for this producer

◆ getSendTimeout()

int pulsar::ProducerConfiguration::getSendTimeout ( ) const

Get the send timeout is milliseconds.

If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.

If the timeout is zero, there will be no timeout.

the send timeout in milliseconds (Default: 30000)

◆ hasProperty()

bool pulsar::ProducerConfiguration::hasProperty ( const std::string & name) const

Check whether the producer has a specific property attached.

namethe name of the property to check
true if the message has the specified property
false if the property is not defined

◆ isChunkingEnabled()

bool pulsar::ProducerConfiguration::isChunkingEnabled ( ) const

The getter associated with setChunkingEnabled().

◆ isEncryptionEnabled()

bool pulsar::ProducerConfiguration::isEncryptionEnabled ( ) const
true if encryption keys are added

◆ setAccessMode()

ProducerConfiguration & pulsar::ProducerConfiguration::setAccessMode ( const ProducerAccessMode & accessMode)

Set the type of access mode that the producer requires on the topic.

See also
accessModeThe type of access to the topic that the producer requires

◆ setBatchingEnabled()

ProducerConfiguration & pulsar::ProducerConfiguration::setBatchingEnabled ( const bool & batchingEnabled)

Control whether automatic batching of messages is enabled or not for the producer.

Default: true

When automatic batching is enabled, multiple calls to Producer::sendAsync can result in a single batch to be sent to the broker, leading to better throughput, especially when publishing small messages. If compression is enabled, messages are compressed at the batch level, leading to a much better compression ratio for similar headers or contents.

When the default batch delay is set to 10 ms and the default batch size is 1000 messages.

See also

◆ setBatchingMaxAllowedSizeInBytes()

ProducerConfiguration & pulsar::ProducerConfiguration::setBatchingMaxAllowedSizeInBytes ( const unsigned long & batchingMaxAllowedSizeInBytes)

Set the max size of messages permitted in a batch. Default value: 128 KB. If you set this option to a value greater than 1, messages are queued until this threshold is reached or batch interval has elapsed.

All messages in a batch are published as a single batch message. The consumer is delivered individual messages in the batch in the same order they are enqueued.


◆ setBatchingMaxMessages()

ProducerConfiguration & pulsar::ProducerConfiguration::setBatchingMaxMessages ( const unsigned int & batchingMaxMessages)

Set the max number of messages permitted in a batch. Default value: 1000. If you set this option to a value greater than 1, messages are queued until this threshold is reached or batch interval has elapsed.

All messages in a batch are published as a single batch message. The consumer is delivered individual messages in the batch in the same order they are enqueued.

batchMessagesMaxMessagesPerBatchmax number of messages permitted in a batch

◆ setBatchingMaxPublishDelayMs()

ProducerConfiguration & pulsar::ProducerConfiguration::setBatchingMaxPublishDelayMs ( const unsigned long & batchingMaxPublishDelayMs)

Set the max time for message publish delay permitted in a batch. Default value: 10 ms.

batchingMaxPublishDelayMsmax time for message publish delay permitted in a batch.

◆ setBatchingType()

ProducerConfiguration & pulsar::ProducerConfiguration::setBatchingType ( BatchingType batchingType)

Default: DefaultBatching

See also

◆ setBlockIfQueueFull()

ProducerConfiguration & pulsar::ProducerConfiguration::setBlockIfQueueFull ( bool )

The setter associated with getBlockIfQueueFull()

◆ setChunkingEnabled()

ProducerConfiguration & pulsar::ProducerConfiguration::setChunkingEnabled ( bool chunkingEnabled)

If message size is higher than allowed max publish-payload size by broker then enableChunking helps producer to split message into multiple chunks and publish them to broker separately in order. So, it allows client to successfully publish large size of messages in pulsar.

Set it true to enable this feature. If so, you must disable batching (see setBatchingEnabled), otherwise the producer creation will fail.

There are some other recommendations when it's enabled:

  1. This features is right now only supported for non-shared subscription and persistent-topic.
  2. It's better to reduce setMaxPendingMessages to avoid producer accupying large amount of memory by buffered messages.
  3. Set message-ttl on the namespace to cleanup chunked messages. Sometimes due to broker-restart or publish time, producer might fail to publish entire large message. So, consumer will not be able to consume and ack those messages.

Default: false

chunkingEnabledwhether chunking is enabled
the ProducerConfiguration self

◆ setCompressionType()

ProducerConfiguration & pulsar::ProducerConfiguration::setCompressionType ( CompressionType compressionType)

Set the compression type for the producer.

By default, message payloads are not compressed. Supported compression types are:

◆ setCryptoFailureAction()

ProducerConfiguration & pulsar::ProducerConfiguration::setCryptoFailureAction ( ProducerCryptoFailureAction action)

Sets the ProducerCryptoFailureAction to the value specified.

actionthe action taken by the producer in case of encryption failures.

◆ setCryptoKeyReader()

ProducerConfiguration & pulsar::ProducerConfiguration::setCryptoKeyReader ( CryptoKeyReaderPtr cryptoKeyReader)

Set the shared pointer to CryptoKeyReader.

sharedpointer to CryptoKeyReader.

◆ setHashingScheme()

ProducerConfiguration & pulsar::ProducerConfiguration::setHashingScheme ( const HashingScheme & scheme)

Set the hashing scheme, which is a standard hashing function available when choosing the partition used for a particular message.

Default: HashingScheme::BoostHash

Standard hashing functions available are:

schemehashing scheme.

◆ setInitialSequenceId()

ProducerConfiguration & pulsar::ProducerConfiguration::setInitialSequenceId ( int64_t initialSequenceId)

Set the baseline of the sequence ID for messages published by the producer.

The first message uses (initialSequenceId + 1) as its sequence ID and subsequent messages are assigned incremental sequence IDs.

Default: -1, which means the first message's sequence ID is 0.

initialSequenceIdthe initial sequence ID for the producer.

◆ setLazyStartPartitionedProducers()

ProducerConfiguration & pulsar::ProducerConfiguration::setLazyStartPartitionedProducers ( bool )

This config affects producers of partitioned topics only. It controls whether producers register and connect immediately to the owner broker of each partition or start lazily on demand. The internal producer of one partition is always started eagerly, chosen by the routing policy, but the internal producers of any additional partitions are started on demand, upon receiving their first message. Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when the SinglePartition routing policy is used without keyed messages. Because producer connection can be on demand, this can produce extra send latency for the first messages of a given partition.

true/falseas to whether to start partition producers lazily

◆ setMaxPendingMessages()

ProducerConfiguration & pulsar::ProducerConfiguration::setMaxPendingMessages ( int maxPendingMessages)

Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.

When the queue is full, by default, all calls to Producer::send and Producer::sendAsync would fail unless blockIfQueueFull is set to true. Use setBlockIfQueueFull to change the blocking behavior.

Default: 1000

maxPendingMessagesmax number of pending messages.

◆ setMaxPendingMessagesAcrossPartitions()

ProducerConfiguration & pulsar::ProducerConfiguration::setMaxPendingMessagesAcrossPartitions ( int maxPendingMessagesAcrossPartitions)

Set the number of max pending messages across all the partitions

This setting will be used to lower the max pending messages for each partition (setMaxPendingMessages(int)), if the total exceeds the configured value.

Default: 50000


◆ setMessageRouter()

ProducerConfiguration & pulsar::ProducerConfiguration::setMessageRouter ( const MessageRoutingPolicyPtr & router)

Set a custom message routing policy by passing an implementation of MessageRouter.

messageRoutermessage router.

◆ setPartitionsRoutingMode()

ProducerConfiguration & pulsar::ProducerConfiguration::setPartitionsRoutingMode ( const PartitionsRoutingMode & mode)

Set the message routing modes for partitioned topics.

Default: UseSinglePartition

PartitionsRoutingModepartition routing mode.

◆ setProducerName()

ProducerConfiguration & pulsar::ProducerConfiguration::setProducerName ( const std::string & producerName)

Set the producer name which could be assigned by the system or specified by the client.

producerNameproducer name.

◆ setProperties()

ProducerConfiguration & pulsar::ProducerConfiguration::setProperties ( const std::map< std::string, std::string > & properties)

Add all the properties in the provided map

◆ setProperty()

ProducerConfiguration & pulsar::ProducerConfiguration::setProperty ( const std::string & name,
const std::string & value )

Sets a new property on the producer

namethe name of the property
valuethe associated value

◆ setSchema()

ProducerConfiguration & pulsar::ProducerConfiguration::setSchema ( const SchemaInfo & schemaInfo)

Declare the schema of the data that will be published by this producer.

The schema will be checked against the schema of the topic, and it will fail if it's not compatible, though the client library will not perform any validation that the actual message payload are conforming to the specified schema.

For all purposes, this


◆ setSendTimeout()

ProducerConfiguration & pulsar::ProducerConfiguration::setSendTimeout ( int sendTimeoutMs)

The getter associated with getSendTimeout()

The documentation for this class was generated from the following file: