Version: 4.0.x

Get started

This hands-on tutorial provides instructions and examples on how to construct schemas. For instructions on administrative tasks, see Manage schema.

Construct a schema

bytes

This example demonstrates how to construct a bytes schema using language-specific clients and use it to produce and consume messages.

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
    .topic("my-topic")
    .create();
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();

producer.newMessage().value("message".getBytes()).send();

Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
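Because a bytes schema passes the payload through unchanged, the mapping between text and bytes is left to the application, and "message".getBytes() with no argument uses the JVM's default charset. The following is a minimal sketch of pinning the charset on both sides. BytesPayload and its methods are hypothetical helper names, not part of the Pulsar client API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Pulsar API. It only shows how an
// application might turn text into the byte[] handed to a bytes schema.
final class BytesPayload {
    // Pin the charset so producer and consumer agree regardless of JVM defaults.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Reverse of encode(); must use the same charset.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("message");   // what would be passed to value(...)
        System.out.println(payload.length);   // 7
        System.out.println(decode(payload));  // message
    }
}
```

On the consuming side, the same decode step would be applied to the byte[] returned by message.getValue().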

string

This example demonstrates how to construct a string schema using language-specific clients and use it to produce and consume messages.

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();
producer.newMessage().value("Hello Pulsar!").send();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();
Message<String> message = consumer.receive();

key/value

This example shows how to construct a key/value schema using language-specific clients and use it to produce and consume messages.

  1. Construct a key/value schema with INLINE encoding type.

    Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
        Schema.INT32,
        Schema.STRING,
        KeyValueEncodingType.INLINE
    );

    Alternatively, construct a key/value schema with SEPARATED encoding type.

    Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
        Schema.INT32,
        Schema.STRING,
        KeyValueEncodingType.SEPARATED
    );
  2. Produce messages using a key/value schema.

    Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
        .topic(topicName)
        .create();

    final int key = 100;
    final String value = "value-100";

    // send the key/value message
    producer.newMessage()
        .value(new KeyValue<>(key, value))
        .send();
  3. Consume messages using a key/value schema.

    Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
        ...
        .topic(topicName)
        .subscriptionName(subscriptionName).subscribe();

    // receive key/value pair
    Message<KeyValue<Integer, String>> msg = consumer.receive();
    KeyValue<Integer, String> kv = msg.getValue();
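The two encoding types in step 1 differ in where the key travels: with INLINE, the key and value are stored together in the message payload, while with SEPARATED, the key is stored as the message key and only the value is in the payload. The sketch below models that difference with plain byte arrays. It is a simplified illustration, and the length-prefixed layout shown for INLINE is an assumption for demonstration, not a specification of Pulsar's wire format. KeyValueLayoutSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified, hypothetical model of the two encoding types; not Pulsar's implementation.
final class KeyValueLayoutSketch {
    // Integer key encoded as 4 big-endian bytes (ByteBuffer's default order).
    static byte[] encodeKey(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // String value encoded as UTF-8 bytes.
    static byte[] encodeValue(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // INLINE: key and value share the payload, each prefixed by its length (assumed layout).
    static byte[] inlinePayload(int key, String value) {
        byte[] k = encodeKey(key);
        byte[] v = encodeValue(value);
        return ByteBuffer.allocate(4 + k.length + 4 + v.length)
            .putInt(k.length).put(k)
            .putInt(v.length).put(v)
            .array();
    }

    // SEPARATED: the payload holds only the value; the key travels as the message key.
    static byte[] separatedPayload(String value) {
        return encodeValue(value);
    }

    public static void main(String[] args) {
        System.out.println(inlinePayload(100, "value-100").length); // 21 = 4 + 4 + 4 + 9
        System.out.println(separatedPayload("value-100").length);   // 9
    }
}
```

One practical consequence of this design is that SEPARATED keeps the key available outside the payload, which matters for features that operate on message keys.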

Avro

Suppose you have a SensorReading class as follows, and you'd like to transmit it over a Pulsar topic.

public class SensorReading {
    public float temperature;

    public SensorReading(float temperature) {
        this.temperature = temperature;
    }

    // A no-arg constructor is required
    public SensorReading() {
    }

    public float getTemperature() {
        return temperature;
    }

    public void setTemperature(float temperature) {
        this.temperature = temperature;
    }
}

Create a Producer<SensorReading> (or Consumer<SensorReading>) like this:

Producer<SensorReading> producer = client.newProducer(AvroSchema.of(SensorReading.class))
    .topic("sensor-readings")
    .create();
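The comment in SensorReading notes that a no-arg constructor is required. One way to see why: reflection-based decoders typically create an empty instance first and then populate its fields, which only works if the class can be instantiated without arguments. The following is a rough sketch of that pattern using plain Java reflection. It is not how Avro is implemented internally, and NoArgConstructorSketch and decode are hypothetical names.

```java
import java.lang.reflect.Field;

// Hypothetical demo of reflection-style decoding; real Avro decoding is more involved.
final class NoArgConstructorSketch {
    // Trimmed copy of the SensorReading class above.
    static class SensorReading {
        public float temperature;

        public SensorReading() {
        }

        public SensorReading(float temperature) {
            this.temperature = temperature;
        }
    }

    // Create an instance without constructor arguments, then set one public field by name.
    static <T> T decode(Class<T> type, String fieldName, Object fieldValue) {
        try {
            // Fails with NoSuchMethodException if the class has no no-arg constructor.
            T instance = type.getDeclaredConstructor().newInstance();
            Field field = type.getField(fieldName);
            field.set(instance, fieldValue);
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot decode " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        SensorReading reading = decode(SensorReading.class, "temperature", 21.5f);
        System.out.println(reading.temperature); // 21.5
    }
}
```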

JSON

Similar to using AvroSchema, you can declare a JsonSchema by passing a class. The only difference is to use JsonSchema instead of AvroSchema when defining the schema type, as shown below. For how to use AvroSchema via record, see Method 1 - Record.

static class SchemaDemo {
    public String name;
    public int age;
}

Producer<SchemaDemo> producer = pulsarClient.newProducer(Schema.JSON(SchemaDemo.class))
    .topic("my-topic")
    .create();
Consumer<SchemaDemo> consumer = pulsarClient.newConsumer(Schema.JSON(SchemaDemo.class))
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();

SchemaDemo schemaDemo = new SchemaDemo();
schemaDemo.name = "pulsar";
schemaDemo.age = 20;
producer.newMessage().value(schemaDemo).send();

Message<SchemaDemo> message = consumer.receive(5, TimeUnit.SECONDS);

ProtobufNative

The following example shows how to create a producer/consumer with a ProtobufNative schema using Java.

  1. Generate the DemoMessage class using Protobuf3 or later versions.

    syntax = "proto3";
    message DemoMessage {
        string stringField = 1;
        double doubleField = 2;
        int32 intField = 6;
        TestEnum testEnum = 4;
        SubMessage nestedField = 5;
        repeated string repeatedField = 10;
        proto.external.ExternalMessage externalMessage = 11;
    }
  2. Create a producer/consumer to send/receive messages.

    Producer<DemoMessage> producer = pulsarClient.newProducer(Schema.PROTOBUF_NATIVE(DemoMessage.class))
        .topic("my-topic")
        .create();
    Consumer<DemoMessage> consumer = pulsarClient.newConsumer(Schema.PROTOBUF_NATIVE(DemoMessage.class))
        .topic("my-topic")
        .subscriptionName("my-sub")
        .subscribe();

    producer.newMessage().value(DemoMessage.newBuilder().setStringField("string-field-value")
        .setIntField(1).build()).send();

    Message<DemoMessage> message = consumer.receive(5, TimeUnit.SECONDS);

Protobuf

Constructing a protobuf schema using Java is similar to constructing a ProtobufNative schema. The only difference is to use PROTOBUF instead of PROTOBUF_NATIVE when defining schema type as shown below.

  1. Generate the DemoMessage class using Protobuf3 or later versions.

    syntax = "proto3";
    message DemoMessage {
        string stringField = 1;
        double doubleField = 2;
        int32 intField = 6;
        TestEnum testEnum = 4;
        SubMessage nestedField = 5;
        repeated string repeatedField = 10;
        proto.external.ExternalMessage externalMessage = 11;
    }
  2. Create a producer/consumer to send/receive messages.

    Producer<DemoMessage> producer = pulsarClient.newProducer(Schema.PROTOBUF(DemoMessage.class))
        .topic("my-topic")
        .create();
    Consumer<DemoMessage> consumer = pulsarClient.newConsumer(Schema.PROTOBUF(DemoMessage.class))
        .topic("my-topic")
        .subscriptionName("my-sub")
        .subscribe();

    producer.newMessage().value(DemoMessage.newBuilder().setStringField("string-field-value")
        .setIntField(1).build()).send();

    Message<DemoMessage> message = consumer.receive(5, TimeUnit.SECONDS);

Native Avro

This example shows how to construct a native Avro schema.

org.apache.avro.Schema nativeAvroSchema = ...;
Producer<byte[]> producer = pulsarClient.newProducer().topic("ingress").create();
byte[] content = ...;
producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema)).value(content).send();

AUTO_PRODUCE

Suppose you have a Pulsar topic P, a producer that processes messages from a Kafka topic K, and an application that reads the messages from K and writes them to P.

This example shows how to construct an AUTO_PRODUCE schema to verify whether the bytes produced by K can be sent to P.

Producer<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
    ...
    .create();
byte[] kafkaMessageBytes = ...;
pulsarProducer.send(kafkaMessageBytes);

AUTO_CONSUME

Suppose you have a Pulsar topic P and a consumer MySQL that receives messages from P, and you want to check if these messages have the information that your application needs to count.

This example shows how to construct an AUTO_CONSUME schema to verify whether the bytes produced by P can be sent to MySQL.

Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
    ...
    .subscribe();

Message<GenericRecord> msg = pulsarConsumer.receive();
GenericRecord record = msg.getValue();
record.getFields().forEach(field -> {
    if (field.getName().equals("theNeedFieldName")) {
        Object recordField = record.getField(field);
        // Do some things
    }
});

Customize schema storage

By default, Pulsar stores schemas of all data types in the Apache BookKeeper cluster deployed alongside Pulsar. Alternatively, you can use another storage system if needed.

To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces before deploying custom schema storage:

Implement SchemaStorage interface

The SchemaStorage interface has the following methods:

public interface SchemaStorage {
    // How schemas are updated
    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);

    // How schemas are fetched from storage
    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);

    // How schemas are deleted
    CompletableFuture<SchemaVersion> delete(String key);

    // Utility method for converting a schema version byte array to a SchemaVersion object
    SchemaVersion versionFromBytes(byte[] version);

    // Startup behavior for the schema storage client
    void start() throws Exception;

    // Shutdown behavior for the schema storage client
    void close() throws Exception;
}

Tip: For a complete example of schema storage implementation, see the BookKeeperSchemaStorage class.
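To make the put/get/delete contract concrete, here is a minimal in-memory sketch that mirrors the shape of those three methods. It deliberately uses stand-in types so that it runs without Pulsar on the classpath: versions are plain long values and stored schemas are raw byte[] arrays, rather than Pulsar's SchemaVersion and StoredSchema. It also omits the hash argument and the start/close lifecycle. InMemorySchemaStoreSketch is a hypothetical name, and a real implementation would implement the SchemaStorage interface itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in: versions are plain longs and values raw byte[],
// not Pulsar's SchemaVersion or StoredSchema types. Not hardened for production use.
final class InMemorySchemaStoreSketch {
    private final Map<String, List<byte[]>> store = new ConcurrentHashMap<>();

    // Append a new schema version for the key and complete with its version number.
    CompletableFuture<Long> put(String key, byte[] value) {
        List<byte[]> versions = store.computeIfAbsent(key, k -> new ArrayList<>());
        synchronized (versions) {
            versions.add(value.clone());
            long version = versions.size() - 1;
            return CompletableFuture.completedFuture(version);
        }
    }

    // Fetch one version; completes with null when the key or version is unknown.
    CompletableFuture<byte[]> get(String key, long version) {
        List<byte[]> versions = store.get(key);
        if (versions == null) {
            return CompletableFuture.completedFuture(null);
        }
        synchronized (versions) {
            boolean known = version >= 0 && version < versions.size();
            return CompletableFuture.completedFuture(known ? versions.get((int) version).clone() : null);
        }
    }

    // Remove every version for the key; completes with the last version removed, or -1 if none.
    CompletableFuture<Long> delete(String key) {
        List<byte[]> removed = store.remove(key);
        long lastVersion = (removed == null) ? -1 : removed.size() - 1;
        return CompletableFuture.completedFuture(lastVersion);
    }

    public static void main(String[] args) {
        InMemorySchemaStoreSketch storage = new InMemorySchemaStoreSketch();
        System.out.println(storage.put("my-topic", new byte[] {1}).join()); // 0
        System.out.println(storage.put("my-topic", new byte[] {2}).join()); // 1
        System.out.println(storage.get("my-topic", 1).join()[0]);           // 2
        System.out.println(storage.delete("my-topic").join());              // 1
    }
}
```

Returning CompletableFuture from every operation, as the interface does, lets a storage backend perform I/O asynchronously; this sketch simply completes each future immediately.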

Implement SchemaStorageFactory interface

The SchemaStorageFactory interface has the following method:

public interface SchemaStorageFactory {
    @NotNull
    SchemaStorage create(PulsarService pulsar) throws Exception;
}

Tip: For a complete example of schema storage factory implementation, see the BookKeeperSchemaStorageFactory class.

Deploy custom schema storage

To use your custom schema storage implementation, perform the following steps.

  1. Package the implementation in a JAR file.

  2. Add the JAR file to the lib folder in your Pulsar binary or source distribution.

  3. Change the schemaRegistryStorageClassName configuration in the conf/broker.conf file to your custom factory class.

  4. Start Pulsar.
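As an illustration of step 3, the entry in conf/broker.conf might look like the following. The class name com.example.schema.CustomSchemaStorageFactory is a hypothetical placeholder; substitute the fully qualified name of your own SchemaStorageFactory implementation from the JAR added in step 2.

```properties
# conf/broker.conf
# com.example.schema.CustomSchemaStorageFactory is a hypothetical class name.
schemaRegistryStorageClassName=com.example.schema.CustomSchemaStorageFactory
```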