How to use Pulsar connectors
This guide describes how to use Pulsar connectors.
Install a connector​
Pulsar bundles several built-in connectors used to move data in and out of commonly used systems (such as database and messaging system). Optionally, you can create and use your desired non-built-in connectors.
When using a non-built-in connector, you need to specify the path of an archive file for the connector.
To set up a built-in connector, follow the instructions.
After the setup, the built-in connector is automatically discovered by Pulsar brokers (or function-workers), so no additional installation steps are required.
Configure a connector​
You can configure the following information:
Configure a default storage location for a built-in connector​
To configure a default folder for built-in connectors, set the connectorsDirectory
parameter in the ./conf/functions_worker.yml
configuration file.
Example
Set the ./connectors
folder as the default storage location for built-in connectors.
########################
# Connectors
########################
connectorsDirectory: ./connectors
Configure a connector with a YAML file​
To configure a connector, you need to provide a YAML configuration file when creating a connector.
The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.
Example 1
Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:
Which Cassandra cluster to connect
What is the
keyspace
andcolumnFamily
to be used in Cassandra for collecting dataHow to map Pulsar messages into Cassandra table key and columns
tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
roots: "localhost:9042"
keyspace: "pulsar_test_keyspace"
columnFamily: "pulsar_test_table"
keyname: "key"
columnName: "col"
Example 2
Below is a YAML configuration file of a Kafka source.
configs:
bootstrapServers: "pulsar-kafka:9092"
groupId: "test-pulsar-io"
topic: "my-topic"
sessionTimeoutMs: "10000"
autoCommitEnabled: "false"
Example 3
Below is a YAML configuration file of a PostgreSQL JDBC sink.
configs:
userName: "postgres"
password: "password"
jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
tableName: "test_jdbc"
Prepare a connector​
Before starting using connectors, you can perform the following operations:
reload
​
If you add or delete a nar file in a connector folder, reload the available built-in connector before using it.
Source​
To reload source connectors, you can use the reload
subcommand.
pulsar-admin sources reload
For the latest and complete information, see Pulsar admin docs.
Sink​
To reload sink connectors, you can use the reload
subcommand.
pulsar-admin sinks reload
For the latest and complete information, see Pulsar admin docs.
available
​
After reloading connectors (optional), you can get a list of available connectors.
Source​
To get a list of source connectors, you can use the available-sources
subcommand.
pulsar-admin sources available-sources
Sink​
To get a list of sink connectors, you can use the available-sinks
subcommand.
pulsar-admin sinks available-sinks
Run a connector​
To run a connector, you can perform the following operations:
create
​
To create a connector, you can use Admin CLI, REST API or JAVA admin API.
Source​
To create a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the create
subcommand.
pulsar-admin sources create options
For the latest and complete information, see Pulsar admin docs.
Send a POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/registerSource
Create a source connector with a local file.
void createSource(SourceConfig sourceConfig,
String fileName)
throws PulsarAdminExceptionParameter
Name Description sourceConfig
The source configuration object Exception
Name Description PulsarAdminException
Unexpected error For more information, see
createSource
.Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSourceWithUrl(SourceConfig sourceConfig,
String pkgUrl)
throws PulsarAdminExceptionSupported URLs are
http
andfile
.Example
File: file:///dir/fileName.jar
Parameter
Parameter Description sourceConfig
The source configuration object pkgUrl
URL from which pkg can be downloaded Exception
Name Description PulsarAdminException
Unexpected error For more information, see
createSourceWithUrl
.
Sink​
To create a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the create
subcommand.
pulsar-admin sinks create options
For the latest and complete information, see Pulsar admin docs.
Send a POST
request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sinkName/registerSink
Create a sink connector with a local file.
void createSink(SinkConfig sinkConfig,
String fileName)
throws PulsarAdminExceptionParameter
Name Description sinkConfig
The sink configuration object Exception
Name Description PulsarAdminException
Unexpected error For more information, see
createSink
.Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSinkWithUrl(SinkConfig sinkConfig,
String pkgUrl)
throws PulsarAdminExceptionSupported URLs are
http
andfile
.Example
File: file:///dir/fileName.jar
Parameter
Parameter Description sinkConfig
The sink configuration object pkgUrl
URL from which pkg can be downloaded Exception
Name Description PulsarAdminException
Unexpected error For more information, see
createSinkWithUrl
.
start
​
To start a connector, you can use Admin CLI or REST API.
Source​
To start a source connector, you can use the following commands.
- Admin CLI
- REST API
Use the start
subcommand.
pulsar-admin sources start options
For the latest and complete information, see Pulsar admin docs.
Start all source connectors.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/start/startSourceStart a specified source connector.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/start/startSource
Sink​
To start a sink connector, you can use the following commands:
- Admin CLI
- REST API
Use the start
subcommand.
pulsar-admin sinks start options
For the latest and complete information, see Pulsar admin docs.
Start all sink connectors.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sinkName/start/startSinkStart a specified sink connector.
Send a
POST
request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sourceName/:instanceId/start/startSink
localrun
​
To run a connector locally rather than deploying it on a Pulsar cluster, you can use Admin CLI
Source​
To run a source connector locally, you can use the following command:
- Admin CLI
Use the localrun
subcommand.
pulsar-admin sources localrun options
For the latest and complete information, see Pulsar admin docs.
Sink​
To run a sink connector locally, you can use the following command:
- Admin CLI
Use the localrun
subcommand.
pulsar-admin sinks localrun options
For the latest and complete information, see Pulsar admin docs.
Run a Pulsar Function before a sink connector​
You can run a Pulsar Function in memory before a sink connector. For details, see PIP 193: Sink preprocessing Function.
Running a Pulsar Function in memory before a sink connector provides lower latency, less I/O, and disk consumption than going through an intermediate topic.
Use the --transform-function
, --transform-function-classname
and --transform-function-config
options when creating the sink connector to configure the transform Function to run.
For the latest and complete information, see Pulsar admin sinks command docs.
Monitor a connector​
To monitor a connector, you can perform the following operations:
get
​
To get the information of a connector, You can use Admin CLI, REST API or JAVA admin API.
Source​
To get the information of a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the get
subcommand.
pulsar-admin sources get options
For the latest and complete information, see Pulsar admin docs.
Send a GET
request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName/getSourceInfo
SourceConfig getSource(String tenant,
String namespace,
String source)
throws PulsarAdminException
Example
This is a sourceConfig.
{
"tenant": "tenantName",
"namespace": "namespaceName",
"name": "sourceName",
"className": "className",
"topicName": "topicName",
"configs": {},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"resources": {
"cpu": 1.0,
"ram": 1073741824,
"disk": 10737418240
}
}
This is a sourceConfig example.
{
"tenant": "public",
"namespace": "default",
"name": "debezium-mysql-source",
"className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource",
"topicName": "debezium-mysql-topic",
"configs": {
"database.user": "debezium",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.port": "3306",
"database.hostname": "localhost",
"database.password": "dbz",
"database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.whitelist": "inventory",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
"pulsar.service.url": "pulsar://127.0.0.1:6650",
"database.history.pulsar.topic": "history-topic2"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"resources": {
"cpu": 1.0,
"ram": 1073741824,
"disk": 10737418240
}
}
Exception
Exception name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException | Unexpected error |
For more information, see getSource
.
Sink​
To get the information of a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the get
subcommand.
pulsar-admin sinks get options
For the latest and complete information, see Pulsar admin docs.
Send a GET
request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sinkName/getSinkInfo
SinkConfig getSink(String tenant,
String namespace,
String sink)
throws PulsarAdminException
Example
This is a sinkConfig.
{
"tenant": "tenantName",
"namespace": "namespaceName",
"name": "sinkName",
"className": "className",
"inputSpecs": {
"topicName": {
"isRegexPattern": false
}
},
"configs": {},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true
}
This is a sinkConfig example.
{
"tenant": "public",
"namespace": "default",
"name": "pulsar-postgres-jdbc-sink",
"className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink",
"inputSpecs": {
"pulsar-postgres-jdbc-sink-topic": {
"isRegexPattern": false
}
},
"configs": {
"password": "password",
"jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink",
"userName": "postgres",
"tableName": "pulsar_postgres_jdbc_sink"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true
}
Parameter description
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
For more information, see getSink
.
list
​
To get the list of all running connectors, You can use Admin CLI, REST API or JAVA admin API.
Source​
To get the list of all running source connectors, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the list
subcommand.
pulsar-admin sources list options
For the latest and complete information, see Pulsar admin docs.
Send a GET
request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/listSources
List<String> listSources(String tenant,
String namespace)
throws PulsarAdminException
Response example
["f1", "f2", "f3"]
Exception
Exception name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException | Unexpected error |
For more information, see listSource
.
Sink​
To get the list of all running sink connectors, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the list
subcommand.
pulsar-admin sinks list options
For the latest and complete information, see Pulsar admin docs.
Send a GET
request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/listSinks
List<String> listSinks(String tenant,
String namespace)
throws PulsarAdminException
Response example
["f1", "f2", "f3"]
Exception
Exception name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException | Unexpected error |
For more information, see listSource
.
status
​
To get the current status of a connector, you can use Admin CLI, REST API or JAVA admin API.
Source​
To get the current status of a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the status
subcommand.
pulsar-admin sources status options
For the latest and complete information, see Pulsar admin docs.
Get the current status of all source connectors.
Send a
GET
request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName/status/getSourceStatusGets the current status of a specified source connector.
Send a
GET
request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/status/getSourceStatus
Get the current status of all source connectors.
SourceStatus getSourceStatus(String tenant,
String namespace,
String source)
throws PulsarAdminExceptionParameter
Parameter Description tenant
Tenant name namespace
Namespace name sink
Source name Exception
Name Description PulsarAdminException
Unexpected error For more information, see
getSourceStatus
.Gets the current status of a specified source connector.
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant,
String namespace,
String source,
int id)
throws PulsarAdminExceptionParameter
Parameter Description tenant
Tenant name namespace
Namespace name sink
Source name id
Source instanceID Exception
Exception name Description PulsarAdminException
Unexpected error For more information, see
getSourceStatus
.
Sink​
To get the current status of a Pulsar sink connector, you can use the following commads:
- Admin CLI
- REST API
- Java Admin API
Use the status
subcommand.
pulsar-admin sinks status options
For the latest and complete information, see Pulsar admin docs.
Get the current status of all sink connectors.
Send a
GET
request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sinkName/status/getSinkStatusGets the current status of a specified sink connector.
Send a
GET
request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sourceName/:instanceId/status/getSinkInstanceStatus
Get the current status of all sink connectors.
SinkStatus getSinkStatus(String tenant,
String namespace,
String sink)
throws PulsarAdminExceptionParameter
Parameter Description tenant
Tenant name namespace
Namespace name sink
Source name Exception
Exception name Description PulsarAdminException
Unexpected error For more information, see
getSinkStatus
.Gets the current status of a specified source connector.
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant,
String namespace,
String sink,
int id)
throws PulsarAdminExceptionParameter
Parameter Description tenant
Tenant name namespace
Namespace name sink
Source name id
Sink instanceID Exception
Exception name Description PulsarAdminException
Unexpected error For more information, see
getSinkStatusWithInstanceID
.
Update a connector​
update
​
To update a running connector, you can use Admin CLI, REST API or JAVA admin API.
Source​
To update a running Pulsar source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the update
subcommand.
pulsar-admin sources update options
For the latest and complete information, see Pulsar admin docs.
Send a PUT
request to this endpoint: PUT /admin/v3/sources/:tenant/:namespace/:sourceName/updateSource
Update a running source connector with a local file.
void updateSource(SourceConfig sourceConfig,
String fileName)
throws PulsarAdminExceptionParameter
Name Description sourceConfig
The source configuration object Exception
Name Description PulsarAdminException.NotAuthorizedException
You don't have the admin permission PulsarAdminException.NotFoundException
Cluster doesn't exist PulsarAdminException
Unexpected error For more information, see
updateSource
.Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSourceWithUrl(SourceConfig sourceConfig,
String pkgUrl)
throws PulsarAdminExceptionSupported URLs are
http
andfile
.Example
File: file:///dir/fileName.jar
Parameter
Name Description sourceConfig
The source configuration object pkgUrl
URL from which pkg can be downloaded Exception
Name Description PulsarAdminException.NotAuthorizedException
You don't have the admin permission PulsarAdminException.NotFoundException
Cluster doesn't exist PulsarAdminException
Unexpected error
For more information, see createSourceWithUrl
.
Sink​
To update a running Pulsar sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the update
subcommand.
pulsar-admin sinks update options
For the latest and complete information, see Pulsar admin docs.
Send a PUT
request to this endpoint: PUT /admin/v3/sinks/:tenant/:namespace/:sinkName/updateSink
Update a running sink connector with a local file.
void updateSink(SinkConfig sinkConfig,
String fileName)
throws PulsarAdminExceptionParameter
Name Description sinkConfig
The sink configuration object Exception
Name Description PulsarAdminException.NotAuthorizedException
You don't have the admin permission PulsarAdminException.NotFoundException
Cluster doesn't exist PulsarAdminException
Unexpected error For more information, see
updateSink
.Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSinkWithUrl(SinkConfig sinkConfig,
String pkgUrl)
throws PulsarAdminExceptionSupported URLs are
http
andfile
.Example
File: file:///dir/fileName.jar
Parameter
Name Description sinkConfig
The sink configuration object pkgUrl
URL from which pkg can be downloaded Exception
Name Description PulsarAdminException.NotAuthorizedException
You don't have the admin permission PulsarAdminException.NotFoundException
Cluster doesn't exist PulsarAdminException
Unexpected error
For more information, see updateSinkWithUrl
.
Stop a connector​
stop
​
To stop a connector, you can use Admin CLI, REST API or JAVA admin API.
Source​
To stop a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the stop
subcommand.
pulsar-admin sources stop options
For the latest and complete information, see Pulsar admin docs.
Stop all source connectors.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/stopSourceStop a specified source connector.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/stopSource
Stop all source connectors.
void stopSource(String tenant,
String namespace,
String source)
throws PulsarAdminExceptionParameter
Name Description tenant
Tenant name namespace
Namespace name source
Source name Exception
Name Description PulsarAdminException
Unexpected error For more information, see
stopSource
.Stop a specified source connector.
void stopSource(String tenant,
String namespace,
String source,
int instanceId)
throws PulsarAdminExceptionParameter
Name Description tenant
Tenant name namespace
Namespace name source
Source name instanceId
Source instanceID Exception
Name Description PulsarAdminException
Unexpected error For more information, see
stopSource
.
Sink​
To stop a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the stop
subcommand.
pulsar-admin sinks stop options
For the latest and complete information, see Pulsar admin docs.
Stop all sink connectors.
Send a
POST
request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sinkName/stop/stopSinkStop a specified sink connector.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sinkeName/:instanceId/stop/stopSink
Stop all sink connectors.
void stopSink(String tenant,
String namespace,
String sink)
throws PulsarAdminExceptionParameter
Name Description tenant
Tenant name namespace
Namespace name source
Source name Exception
Name Description PulsarAdminException
Unexpected error For more information, see
stopSink
.Stop a specified sink connector.
void stopSink(String tenant,
String namespace,
String sink,
int instanceId)
throws PulsarAdminExceptionParameter
Name Description tenant
Tenant name namespace
Namespace name source
Source name instanceId
Source instanceID Exception
Name Description PulsarAdminException
Unexpected error For more information, see
stopSink
.
Restart a connector​
restart
​
To restart a connector, you can use Admin CLI, REST API or JAVA admin API.
Source​
To restart a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the restart
subcommand.
pulsar-admin sources restart options
For the latest and complete information, see Pulsar admin docs.
Restart all source connectors.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/restart/restartSourceRestart a specified source connector.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/restart/restartSource
Restart all source connectors.
void restartSource(String tenant,
String namespace,
String source)
throws PulsarAdminExceptionParameter
Name Description tenant
Tenant name namespace
Namespace name source
Source name Exception
Name Description PulsarAdminException
Unexpected error For more information, see
restartSource
.Restart a specified source connector.
void restartSource(String tenant,
String namespace,
String source,
int instanceId)
throws PulsarAdminExceptionParameter
Name Description tenant
Tenant name namespace
Namespace name source
Source name instanceId
Source instanceID Exception
Name Description PulsarAdminException
Unexpected error For more information, see
restartSource
.
Sink​
To restart a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the restart
subcommand.
pulsar-admin sinks restart options
For the latest and complete information, see Pulsar admin docs.
Restart all sink connectors.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sinkName/restart/restartSourceRestart a specified sink connector.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sinkName/:instanceId/restart/restartSource
Restart all Pulsar sink connectors.
void restartSink(String tenant,
String namespace,
String sink)
throws PulsarAdminExceptionParameter
Name Description tenant
Tenant name namespace
Namespace name sink
Sink name Exception
Name Description PulsarAdminException
Unexpected error For more information, see
restartSink
.Restart a specified sink connector.
void restartSink(String tenant,
String namespace,
String sink,
int instanceId)
throws PulsarAdminExceptionParameter
Name Description tenant
Tenant name namespace
Namespace name source
Source name instanceId
Sink instanceID Exception
Name Description PulsarAdminException
Unexpected error For more information, see
restartSink
.
Delete a connector​
delete
​
To delete a connector, you can use Admin CLI, REST API or JAVA admin API.
Source​
To delete a source connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the delete
subcommand.
pulsar-admin sources delete options
For the latest and complete information, see Pulsar admin docs.
Delete al Pulsar source connector.
Send a DELETE
request to this endpoint: DELETE /admin/v3/sources/:tenant/:namespace/:sourceName/deregisterSource
Delete a source connector.
void deleteSource(String tenant,
String namespace,
String source)
throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
source | Source name |
Exception
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException.PreconditionFailedException | Cluster is not empty |
PulsarAdminException | Unexpected error |
For more information, see deleteSource
.
Sink​
To delete a sink connector, you can use the following commands:
- Admin CLI
- REST API
- Java Admin API
Use the delete
subcommand.
pulsar-admin sinks delete options
For the latest and complete information, see Pulsar admin docs.
Delete a sink connector.
Send a DELETE
request to this endpoint: DELETE /admin/v3/sinks/:tenant/:namespace/:sinkName/deregisterSink
Delete a Pulsar sink connector.
void deleteSink(String tenant,
String namespace,
String source)
throws PulsarAdminException
Parameter
Name | Description |
---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
Exception
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException.PreconditionFailedException | Cluster is not empty |
PulsarAdminException | Unexpected error |
For more information, see deleteSource
.