Kinesis sink connector

The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis.

Configuration

The configuration of the Kinesis sink connector has the following properties.

Property

Each property is listed with its name, type, whether it is required, its default value, and a description.

messageFormat
  Type: MessageFormat | Required: true | Default: ONLY_RAW_PAYLOAD

  Message format in which the Kinesis sink converts Pulsar messages and publishes them to Kinesis streams.

  Below are the available options:

  • ONLY_RAW_PAYLOAD: The Kinesis sink publishes the Pulsar message payload directly as a message into the configured Kinesis stream.

  • FULL_MESSAGE_IN_JSON: The Kinesis sink creates a JSON payload with the Pulsar message payload, properties, and encryptionCtx, and publishes the JSON payload into the configured Kinesis stream.

  • FULL_MESSAGE_IN_FB: The Kinesis sink creates a FlatBuffers-serialized payload with the Pulsar message payload, properties, and encryptionCtx, and publishes the FlatBuffers payload into the configured Kinesis stream.

retainOrdering
  Type: boolean | Required: false | Default: false

  Whether the connector retains message ordering when moving messages from Pulsar to Kinesis.

awsEndpoint
  Type: String | Required: false | Default: " " (empty string)

  The Kinesis endpoint URL, as listed in the AWS documentation on service endpoints.

awsRegion
  Type: String | Required: false | Default: " " (empty string)

  The AWS region.

  Example: us-west-1, us-west-2

awsKinesisStreamName
  Type: String | Required: true | Default: " " (empty string)

  The Kinesis stream name.

awsCredentialPluginName
  Type: String | Required: false | Default: " " (empty string)

  The fully qualified class name of an implementation of AwsCredentialProviderPlugin.

  It is a factory class that creates the AWSCredentialsProvider used by the Kinesis sink.

  If it is empty, the Kinesis sink creates a default AWSCredentialsProvider, which accepts a JSON map of credentials in awsCredentialPluginParam.

awsCredentialPluginParam
  Type: String | Required: false | Default: " " (empty string)

  The JSON parameter used to initialize the AwsCredentialProviderPlugin.
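
The constraints in the property list above can be checked before a configuration is submitted. The following is a minimal sketch in Python, not the connector's own validation logic: the function name `check_kinesis_sink_config` is hypothetical, and the rules are taken only from the Required column and the messageFormat options listed above.

```python
# Sketch only: mirrors the property list above, not the connector's internal checks.
ALLOWED_MESSAGE_FORMATS = {
    "ONLY_RAW_PAYLOAD",
    "FULL_MESSAGE_IN_JSON",
    "FULL_MESSAGE_IN_FB",
}

# Properties marked "Required: true" in the property list.
REQUIRED_KEYS = ("messageFormat", "awsKinesisStreamName")


def check_kinesis_sink_config(config):
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    for key in REQUIRED_KEYS:
        # Treat a missing key and a blank string the same way.
        if not str(config.get(key, "")).strip():
            problems.append(f"missing required property: {key}")
    fmt = config.get("messageFormat")
    if fmt and fmt not in ALLOWED_MESSAGE_FORMATS:
        problems.append(f"unknown messageFormat: {fmt}")
    return problems


if __name__ == "__main__":
    print(check_kinesis_sink_config({
        "messageFormat": "ONLY_RAW_PAYLOAD",
        "awsKinesisStreamName": "my-stream",
    }))
```

A check like this only catches typos early; the connector itself remains the authority on which values it accepts.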

    Built-in plugins

    The following are built-in AwsCredentialProviderPlugin plugins:

    • org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin

      This plugin takes no configuration; it uses the default AWS credentials provider chain.

      For more information, see AWS documentation.

    • org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin

      This plugin takes a configuration (via awsCredentialPluginParam) that describes the role for the Kinesis sink to assume.

      The configuration is a small JSON document such as:


      {"roleArn": "arn...", "roleSessionName": "name"}
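
Because awsCredentialPluginParam is a string property, the role document has to be serialized to a string before it goes into the sink configuration. The sketch below shows one way to do that in Python; the helper name `sts_assume_role_settings` is hypothetical, and the role ARN and session name are the same placeholders used above, not real values.

```python
import json


def sts_assume_role_settings(role_arn, role_session_name):
    """Build the two credential-related sink properties for the STS plugin."""
    # Serialize the role document so it can be stored as a plain string.
    param = json.dumps({"roleArn": role_arn, "roleSessionName": role_session_name})
    return {
        "awsCredentialPluginName": "org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin",
        "awsCredentialPluginParam": param,
    }


# Placeholder values copied from the example above; substitute a real role.
settings = sts_assume_role_settings("arn...", "name")

if __name__ == "__main__":
    print(json.dumps(settings, indent=2))
```

The returned dictionary can be merged into the rest of the sink configuration before it is written out as JSON or YAML.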

    Example

    Before using the Kinesis sink connector, you need to create a configuration file in one of the following formats.

    • JSON


      {
        "awsEndpoint": "some.endpoint.aws",
        "awsRegion": "us-east-1",
        "awsKinesisStreamName": "my-stream",
        "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
        "messageFormat": "ONLY_RAW_PAYLOAD",
        "retainOrdering": "true"
      }

    • YAML


      configs:
        awsEndpoint: "some.endpoint.aws"
        awsRegion: "us-east-1"
        awsKinesisStreamName: "my-stream"
        awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
        messageFormat: "ONLY_RAW_PAYLOAD"
        retainOrdering: "true"
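
In both examples, awsCredentialPluginParam is a JSON document embedded in a string, which is why its quotes are escaped. Writing those escapes by hand is error-prone, so the following Python sketch generates the JSON configuration shown above using only the standard library. The variable names are illustrative, and the credential values are the placeholders from the example.

```python
import json

# Placeholder credentials from the example above; never commit real keys.
credentials = {"accessKey": "myKey", "secretKey": "my-Secret"}

sink_config = {
    "awsEndpoint": "some.endpoint.aws",
    "awsRegion": "us-east-1",
    "awsKinesisStreamName": "my-stream",
    # Inner document is serialized first, so it is stored as a single string.
    "awsCredentialPluginParam": json.dumps(credentials, separators=(",", ":")),
    "messageFormat": "ONLY_RAW_PAYLOAD",
    "retainOrdering": "true",
}

# The outer dump escapes the quotes of the embedded string automatically.
config_text = json.dumps(sink_config, indent=2)

if __name__ == "__main__":
    print(config_text)
```

Serializing in two steps keeps the escaping consistent: the inner call produces the credentials string, and the outer call escapes it when writing the file content.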