Version: 3.3.x

How to use Pulsar connectors

This guide describes how to use Pulsar connectors.

Install a connector​

Pulsar bundles several built-in connectors that move data in and out of commonly used systems (such as databases and messaging systems). Optionally, you can create and use your own non-built-in connectors.

note

When using a non-built-in connector, you need to specify the path of an archive file for the connector.

To set up a built-in connector, follow the instructions.

After the setup, the built-in connector is automatically discovered by Pulsar brokers (or function-workers), so no additional installation steps are required.

Configure a connector​

You can configure the following information:

Configure a default storage location for a built-in connector​

To configure a default folder for built-in connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.

Example

Set the ./connectors folder as the default storage location for built-in connectors.

########################
# Connectors
########################

connectorsDirectory: ./connectors

Configure a connector with a YAML file​

To configure a connector, you need to provide a YAML configuration file when creating a connector.

The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.

Example 1

Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:

  • Which Cassandra cluster to connect to

  • Which keyspace and columnFamily to use in Cassandra for collecting data

  • How to map Pulsar messages to the Cassandra table key and columns

tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
  roots: "localhost:9042"
  keyspace: "pulsar_test_keyspace"
  columnFamily: "pulsar_test_table"
  keyname: "key"
  columnName: "col"

Example 2

Below is a YAML configuration file of a Kafka source.

configs:
  bootstrapServers: "pulsar-kafka:9092"
  groupId: "test-pulsar-io"
  topic: "my-topic"
  sessionTimeoutMs: "10000"
  autoCommitEnabled: "false"

Example 3

Below is a YAML configuration file of a PostgreSQL JDBC sink.

configs:
  userName: "postgres"
  password: "password"
  jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
  tableName: "test_jdbc"
Prepare a connector​

Before you start using connectors, you can perform the following operations:

reload​

If you add or delete a NAR file in a connector folder, reload the available built-in connectors before using them.

Source​

To reload source connectors, you can use the reload subcommand.

pulsar-admin sources reload

For the latest and complete information, see Pulsar admin docs.

Sink​

To reload sink connectors, you can use the reload subcommand.

pulsar-admin sinks reload

For the latest and complete information, see Pulsar admin docs.

available​

After reloading connectors (optional), you can get a list of available connectors.

Source​

To get a list of source connectors, you can use the available-sources subcommand.

pulsar-admin sources available-sources

Sink​

To get a list of sink connectors, you can use the available-sinks subcommand.

pulsar-admin sinks available-sinks

Run a connector​

To run a connector, you can perform the following operations:

create​

To create a connector, you can use Admin CLI, REST API, or Java admin API.

Source​

To create a source connector, you can use the following commands:

Use the create subcommand.

pulsar-admin sources create options

For the latest and complete information, see Pulsar admin docs.
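As an illustration of the create subcommand for sources, the sketch below wraps the call in a small shell function that creates a Kafka source from a YAML configuration file. The function name, the PULSAR_ADMIN variable (a convenience of this sketch for pointing at the binary, for example bin/pulsar-admin), the source name, the file name, and the topic name are all assumptions.

```shell
# Create a Kafka source from a YAML configuration file.
#   $1: path to the YAML configuration file
#   $2: Pulsar topic that receives the data
# PULSAR_ADMIN is a variable of this sketch; it defaults to pulsar-admin on PATH.
create_kafka_source() {
  "${PULSAR_ADMIN:-pulsar-admin}" sources create \
    --tenant public \
    --namespace default \
    --name kafka-test-source \
    --source-type kafka \
    --source-config-file "$1" \
    --destination-topic-name "$2"
}

# Example call (file and topic names are illustrative):
# create_kafka_source ./kafka-source.yaml kafka-ingest
```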

Sink​

To create a sink connector, you can use the following commands:

Use the create subcommand.

pulsar-admin sinks create options

For the latest and complete information, see Pulsar admin docs.
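For sinks, a call might look like the following sketch, which creates the Cassandra sink described earlier from its YAML file. The function name, the PULSAR_ADMIN variable, the file path, and the input topic are assumptions made for illustration.

```shell
# Create a Cassandra sink from a YAML configuration file.
#   $1: path to the YAML configuration file
#   $2: Pulsar topic the sink reads from
# PULSAR_ADMIN is a variable of this sketch; it defaults to pulsar-admin on PATH.
create_cassandra_sink() {
  "${PULSAR_ADMIN:-pulsar-admin}" sinks create \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-type cassandra \
    --sink-config-file "$1" \
    --inputs "$2"
}

# Example call (file and topic names are illustrative):
# create_cassandra_sink examples/cassandra-sink.yml test_cassandra
```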

start​

To start a connector, you can use Admin CLI or REST API.

Source​

To start a source connector, you can use the following commands:

Use the start subcommand.

pulsar-admin sources start options

For the latest and complete information, see Pulsar admin docs.

Sink​

To start a sink connector, you can use the following commands:

Use the start subcommand.

pulsar-admin sinks start options

For the latest and complete information, see Pulsar admin docs.
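A minimal sketch of the start subcommand follows. It starts either every instance of a sink or, when an instance ID is given, only that instance. The function name and the PULSAR_ADMIN variable are assumptions of this sketch; the same pattern should apply to sources by replacing sinks with sources.

```shell
# Start a sink connector.
#   $1: sink name
#   $2: optional instance ID; when omitted, all instances are started
# PULSAR_ADMIN is a variable of this sketch; it defaults to pulsar-admin on PATH.
start_sink() {
  if [ -n "${2:-}" ]; then
    "${PULSAR_ADMIN:-pulsar-admin}" sinks start \
      --tenant public --namespace default --name "$1" --instance-id "$2"
  else
    "${PULSAR_ADMIN:-pulsar-admin}" sinks start \
      --tenant public --namespace default --name "$1"
  fi
}

# Example calls (the sink name is illustrative):
# start_sink cassandra-test-sink
# start_sink cassandra-test-sink 0
```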

localrun​

To run a connector locally rather than deploying it on a Pulsar cluster, you can use Admin CLI.

Source​

To run a source connector locally, you can use the following command:

Use the localrun subcommand.

pulsar-admin sources localrun options

For the latest and complete information, see Pulsar admin docs.

Sink​

To run a sink connector locally, you can use the following command:

Use the localrun subcommand.

pulsar-admin sinks localrun options

For the latest and complete information, see Pulsar admin docs.
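The sketch below shows one way a local run of a sink could be invoked. It passes the connector archive explicitly and points the local process at a broker. The function name, the PULSAR_ADMIN and BROKER_URL variables, the archive path, the file path, and the topic name are assumptions made for illustration.

```shell
# Run a sink connector locally instead of deploying it to the cluster.
#   $1: path to the connector NAR file
#   $2: path to the YAML configuration file
#   $3: Pulsar topic the sink reads from
# PULSAR_ADMIN and BROKER_URL are variables of this sketch, not Pulsar settings.
localrun_sink() {
  "${PULSAR_ADMIN:-pulsar-admin}" sinks localrun \
    --broker-service-url "${BROKER_URL:-pulsar://localhost:6650}" \
    --archive "$1" \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-config-file "$2" \
    --inputs "$3"
}

# Example call (all paths and names are illustrative):
# localrun_sink connectors/pulsar-io-cassandra.nar examples/cassandra-sink.yml test_cassandra
```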

Run a Pulsar Function before a sink connector​

You can run a Pulsar Function in memory before a sink connector. For details, see PIP 193: Sink preprocessing Function. Compared with routing messages through an intermediate topic, running the Function in memory lowers latency and reduces I/O and disk consumption. To configure the transform Function, use the --transform-function, --transform-function-classname, and --transform-function-config options when creating the sink connector.

For the latest and complete information, see Pulsar admin sinks command docs.
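As a hedged sketch, the three transform options can be combined with a regular sink creation as shown below. The function name, the PULSAR_ADMIN variable, the JAR path, the class name, and the JSON configuration are hypothetical placeholders, and the sink details reuse the Cassandra example with an assumed file path and topic.

```shell
# Create a sink that runs a transform Function in memory before the sink.
#   $1: transform Function package (hypothetical JAR path)
#   $2: transform Function class name (hypothetical)
#   $3: transform Function configuration as a JSON string (hypothetical keys)
# PULSAR_ADMIN is a variable of this sketch; it defaults to pulsar-admin on PATH.
create_sink_with_transform() {
  "${PULSAR_ADMIN:-pulsar-admin}" sinks create \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-type cassandra \
    --sink-config-file examples/cassandra-sink.yml \
    --inputs test_cassandra \
    --transform-function "$1" \
    --transform-function-classname "$2" \
    --transform-function-config "$3"
}

# Example call (all values are illustrative):
# create_sink_with_transform ./my-transform.jar org.example.MyTransform '{"key":"value"}'
```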

Monitor a connector​

To monitor a connector, you can perform the following operations:

get​

To get the information of a connector, you can use Admin CLI, REST API, or Java admin API.

Source​

To get the information of a source connector, you can use the following commands:

Use the get subcommand.

pulsar-admin sources get options

For the latest and complete information, see Pulsar admin docs.

Sink​

To get the information of a sink connector, you can use the following commands:

Use the get subcommand.

pulsar-admin sinks get options

For the latest and complete information, see Pulsar admin docs.

list​

To get the list of all running connectors, you can use Admin CLI, REST API, or Java admin API.

Source​

To get the list of all running source connectors, you can use the following commands:

Use the list subcommand.

pulsar-admin sources list options

For the latest and complete information, see Pulsar admin docs.

Sink​

To get the list of all running sink connectors, you can use the following commands:

Use the list subcommand.

pulsar-admin sinks list options

For the latest and complete information, see Pulsar admin docs.

status​

To get the current status of a connector, you can use Admin CLI, REST API, or Java admin API.

Source​

To get the current status of a source connector, you can use the following commands:

Use the status subcommand.

pulsar-admin sources status options

For the latest and complete information, see Pulsar admin docs.

Sink​

To get the current status of a sink connector, you can use the following commands:

Use the status subcommand.

pulsar-admin sinks status options

For the latest and complete information, see Pulsar admin docs.
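The three monitoring subcommands are often used together. The sketch below lists the sinks in a namespace, then fetches the configuration and the status of one of them. The function name and the PULSAR_ADMIN variable are assumptions of this sketch; for sources, replace sinks with sources.

```shell
# Inspect a sink: list sinks in the namespace, then show its config and status.
#   $1: sink name
# PULSAR_ADMIN is a variable of this sketch; it defaults to pulsar-admin on PATH.
inspect_sink() {
  "${PULSAR_ADMIN:-pulsar-admin}" sinks list --tenant public --namespace default
  "${PULSAR_ADMIN:-pulsar-admin}" sinks get --tenant public --namespace default --name "$1"
  "${PULSAR_ADMIN:-pulsar-admin}" sinks status --tenant public --namespace default --name "$1"
}

# Example call (the sink name is illustrative):
# inspect_sink cassandra-test-sink
```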

Update a connector​

update​

To update a running connector, you can use Admin CLI, REST API, or Java admin API.

Source​

To update a running Pulsar source connector, you can use the following commands:

Use the update subcommand.

pulsar-admin sources update options

For the latest and complete information, see Pulsar admin docs.

Sink​

To update a running Pulsar sink connector, you can use the following commands:

Use the update subcommand.

pulsar-admin sinks update options

For the latest and complete information, see Pulsar admin docs.
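One common update is changing the number of instances. The following sketch changes the parallelism of a running sink. The function name and the PULSAR_ADMIN variable are assumptions of this sketch, and the sink name in the example call is illustrative.

```shell
# Change the parallelism (number of instances) of a running sink.
#   $1: sink name
#   $2: new parallelism
# PULSAR_ADMIN is a variable of this sketch; it defaults to pulsar-admin on PATH.
update_sink_parallelism() {
  "${PULSAR_ADMIN:-pulsar-admin}" sinks update \
    --tenant public \
    --namespace default \
    --name "$1" \
    --parallelism "$2"
}

# Example call:
# update_sink_parallelism cassandra-test-sink 2
```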

Stop a connector​

stop​

To stop a connector, you can use Admin CLI, REST API, or Java admin API.

Source​

To stop a source connector, you can use the following commands:

Use the stop subcommand.

pulsar-admin sources stop options

For the latest and complete information, see Pulsar admin docs.

Sink​

To stop a sink connector, you can use the following commands:

Use the stop subcommand.

pulsar-admin sinks stop options

For the latest and complete information, see Pulsar admin docs.

Restart a connector​

restart​

To restart a connector, you can use Admin CLI, REST API, or Java admin API.

Source​

To restart a source connector, you can use the following commands:

Use the restart subcommand.

pulsar-admin sources restart options

For the latest and complete information, see Pulsar admin docs.

Sink​

To restart a sink connector, you can use the following commands:

Use the restart subcommand.

pulsar-admin sinks restart options

For the latest and complete information, see Pulsar admin docs.

Delete a connector​

delete​

To delete a connector, you can use Admin CLI, REST API, or Java admin API.

Source​

To delete a source connector, you can use the following commands:

Use the delete subcommand.

pulsar-admin sources delete options

For the latest and complete information, see Pulsar admin docs.

Sink​

To delete a sink connector, you can use the following commands:

Use the delete subcommand.

pulsar-admin sinks delete options

For the latest and complete information, see Pulsar admin docs.
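Because the delete subcommand has the same shape for sources and sinks, a single helper can cover both. The sketch below is one way to do that. The function name and the PULSAR_ADMIN variable are assumptions of this sketch, and the connector names in the example calls are illustrative.

```shell
# Delete a source or sink connector.
#   $1: connector kind, either "sources" or "sinks"
#   $2: connector name
# PULSAR_ADMIN is a variable of this sketch; it defaults to pulsar-admin on PATH.
delete_connector() {
  case "$1" in
    sources|sinks) ;;
    *)
      echo "usage: delete_connector sources|sinks <name>" >&2
      return 2
      ;;
  esac
  "${PULSAR_ADMIN:-pulsar-admin}" "$1" delete \
    --tenant public --namespace default --name "$2"
}

# Example calls:
# delete_connector sinks cassandra-test-sink
# delete_connector sources kafka-test-source
```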