Extending Authentication and Authorization in Pulsar


Pulsar provides a way to use custom authentication and authorization mechanisms

Authentication

Pulsar support mutual TLS and Athenz authentication plugins, and these can be used as described in http://pulsar.apache.org/docs/latest/admin/Authz/.

It is possible to use a custom authentication mechanism by providing the implementation in the form of two plugins one for the Client library and the other for the Pulsar Broker to validate the credentials.

Client authentication plugin

For client library, you will need to implement org.apache.pulsar.client.api.Authentication. This class can then be passed when creating a Pulsar client:

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .authentication(new MyAuthentication())
    .build();

For reference, there are 2 interfaces to implement on the client side:

  • Authentication -> http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Authentication.html
  • AuthenticationDataProvider -> http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/AuthenticationDataProvider.html

This in turn will need to provide the client credentials in the form of org.apache.pulsar.client.api.AuthenticationDataProvider. This will leave the chance to return different kinds of authentication token for different type of connection or by passing a certificate chain to use for TLS.

Examples for client authentication providers can be found at:

  • Mutual TLS Auth – https://github.com/apache/incubator-pulsar/tree/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth
  • Athenz – https://github.com/apache/incubator-pulsar/tree/master/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth

Broker authentication plugin

On broker side, we need the corresponding plugin to validate the credentials passed by the client. Broker can support multiple authentication providers at the same time.

In conf/broker.conf it’s possible to specify a list of valid providers:

# Autentication provider name list, which is comma separated list of class names
authenticationProviders=

There is one single interface to implement org.apache.pulsar.broker.authentication.AuthenticationProvider:

/**
 * Provider of authentication mechanism
 */
public interface AuthenticationProvider extends Closeable {

    /**
     * Perform initialization for the authentication provider
     *
     * @param config
     *            broker config object
     * @throws IOException
     *             if the initialization fails
     */
    void initialize(ServiceConfiguration config) throws IOException;

    /**
     * @return the authentication method name supported by this provider
     */
    String getAuthMethodName();

    /**
     * Validate the authentication for the given credentials with the specified authentication data
     *
     * @param authData
     *            provider specific authentication data
     * @return the "role" string for the authenticated connection, if the authentication was successful
     * @throws AuthenticationException
     *             if the credentials are not valid
     */
    String authenticate(AuthenticationDataSource authData) throws AuthenticationException;

}

Example for Broker authentication plugins:

  • Mutual TLS – https://github.com/apache/incubator-pulsar/blob/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
  • Athenz – https://github.com/apache/incubator-pulsar/blob/master/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java

Authorization

Authorization is the operation that checks whether a particular “role” or “principal” is allowed to perform a certain operation.

By default, Pulsar provides an embedded authorization, though it’s possible to configure a different one through a plugin.

To provide a custom provider, one needs to implement the org.apache.pulsar.broker.authorization.AuthorizationProvider interface, have this class in the Pulsar broker classpath and configure it in conf/broker.conf:

 # Authorization provider fully qualified class-name
 authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
/**
 * Provider of authorization mechanism
 */
public interface AuthorizationProvider extends Closeable {

    /**
     * Perform initialization for the authorization provider
     *
     * @param config
     *            broker config object
     * @param configCache
     *            pulsar zk configuration cache service
     * @throws IOException
     *             if the initialization fails
     */
    void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;

    /**
     * Check if the specified role has permission to send messages to the specified fully qualified topic name.
     *
     * @param topicName
     *            the fully qualified topic name associated with the topic.
     * @param role
     *            the app id used to send messages to the topic.
     */
    CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
            AuthenticationDataSource authenticationData);

    /**
     * Check if the specified role has permission to receive messages from the specified fully qualified topic name.
     *
     * @param topicName
     *            the fully qualified topic name associated with the topic.
     * @param role
     *            the app id used to receive messages from the topic.
     * @param subscription
     *            the subscription name defined by the client
     */
    CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
            AuthenticationDataSource authenticationData, String subscription);

    /**
     * Check whether the specified role can perform a lookup for the specified topic.
     *
     * For that the caller needs to have producer or consumer permission.
     *
     * @param topicName
     * @param role
     * @return
     * @throws Exception
     */
    CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
            AuthenticationDataSource authenticationData);

    /**
     *
     * Grant authorization-action permission on a namespace to the given client
     *
     * @param namespace
     * @param actions
     * @param role
     * @param authDataJson
     *            additional authdata in json format
     * @return CompletableFuture
     * @completesWith <br/>
     *                IllegalArgumentException when namespace not found<br/>
     *                IllegalStateException when failed to grant permission
     */
    CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
            String authDataJson);

    /**
     * Grant authorization-action permission on a topic to the given client
     *
     * @param topicName
     * @param role
     * @param authDataJson
     *            additional authdata in json format
     * @return CompletableFuture
     * @completesWith <br/>
     *                IllegalArgumentException when namespace not found<br/>
     *                IllegalStateException when failed to grant permission
     */
    CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
            String authDataJson);

}