Manage schema
This guide demonstrates the ways to manage schemas:
-
Automatically
-
Manually
Schema AutoUpdate
If a schema passes the schema compatibility check, Pulsar producer automatically updates this schema to the topic it produces by default.
AutoUpdate for producer
For a producer, the AutoUpdate
happens in the following cases:
-
If a topic doesn’t have a schema, Pulsar registers a schema automatically.
-
If a topic has a schema:
-
If a producer doesn’t carry a schema:
-
If
isSchemaValidationEnforced
orschemaValidationEnforced
is disabled in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data. -
If
isSchemaValidationEnforced
orschemaValidationEnforced
is enabled in the namespace to which the topic belongs, the producer is rejected and disconnected. -
If a producer carries a schema:
A broker performs the compatibility check based on the configured compatibility check strategy of the namespace to which the topic belongs.
-
If the schema is registered, a producer is connected to a broker.
-
If the schema is not registered:
-
If
isAllowAutoUpdateSchema
sets to false, the producer is rejected to connect to a broker. -
If
isAllowAutoUpdateSchema
sets to true:-
If the schema passes the compatibility check, then the broker registers a new schema automatically for the topic and the producer is connected.
-
If the schema does not pass the compatibility check, then the broker does not register a schema and the producer is rejected to connect to a broker.
-
-
-
AutoUpdate for consumer
For a consumer, the AutoUpdate
happens in the following cases:
-
If a consumer connects to a topic without a schema (which means the consumer receiving raw bytes), the consumer can connect to the topic successfully without doing any compatibility check.
-
If a consumer connects to a topic with a schema.
-
If a topic does not have all of them (a schema/data/a local consumer and a local producer):
-
If
isAllowAutoUpdateSchema
sets to true, then the consumer registers a schema and it is connected to a broker. -
If
isAllowAutoUpdateSchema
sets to false, then the consumer is rejected to connect to a broker.
-
-
If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed.
-
If the schema passes the compatibility check, then the consumer is connected to the broker.
-
If the schema does not pass the compatibility check, then the consumer is rejected to connect to the broker.
-
-
Manage AutoUpdate strategy
You can use the pulsar-admin
command to manage the AutoUpdate
strategy as below:
Enable AutoUpdate
To enable AutoUpdate
on a namespace, you can use the pulsar-admin
command.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace
Disable AutoUpdate
To disable AutoUpdate
on a namespace, you can use the pulsar-admin
command.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace
Once the AutoUpdate
is disabled, you can only register a new schema using the pulsar-admin
command.
Adjust compatibility
To adjust the schema compatibility level on a namespace, you can use the pulsar-admin
command.
bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace
Schema validation
By default, schemaValidationEnforced
is disabled for producers:
-
This means a producer without a schema can produce any kind of messages to a topic with schemas, which may result in producing trash data to the topic.
-
This allows non-java language clients that don’t support schema can produce messages to a topic with schemas.
However, if you want a stronger guarantee on the topics with schemas, you can enable schemaValidationEnforced
across the whole cluster or on a per-namespace basis.
Enable schema validation
To enable schemaValidationEnforced
on a namespace, you can use the pulsar-admin
command.
bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
Disable schema validation
To disable schemaValidationEnforced
on a namespace, you can use the pulsar-admin
command.
bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
Schema manual management
To manage schemas, you can use one of the following methods.
Method | Description |
---|---|
Admin CLI | You can use the pulsar-admin tool to manage Pulsar schemas, brokers, clusters, sources, sinks, topics, tenants and so on. For more information about how to use the pulsar-admin tool, see here. |
REST API | Pulsar exposes schema related management API in Pulsar’s admin RESTful API. You can access the admin RESTful endpoint directly to manage schemas. For more information about how to use the Pulsar REST API, see here. |
Java Admin API | Pulsar provides Java admin library. |
Upload a schema
To upload (register) a new schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java Admin API
Use the upload
subcommand.
$ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
The schema-definition-file
is in JSON format.
{
"type": "<schema-type>",
"schema": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
The schema-definition-file
includes the following fields:
Field | Description |
---|---|
type | The schema type. |
schema | The schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
Here are examples of the schema-definition-file
for a JSON schema.
Example 1
{
"type": "JSON",
"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}",
"properties": {}
}
Example 2
{
"type": "STRING",
"schema": "",
"properties": {
"key1": "value1"
}
}
Send a POST
request to this endpoint: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema/uploadSchem
The post payload is in JSON format.
{
"type": "<schema-type>",
"schema": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
The post payload includes the following fields:
Field | Description |
---|---|
type | The schema type. |
schema | The schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
void createSchema(String topic, PostSchemaPayload schemaPayload)
The PostSchemaPayload
includes the following fields:
Field | Description |
---|---|
type | The schema type. |
schema | The schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
Here is an example of PostSchemaPayload
:
PulsarAdmin admin = …;
PostSchemaPayload payload = new PostSchemaPayload();
payload.setType("INT8");
payload.setSchema("");
admin.createSchema("my-tenant/my-ns/my-topic", payload);
Get a schema (latest)
To get the latest schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java Admin API
Use the get
subcommand.
$ pulsar-admin schemas get <topic-name>
{
"version": 0,
"type": "String",
"timestamp": 0,
"data": "string",
"properties": {
"property1": "string",
"property2": "string"
}
}
Send a GET
request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/getSchem
Here is an example of a response, which is returned in JSON format.
{
"version": "<the-version-number-of-the-schema>",
"type": "<the-schema-type>",
"timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
"data": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
The response includes the following fields:
Field | Description |
---|---|
version | The schema version, which is a long number. |
type | The schema type. |
timestamp | The timestamp of creating this version of schema. |
data | The schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
SchemaInfo createSchema(String topic)
The SchemaInfo
includes the following fields:
Field | Description |
---|---|
name | The schema name. |
type | The schema type. |
schema | A byte array of the schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
Here is an example of SchemaInfo
:
PulsarAdmin admin = …;
SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic");
Get a schema (specific)
To get a specific version of a schema, you can use one of the following methods.
- Admin CLI
- REST API
- Java Admin API
Use the get
subcommand.
$ pulsar-admin schemas get <topic-name> --version=<version>
Send a GET
request to a schema endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version/getSchem
Here is an example of a response, which is returned in JSON format.
{
"version": "<the-version-number-of-the-schema>",
"type": "<the-schema-type>",
"timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
"data": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
The response includes the following fields:
Field | Description |
---|---|
version | The schema version, which is a long number. |
type | The schema type. |
timestamp | The timestamp of creating this version of schema. |
data | The schema definition data, which is encoded in UTF 8 charset. |
properties | The additional properties associated with the schema. |
SchemaInfo createSchema(String topic, long version)
The SchemaInfo
includes the following fields:
Field | Description |
---|---|
name | The schema name. |
type | The schema type. |
schema | A byte array of the schema definition data, which is encoded in UTF 8. |
properties | The additional properties associated with the schema. |
Here is an example of SchemaInfo
:
PulsarAdmin admin = …;
SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L);
Extract a schema
To provide a schema via a topic, you can use the following method.
- Admin CLI
Use the extract
subcommand.
$ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>
Delete a schema
To delete a schema for a topic, you can use one of the following methods.
In any case, the delete action deletes all versions of a schema registered for a topic.
- Admin CLI
- REST API
- Java Admin API
Use the delete
subcommand.
$ pulsar-admin schemas delete <topic-name>
Send a DELETE
request to a schema endpoint: DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema/deleteSchema
Here is an example of a response, which is returned in JSON format.
{
"version": "<the-latest-version-number-of-the-schema>",
}
The response includes the following field:
Field | Description |
---|---|
version | The schema version, which is a long number. |
void deleteSchema(String topic)
Here is an example of deleting a schema.
PulsarAdmin admin = …;
admin.deleteSchema("my-tenant/my-ns/my-topic");
Custom schema storage
By default, Pulsar stores various data types of schemas in Apache BookKeeper deployed alongside Pulsar.
However, you can use another storage system if needed.
Implement
To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:
SchemaStorage interface
The SchemaStorage
interface has the following methods:
public interface SchemaStorage {
// How schemas are updated
CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
// How schemas are fetched from storage
CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
// How schemas are deleted
CompletableFuture<SchemaVersion> delete(String key);
// Utility method for converting a schema version byte array to a SchemaVersion object
SchemaVersion versionFromBytes(byte[] version);
// Startup behavior for the schema storage client
void start() throws Exception;
// Shutdown behavior for the schema storage client
void close() throws Exception;
}
For a complete example of schema storage implementation, see BookKeeperSchemaStorage class.
SchemaStorageFactory interface
The SchemaStorageFactory
interface has the following method:
public interface SchemaStorageFactory {
@NotNull
SchemaStorage create(PulsarService pulsar) throws Exception;
}
For a complete example of schema storage factory implementation, see BookKeeperSchemaStorageFactory class.
Deploy
To use your custom schema storage implementation, perform the following steps.
-
Package the implementation in a JAR file.
-
Add the JAR file to the
lib
folder in your Pulsar binary or source distribution. -
Change the
schemaRegistryStorageClassName
configuration inbroker.conf
to your custom factory class. -
Start Pulsar.
Set schema compatibility check strategy
You can set schema compatibility check strategy at namespace or broker level.
-
If you set schema compatibility check strategy at both namespace or broker level, it uses the strategy set for the namespace level.
-
If you do not set schema compatibility check strategy at both namespace or broker level, it uses the
FULL
strategy. -
If you set schema compatibility check strategy at broker level rather than namespace level, it uses the strategy set for the broker level.
-
If you set schema compatibility check strategy at namespace level rather than broker level, it uses the strategy set for the namespace level.
Namespace
You can set schema compatibility check strategy at namespace level using one of the following methods.
- pulsar-admin
- REST API
- Java
Use the pulsar-admin namespaces set-schema-compatibility-strategy
command.
pulsar-admin namespaces set-schema-compatibility-strategy options
Send a PUT
request to this endpoint: PUT /admin/v2/namespaces/:tenant/:namespace/schemaCompatibilityStrategy
Use the setSchemaCompatibilityStrategy
method.
admin.namespaces().setSchemaCompatibilityStrategy("test", SchemaCompatibilityStrategy.FULL);
Broker
You can set schema compatibility check strategy at broker level by setting schemaCompatibilityStrategy
in broker.conf
or standalone.conf
file.
Example
schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE