Configure Kubernetes runtime
The Kubernetes runtime works when a function worker generates and applies Kubernetes manifests. The manifests generated by a function worker include:
- a StatefulSetBy default, theStatefulSetmanifest has a single pod with a number of replicas. The number is determined by the parallelism of the function. The pod downloads the function payload (via the function worker REST API) on pod boot. The pod's container image is configurable if the function runtime is configured.
- a Service(used to communicate with the pod)
- a Secretfor authenticating credentials (when applicable). The Kubernetes runtime supports secrets. You can create a Kubernetes secret and expose it as an environment variable in the pod.
For the rules of translating Pulsar object names into Kubernetes resource labels, see instructions.
Configure basic settings
To quickly configure a Kubernetes runtime, you can use the default settings of KubernetesRuntimeFactoryConfig in the conf/functions_worker.yml file.
If you have [set up a Pulsar cluster on Kubernetes using Helm chart, which means function workers have also been set up on Kubernetes, you can use the serviceAccount associated with the pod where the function worker is running. Otherwise, you can configure function workers to communicate with a Kubernetes cluster by setting functionRuntimeFactoryConfigs to k8Uri.
Integrate Kubernetes secrets
A Secret in Kubernetes is an object that holds some confidential data such as a password, a token, or a key. When you create a secret in the Kubernetes namespace where your functions are deployed, functions can safely reference and distribute it. To enable this feature, set secretsProviderConfiguratorClassName to org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator in the conf/functions-worker.yml file.
For example, you deploy a function to the pulsar-func Kubernetes namespace, and you have a secret named database-creds with a field name password, which you want to mount in the pod as an environment variable named DATABASE_PASSWORD. The following configurations enable functions to reference the secret and mount the value as an environment variable in the pod.
tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
inputs: [ "persistent://mytenant/mynamespace/myfuncinput" ]
className: "com.company.pulsar.myfunction"
secrets:
  # the secret will be mounted from the `password` field in the `database-creds` secret as an env var called `DATABASE_PASSWORD`
  DATABASE_PASSWORD:
    path: "database-creds"
    key: "password"
Enable token authentication
When you use token authentication, TLS encryption, or custom authentications to secure the communication with your Pulsar cluster, Pulsar passes your certificate authority (CA) to the client, so the client can authenticate the cluster with your signed certificate.
To enable the authentication for your Pulsar cluster, you need to specify a mechanism for the pod running your function to authenticate the broker, by implementing the org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider interface.
- 
For token authentication, Pulsar includes an implementation of the above interface to distribute the CA. The function worker captures the token that deploys (or updates) the function, saves it as a secret, and mounts it into the pod. The configuration in the conf/function-worker.ymlfile is as follows.functionAuthProviderClassNameis used to specify the path to this implementation.functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider
- 
For TLS or custom authentication, you can either implement the org.apache.pulsar.functions.auth.KubernetesFunctionAuthProviderinterface or use an alternative mechanism.
If the token you use to deploy the function has an expiration date, you may need to deploy the function again after it expires.
Enable Kubernetes service account token projection for function pod authentication
The KubernetesServiceAccountTokenAuthProvider uses service account token volume projections to mount a token into the function's pod. The function worker and broker can verify this token using OpenID Connect. The primary benefit of this integration is that tokens have a short time to live, are managed by Kubernetes, and do not inherit the permission used to create the function.
This feature requires that the broker and the function worker are configured to use the AuthenticationProviderOpenID. Documentation to enable this provider can be found here.
Here is an example configuration for the function worker to utilize this feature:
functionAuthProviderClassName: "org.apache.pulsar.functions.auth.KubernetesServiceAccountTokenAuthProvider"
kubernetesContainerFactory:
  kubernetesFunctionAuthProviderConfig:
    # Required
    serviceAccountTokenExpirationSeconds: "600"
    serviceAccountTokenAudience: "the-required-audience"
    # Optional
    brokerClientTrustCertsSecretName: "my-secret-pulsar-broker-client-trust-certs"
The function pod deploys with the default Kubernetes service account for the target namespace. Because the service account name maps to the sub claim on the JWT projected into the pod's filesystem, all pods with the same service account will have the same permission within Pulsar. There is ongoing work to improve this integration.
Here is a sample JWT generated by this feature running in EKS (with some information redacted):
{
  "aud": [
    "your-audience"
  ],
  "exp": 1710969822,
  "iat": 1679433822,
  "iss": "https://oidc.eks.us-east-2.amazonaws.com/id/some-id",
  "kubernetes.io": {
    "namespace": "pulsar-function",
    "pod": {
      "name": "function-pod-0",
      "uid": "fbac8f9e-a47d-4ad7-a8f0-cc9a65d1331c"
    },
    "serviceaccount": {
      "name": "default",
      "uid": "5964f9d3-3dce-467c-8dbe-d0f463063d7a"
    },
    "warnafter": 1679437429
  },
  "nbf": 1679433822,
  "sub": "system:serviceaccount:pulsar-function:default"
}
To grant permission to this function pod, you need to grant permissions to the role claim, which is the sub claim by default, system:serviceaccount:pulsar-function:default.
Customize Kubernetes runtime
Customizing Kubernetes runtime allows you to customize Kubernetes resources created by the runtime, including how to generate manifests, how to pass authenticated data to pods, and how to integrate secrets.
To customize Kubernetes runtime, you can set runtimeCustomizerClassName in the conf/functions-worker.yml file and use the fully qualified class name.
The function API provides a flag named customRuntimeOptions, which is passed to the org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer interface. To initialize KubernetesManifestCustomizer, you can set runtimeCustomizerConfig in the conf/functions-worker.yml file.
runtimeCustomizerConfig is the same across all functions. If you provide both runtimeCustomizerConfig and customRuntimeOptions, you need to decide how to manage these two configurations in your implementation of the KubernetesManifestCustomizer interface.
Pulsar includes a built-in implementation initialized with runtimeCustomizerConfig. It enables you to pass a JSON document as customRuntimeOptions with certain properties to augment. To use this built-in implementation, set runtimeCustomizerClassName to org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer.
If both runtimeCustomizerConfig and customRuntimeOptions are provided and have conflicts, BasicKubernetesManifestCustomizer uses customRuntimeOptions to override runtimeCustomizerConfig.
Below is an example of configuring customRuntimeOptions.
{
  "jobName": "jobname", // the k8s pod name to run this function instance
  "jobNamespace": "namespace", // the k8s namespace to run this function in
  "extractLabels": {           // extra labels to attach to the statefulSet, service, and pods
    "extraLabel": "value"
  },
  "extraAnnotations": {        // extra annotations to attach to the statefulSet, service, and pods
    "extraAnnotation": "value"
  },
  "nodeSelectorLabels": {      // node selector labels to add on to the pod spec
    "customLabel": "value"
  },
  "tolerations": [             // tolerations to add to the pod spec
    {
      "key": "custom-key",
      "value": "value",
      "effect": "NoSchedule"
    }
  ],
  "resourceRequirements": {  // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
    "requests": {
      "cpu": 1,
      "memory": "4G"
    },
    "limits": {
      "cpu": 2,
      "memory": "8G"
    }
  }
}
How to define Pulsar resource names when running Pulsar in Kubernetes
If you run Pulsar Functions or connectors on Kubernetes, you need to follow the Kubernetes naming convention to define the names of your Pulsar resources, whichever admin interface you use.
Kubernetes requires a name that can be used as a DNS subdomain name as defined in RFC 1123. Pulsar supports more legal characters than the Kubernetes naming convention. If you create a Pulsar resource name with special characters that are not supported by Kubernetes (for example, including colons in a Pulsar namespace name), Kubernetes runtime translates the Pulsar object names into Kubernetes resource labels which are in RFC 1123-compliant forms. Consequently, you can run functions or connectors using Kubernetes runtime. The rules for translating Pulsar object names into Kubernetes resource labels are as below:
- 
Truncate to 63 characters 
- 
Replace the following characters with dashes (-): - 
Non-alphanumeric characters 
- 
Underscores (_) 
- 
Dots (.) 
 
- 
- 
Replace beginning and ending non-alphanumeric characters with 0 
- If you get an error in translating Pulsar object names into Kubernetes resource labels (for example, you may have a naming collision if your Pulsar object name is too long) or want to customize the translating rules, see customize Kubernetes runtime.
- For how to configure Kubernetes runtime, see instructions.