ElasticSearch sink connector
The ElasticSearch sink connector pulls messages from Pulsar topics and persists the messages to indexes.
Configuration
The configuration of the ElasticSearch sink connector has the following properties.
Property
Name | Type | Required | Default | Description |
---|---|---|---|---|
elasticSearchUrl | String | true | " " (empty string) | The URL of elastic search cluster to which the connector connects. |
indexName | String | true | " " (empty string) | The index name to which the connector writes messages. |
typeName | String | false | "_doc" | The type name to which the connector writes messages to. The value should be set explicitly to a valid type name other than "_doc" for Elasticsearch version before 6.2, and left to default otherwise. |
indexNumberOfShards | int | false | 1 | The number of shards of the index. |
indexNumberOfReplicas | int | false | 1 | The number of replicas of the index. |
username | String | false | " " (empty string) | The username used by the connector to connect to the elastic search cluster. If username is set, then password should also be provided. |
password | String | false | " " (empty string) | The password used by the connector to connect to the elastic search cluster. If username is set, then password should also be provided. |
Example
Before using the ElasticSearch sink connector, you need to create a configuration file through one of the following methods.
Configuration
For Elasticsearch After 6.2
-
JSON
{
"elasticSearchUrl": "http://localhost:9200",
"indexName": "my_index",
"username": "scooby",
"password": "doobie"
} -
YAML
configs:
elasticSearchUrl: "http://localhost:9200"
indexName: "my_index"
username: "scooby"
password: "doobie"
For Elasticsearch Before 6.2
-
JSON
{
"elasticSearchUrl": "http://localhost:9200",
"indexName": "my_index",
"typeName": "doc",
"username": "scooby",
"password": "doobie"
} -
YAML
configs:
elasticSearchUrl: "http://localhost:9200"
indexName: "my_index"
typeName: "doc"
username: "scooby"
password: "doobie"
Usage
-
Start a single node Elasticsearch cluster.
$ docker run -p 9200:9200 -p 9300:9300 \
-e "discovery.type=single-node" \
docker.elastic.co/elasticsearch/elasticsearch:7.5.1 -
Start a Pulsar service locally in standalone mode.
$ bin/pulsar standaloneMake sure the NAR file is available at
connectors/pulsar-io-elastic-search-2.8.4.nar
. -
Start the Pulsar Elasticsearch connector in local run mode using one of the following methods.
-
Use the JSON configuration as shown previously.
$ bin/pulsar-admin sinks localrun \
--archive connectors/pulsar-io-elastic-search-2.8.4.nar \
--tenant public \
--namespace default \
--name elasticsearch-test-sink \
--sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "scooby","password": "doobie"}' \
--inputs elasticsearch_test -
Use the YAML configuration file as shown previously.
$ bin/pulsar-admin sinks localrun \
--archive connectors/pulsar-io-elastic-search-2.8.4.nar \
--tenant public \
--namespace default \
--name elasticsearch-test-sink \
--sink-config-file elasticsearch-sink.yml \
--inputs elasticsearch_test
-
-
Publish records to the topic.
$ bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}" -
Check documents in Elasticsearch.
-
refresh the index
$ curl -s http://localhost:9200/my_index/_refresh -
search documents
$ curl -s http://localhost:9200/my_index/_searchYou can see the record that published earlier has been successfully written into Elasticsearch.
{"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}
-