Skip to main content
Version: Next

Kinesis sink connector

note

You can download all the Pulsar connectors on download page.

The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis.

Configuration​

The configuration of the Kinesis sink connector has the following property.

Property​

NameTypeRequiredDefaultDescription
messageFormatMessageFormattrueONLY_RAW_PAYLOADMessage format in which Kinesis sink converts Pulsar messages and publishes to Kinesis streams.

Below are the available options:

  • ONLY_RAW_PAYLOAD: Kinesis sink directly publishes Pulsar message payload as a message into the configured Kinesis stream.

  • FULL_MESSAGE_IN_JSON: Kinesis sink creates a JSON payload with Pulsar message payload, properties and encryptionCtx, and publishes JSON payload into the configured Kinesis stream.

  • FULL_MESSAGE_IN_FB: Kinesis sink creates a flatbuffer serialized payload with Pulsar message payload, properties and encryptionCtx, and publishes flatbuffer payload into the configured Kinesis stream.

  • FULL_MESSAGE_IN_JSON_EXPAND_VALUE: Kinesis sink sends a JSON structure containing the record topic name, key, payload, properties and event time. The record schema is used to convert the value to JSON.
  • jsonIncludeNonNullsbooleanfalsetrueOnly the properties with non-null values are included when the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE.
    jsonFlattenbooleanfalsefalseWhen it is set to true and the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE, the output JSON is flattened.
    retainOrderingbooleanfalsefalseWhether Pulsar connectors to retain ordering when moving messages from Pulsar to Kinesis or not.
    awsEndpointStringfalse" " (empty string)The Kinesis end-point URL, which can be found at here.
    awsRegionStringfalse" " (empty string)The AWS region.

    Example
    us-west-1, us-west-2
    awsKinesisStreamNameStringtrue" " (empty string)The Kinesis stream name.
    awsCredentialPluginNameStringfalse" " (empty string)The fully-qualified class name of implementation of AwsCredentialProviderPlugin.

    It is a factory class which creates an AWSCredentialsProvider that is used by Kinesis sink.

    If it is empty, the Kinesis sink creates a default AWSCredentialsProvider which accepts json-map of credentials in awsCredentialPluginParam.
    awsCredentialPluginParamStringfalse" " (empty string)The JSON parameter to initialize awsCredentialsProviderPlugin.

    Built-in plugins​

    The following are built-in AwsCredentialProviderPlugin plugins:

    • org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin

      This plugin takes no configuration, it uses the default AWS provider chain.

      For more information, see AWS documentation.

    • org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin

      This plugin takes a configuration (via the awsCredentialPluginParam) that describes a role to assume when running the KCL.

      This configuration takes the form of a small JSON document like:

      {"roleArn": "arn...", "roleSessionName": "name"}

    Example​

    Before using the Kinesis sink connector, you need to create a configuration file through one of the following methods.

    • JSON

      {
      "configs": {
      "awsEndpoint": "some.endpoint.aws",
      "awsRegion": "us-east-1",
      "awsKinesisStreamName": "my-stream",
      "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
      "messageFormat": "ONLY_RAW_PAYLOAD",
      "retainOrdering": "true"
      }
      }
    • YAML

      configs:
      awsEndpoint: "some.endpoint.aws"
      awsRegion: "us-east-1"
      awsKinesisStreamName: "my-stream"
      awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
      messageFormat: "ONLY_RAW_PAYLOAD"
      retainOrdering: "true"