Pulsar adaptor for Apache Spark
Spark Streaming receiver
The Spark Streaming receiver for Pulsar is a custom receiver that enables Apache Spark Streaming to receive raw data from Pulsar.
An application can receive data in Resilient Distributed Dataset (RDD) format via the Spark Streaming receiver and can process it in a variety of ways.
Prerequisites
To use the receiver, include a dependency for the pulsar-spark
library in your Java configuration.
Maven
If you're using Maven, add this to your pom.xml
:
<!-- in your <properties> block -->
<pulsar.version>4.0.1</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>${pulsar.version}</version>
</dependency>
Gradle
If you're using Gradle, add this to your build.gradle
file:
def pulsarVersion = "4.0.1"
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-spark', version: pulsarVersion
}
Usage
Pass an instance of SparkStreamingPulsarReceiver
to the receiverStream
method in JavaStreamingContext
:
String serviceUrl = "pulsar://localhost:6650/";
String topic = "persistent://public/default/test_src";
String subs = "test_sub";
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();
Set<String> set = new HashSet();
set.add(topic);
pulsarConf.setTopicNames(set);
pulsarConf.setSubscriptionName(subs);
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationDisabled());
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
For a complete example, click here. In this example, the number of messages that contain the string "Pulsar" in received messages is counted.
Note that if needed, other Pulsar authentication classes can be used. For example, to use a token during authentication the following parameters for the SparkStreamingPulsarReceiver
constructor can be set:
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
serviceUrl,
pulsarConf,
new AuthenticationToken("token:<secret-JWT-token>"));