How to develop Pulsar connectors
This guide describes how to develop Pulsar connectors to move data between Pulsar and other systems.
Pulsar connectors are special Pulsar Functions, so creating a Pulsar connector is similar to creating a Pulsar function.
Pulsar connectors come in two types:
Type | Description | Example |
---|---|---|
Source | Import data from another system to Pulsar. | RabbitMQ source connector imports the messages of a RabbitMQ queue to a Pulsar topic. |
Sink | Export data from Pulsar to another system. | Kinesis sink connector exports the messages of a Pulsar topic to a Kinesis stream. |
Develop
You can develop Pulsar source connectors and sink connectors.
Source
To develop a source connector, implement the `Source` interface, which means implementing the `open` method and the `read` method.
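A minimal sketch of the overall shape follows. To keep it self-contained, it declares simplified stand-ins for Pulsar's `Source`, `SourceContext`, and `Record` types; the real interfaces ship with Pulsar and declare more methods. `CounterSource` and the driver in `CounterSourceDemo` are hypothetical, and the driver only approximates the order in which the runtime calls the methods (`open` once, then `read` repeatedly, then `close`).

```java
import java.util.Map;

// Simplified stand-ins, declared only so this sketch compiles on its own.
// The real Pulsar types declare many more methods.
interface SourceContext {}

interface Record<T> {
    T getValue();
}

// Mirrors the two methods described in this guide; Pulsar's Source also extends AutoCloseable.
interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    Record<T> read() throws Exception;
}

// Hypothetical toy source that emits 1, 2, 3, ...
// It never runs out of data, so read() never needs to block.
class CounterSource implements Source<Long> {
    private SourceContext context;
    private long next;

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) {
        this.context = sourceContext; // keep the context for later use
        this.next = 1;
    }

    @Override
    public Record<Long> read() {
        final long value = next++;
        return () -> value; // the stand-in Record has one abstract method, so a lambda works
    }

    @Override
    public void close() {
        // release any clients, threads, or connections created in open()
    }
}

class CounterSourceDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical driver approximating the runtime's call order.
        try (Source<Long> source = new CounterSource()) {
            source.open(Map.of(), new SourceContext() {});
            for (int i = 0; i < 3; i++) {
                System.out.println(source.read().getValue());
            }
        }
    }
}
```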
- Implement the `open` method.

```java
/**
 * Open connector with configuration
 *
 * @param config initialization config
 * @param sourceContext
 * @throws Exception IO type exceptions when opening a connector
 */
void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception;
```

This method is called when the source connector is initialized.

In this method, you can retrieve all connector-specific settings through the passed-in `config` parameter and initialize all necessary resources. For example, a Kafka connector can create a Kafka client in this `open` method.

The Pulsar runtime also provides a `SourceContext` that the connector can use to access runtime resources for tasks like collecting metrics. The implementation can save the `SourceContext` for future use.
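The following is a minimal sketch of an `open` implementation that validates its settings and keeps the context. The configuration keys `queueName` (required) and `batchSize` (optional, defaulting to 100) are hypothetical, as are the `MySourceConfig` and `MySource` classes; it also assumes a setting may arrive either as a number or as a numeric string. `SourceContext` is a bare stand-in for Pulsar's type so that the sketch compiles on its own.

```java
import java.util.Map;

// Stand-in for Pulsar's SourceContext, declared only so the sketch compiles.
interface SourceContext {}

// Hypothetical connector settings; the key names are illustrative, not defined by Pulsar.
final class MySourceConfig {
    final String queueName;
    final int batchSize;

    private MySourceConfig(String queueName, int batchSize) {
        this.queueName = queueName;
        this.batchSize = batchSize;
    }

    static MySourceConfig load(Map<String, Object> config) {
        Object queue = config.get("queueName");
        if (queue == null || queue.toString().isBlank()) {
            throw new IllegalArgumentException("queueName must be set");
        }
        // Accept a number or a numeric string; fall back to a default of 100.
        Object rawBatch = config.getOrDefault("batchSize", 100);
        int batch = Integer.parseInt(rawBatch.toString());
        if (batch <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return new MySourceConfig(queue.toString(), batch);
    }
}

class MySource {
    private MySourceConfig settings;
    private SourceContext context;

    // Same signature as Source#open: validate settings, create clients, keep the context.
    public void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.settings = MySourceConfig.load(config); // fail fast on bad configuration
        this.context = sourceContext;                // saved for later use, e.g. metrics
        // A real connector would create its external client here (e.g. a Kafka client).
    }

    MySourceConfig settings() {
        return settings;
    }
}

class MySourceDemo {
    public static void main(String[] args) throws Exception {
        Map<String, Object> config = Map.of("queueName", "orders", "batchSize", "50");
        MySource source = new MySource();
        source.open(config, new SourceContext() {});
        System.out.println(source.settings().queueName + " " + source.settings().batchSize);
    }
}
```

Validating in `open` means a misconfigured connector fails at startup instead of on the first `read` call.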
- Implement the `read` method.

```java
/**
 * Reads the next message from source.
 * If source does not have any new messages, this call should block.
 * @return next message from source. The return result should never be null
 * @throws Exception
 */
Record<T> read() throws Exception;
```

If there is nothing to return, the implementation should block rather than return `null`.

The returned `Record` should encapsulate the following information, which the Pulsar IO runtime needs.
- `Record` should provide the following variables:

| Variable | Required | Description |
|---|---|---|
| `TopicName` | No | Pulsar topic name from which the record originated. |
| `Key` | No | Messages can optionally be tagged with keys. For more information, see Routing modes. |
| `Value` | Yes | Actual data of the record. |
| `EventTime` | No | Event time of the record from the source. |
| `PartitionId` | No | If the record originated from a partitioned source, it returns its `PartitionId`. The Pulsar IO runtime uses `PartitionId` as part of the unique identifier to deduplicate messages and achieve an exactly-once processing guarantee. |
| `RecordSequence` | No | If the record originated from a sequential source, it returns its `RecordSequence`. The Pulsar IO runtime uses `RecordSequence` as part of the unique identifier to deduplicate messages and achieve an exactly-once processing guarantee. |
| `Properties` | No | If the record carries user-defined properties, it returns those properties. |
| `DestinationTopic` | No | Topic to which the message should be written. |
| `Message` | No | A class that carries the data sent by users. For more information, see Message.java. |

- `Record` should provide the following methods:

| Method | Description |
|---|---|
| `ack` | Acknowledge that the record is fully processed. |
| `fail` | Indicate that the record fails to be processed. |
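The sketch below illustrates a blocking `read` together with a record that fills in a few of the optional variables and implements `ack` and `fail`. It assumes a hypothetical external system that delivers messages into an in-memory `BlockingQueue` (standing in for a real client's buffer), a single partition with the fixed id `"0"`, and a simple policy of re-queuing a failed record. `Record` here is a simplified stand-in that mirrors only some of the getters above; `QueueSource` and `QueueRecord` are illustrative names, not Pulsar classes.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for Pulsar's Record, mirroring a few of the variables and methods above.
interface Record<T> {
    T getValue();
    default Optional<String> getKey() { return Optional.empty(); }
    default Optional<String> getPartitionId() { return Optional.empty(); }
    default Optional<Long> getRecordSequence() { return Optional.empty(); }
    default void ack() {}
    default void fail() {}
}

// One message pulled from the hypothetical external system.
final class QueueRecord implements Record<String> {
    private final String key;
    private final String value;
    private final long sequence;
    private final QueueSource owner;

    QueueRecord(String key, String value, long sequence, QueueSource owner) {
        this.key = key;
        this.value = value;
        this.sequence = sequence;
        this.owner = owner;
    }

    public String getValue() { return value; }
    public Optional<String> getKey() { return Optional.ofNullable(key); }
    // Assumed single-partition source: partition id plus sequence identify the record uniquely.
    public Optional<String> getPartitionId() { return Optional.of("0"); }
    public Optional<Long> getRecordSequence() { return Optional.of(sequence); }
    // A real connector would confirm delivery to the external system here.
    public void ack() { owner.acked.incrementAndGet(); }
    // Hypothetical policy: put the record back so read() returns it again.
    public void fail() { owner.redeliver(this); }
}

class QueueSource {
    // Stand-in for the external system's client-side buffer.
    private final BlockingQueue<QueueRecord> buffer = new LinkedBlockingQueue<>();
    private final AtomicLong nextSequence = new AtomicLong();
    final AtomicLong acked = new AtomicLong();

    // Called by whichever thread receives data from the external system.
    void receive(String key, String value) {
        buffer.add(new QueueRecord(key, value, nextSequence.getAndIncrement(), this));
    }

    // Re-queues a failed record at the tail (so ordering is not preserved).
    void redeliver(QueueRecord record) {
        buffer.add(record);
    }

    // take() waits until a record is available, so read() blocks instead of returning null.
    public Record<String> read() throws InterruptedException {
        return buffer.take();
    }
}

class QueueSourceDemo {
    public static void main(String[] args) throws InterruptedException {
        QueueSource source = new QueueSource();
        source.receive("k1", "hello");
        Record<String> first = source.read();
        first.fail();                         // simulate a processing failure
        Record<String> again = source.read(); // the failed record is delivered again
        again.ack();
        System.out.println(again.getValue() + " seq=" + again.getRecordSequence().get());
    }
}
```

Re-queuing at the tail keeps the sketch short; a connector that must preserve ordering would need a different redelivery strategy.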
-