Version: 2.12

Flink Configuration

Flink configuration options provided on the SessionCluster resource are applied at the Flink cluster level. This page describes how the Flink configuration is applied to your session cluster and highlights important configuration options.

Overview

The Flink configuration is specified as part of the SessionCluster spec.

kind: SessionCluster
spec:
  flinkConfiguration:
    key: value

Please consult the official Flink documentation for a listing of available configuration options.

Environment Variables

You can reference environment variables inside flinkConfiguration through shell format strings:

kind: SessionCluster
spec:
  flinkConfiguration:
    s3.access-key: ${S3_ACCESS_KEY}

This allows you to store sensitive information such as passwords, keys, and tokens as Kubernetes secrets and reference them through an environment variable that you set up via Kubernetes Pod Templates or a custom Flink Docker image.
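As a minimal sketch of the Kubernetes side of this setup, the following shows a standard Secret and the container `env` entry that exposes it as `S3_ACCESS_KEY`. The Secret name `s3-credentials`, the key `access-key`, and the placeholder value are hypothetical; where exactly the `env` fragment goes in your pod template depends on your Kubernetes Pod Templates setup and is not shown here.

```yaml
# Standard Kubernetes Secret. Name, key, and value are hypothetical placeholders.
apiVersion: v1
kind: Secret
metadata:
  name: s3-credentials
type: Opaque
stringData:
  access-key: "replace-me"
---
# Fragment of a Kubernetes container spec (not a complete resource).
# It exposes the Secret key as the environment variable that
# flinkConfiguration references via ${S3_ACCESS_KEY}.
env:
  - name: S3_ACCESS_KEY
    valueFrom:
      secretKeyRef:
        name: s3-credentials
        key: access-key
```

Keeping the value in a Secret means the SessionCluster resource itself only ever contains the `${S3_ACCESS_KEY}` placeholder.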

File Systems

The default Flink Docker images provided by Ververica Platform include FileSystem implementations for popular blob storage providers.

| Blob Storage Provider | Scheme | FileSystem Implementation |
|---|---|---|
| File System | file | LocalFileSystem |
| AWS S3 | s3, s3p | PrestoS3FileSystem |
| AWS S3 | s3a | S3AFileSystem |
| Microsoft ABS | wasbs | NativeAzureFileSystem |
| Apache Hadoop® HDFS | hdfs | HadoopFileSystem |
| Microsoft ABS Workload Identity | wiaz | Ververica custom plugin implementation |

If you use Universal Blob Storage, all relevant Flink options, including credentials, are configured at the Flink cluster level. Please consult the official Flink documentation for details about manual configuration of file systems.

Note that additional file system implementations have to be loaded as Flink plugins, which requires a custom Flink Docker image.
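If you configure a bundled file system manually rather than through Universal Blob Storage, the options go into flinkConfiguration like any other Flink option. The following is a sketch for S3 using Flink's `s3.access-key`, `s3.secret-key`, and `s3.endpoint` options. The environment variable names and the endpoint URL are assumptions for illustration only; the official Flink documentation remains the reference for the full option list.

```yaml
kind: SessionCluster
spec:
  flinkConfiguration:
    # Credentials resolved from environment variables (names are hypothetical).
    s3.access-key: ${S3_ACCESS_KEY}
    s3.secret-key: ${S3_SECRET_KEY}
    # Only needed for S3-compatible stores; the URL is a placeholder.
    s3.endpoint: https://s3.example.internal
```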

High-Availability (JobManager Failover)

Flink requires an external service for high-availability in order to be able to recover the internal state of the Flink JobManager process (including metadata about checkpoints) on failures.

By default, Flink JobManager failover is not enabled. For production installations, it is highly recommended to configure Flink with such a service. Please refer to the Flink configuration for details on how to configure high-availability services manually.

Kubernetes

Ververica Platform supports two options for Flink high-availability services on Kubernetes that do not have any additional dependencies: Flink Kubernetes and Ververica Platform Kubernetes. See Kubernetes High-Availability Service for a discussion.

The high-availability service is enabled via the following configuration:

kind: SessionCluster
spec:
  flinkConfiguration:
    high-availability: kubernetes

If Universal Blob Storage is not configured, you have to additionally provide the high-availability.storageDir configuration.
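For a cluster without Universal Blob Storage, the combined configuration might look like the sketch below. The bucket and path in `high-availability.storageDir` are hypothetical; any location that is reachable through one of the configured file systems should work.

```yaml
kind: SessionCluster
spec:
  flinkConfiguration:
    high-availability: kubernetes
    # Hypothetical location; must be durable storage reachable by all Flink pods.
    high-availability.storageDir: s3://my-bucket/flink/ha
```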

Ververica Platform Kubernetes

Note: From Ververica Platform 2.10.0, Ververica Platform Kubernetes is deprecated, and it will be removed in Ververica Platform 2.12. We recommend migrating Flink applications that use Ververica Platform Kubernetes to Flink Kubernetes instead.

kind: SessionCluster
spec:
  flinkConfiguration:
    high-availability: vvp-kubernetes

If Universal Blob Storage is not configured, you have to additionally provide the high-availability.storageDir configuration.

Access Control

When a Kubernetes high-availability service is enabled, your Flink application requires access to the API of your Kubernetes cluster.

ConfigMap resources are created automatically for high-availability metadata and JobManager leader election.

For Flink Kubernetes:

  • session-$sessionClusterId-$jobIdWithoutDash-config-map: high availability metadata
  • session-$sessionClusterId-cluster-config-map: leader election

For Ververica Platform Kubernetes:

  • sessioncluster-$sessionClusterId-flink-ha-jobmanager: high availability metadata
  • sessioncluster-$sessionClusterId-flink-ha-jobmanager-leader-election: leader election

Both Flink Kubernetes and Ververica Platform Kubernetes require permissions to get/create/edit/delete ConfigMaps in the namespace.

The permissions are applied via Kubernetes Role Based Access Control (RBAC) resources that are created per Flink cluster. The created roles are bound to a custom ServiceAccount that is assigned to each Flink pod of the cluster.
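The platform creates these RBAC resources for you, so you normally do not write them yourself. Purely as an illustration of what an equivalent Role and RoleBinding could look like, here is a sketch in standard Kubernetes RBAC syntax. All names and the namespace are hypothetical, and the verb list is an assumption: Kubernetes has no `edit` verb, so it is mapped to `update` and `patch` here, and `list` and `watch` are added on the assumption that leader election observes ConfigMap changes.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmaps        # hypothetical name
  namespace: flink-jobs            # hypothetical namespace
rules:
  - apiGroups: [""]                # core API group, where ConfigMaps live
    resources: ["configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmaps        # hypothetical name
  namespace: flink-jobs
subjects:
  - kind: ServiceAccount
    name: flink-session-cluster    # hypothetical; stands in for the custom ServiceAccount
    namespace: flink-jobs
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-ha-configmaps
```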

Limitations

  • The high-availability service accesses the API of your Kubernetes cluster. See Kubernetes High-Availability Service for details.
  • If the default service account in the target Kubernetes namespace has additional configuration (such as image pull secrets) or other access control restrictions, this is not reflected in the created ServiceAccount resources. You therefore have to provide any additional configuration manually on the Deployment level; see Kubernetes Resources.
  • The High Availability Service requires Kubernetes RBAC.

Deprecated Ververica Platform Kubernetes

Ververica Platform Kubernetes has some additional limitations:

  • The JobManager ConfigMap sessioncluster-$sessionClusterId-flink-ha-jobmanager is used to store failover metadata. ConfigMaps and other API resources in Kubernetes have a maximum resource size that limits how much metadata can be stored for recovery. The metadata size grows with the number of Flink jobs running on the session cluster. The number of jobs that can be executed concurrently per cluster varies depending on your configuration (e.g. how many checkpoints are retained, storage location, etc.), but in a typical installation you can expect to be able to run 100+ jobs per session cluster without running into the size limit.
  • The latest state restore strategy requires the Kubernetes high-availability service. Checkpoint metadata is cleaned up periodically in order to avoid hitting the maximum resource size limit mentioned above. If Ververica Platform cannot create a Savepoint resource for each retained checkpoint before the metadata is cleaned up, retained checkpoints might be missed for Deployments running in session mode.
  • By default, checkpoint metadata is cleaned up 15 minutes after the job has been unregistered. If you experience unexpected delays during Deployment termination, you can increase this delay via the high-availability.vvp-kubernetes.checkpoint-store.gc.cleanup-after-minutes: 15 option. This limitation does not apply to Deployments running in application mode.
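As a sketch of the last point, the cleanup delay can be raised alongside the deprecated high-availability service. The value of 30 minutes is an arbitrary example, not a recommendation.

```yaml
kind: SessionCluster
spec:
  flinkConfiguration:
    high-availability: vvp-kubernetes
    # Default is 15; 30 is an illustrative value only.
    high-availability.vvp-kubernetes.checkpoint-store.gc.cleanup-after-minutes: 30
```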

SSL/TLS Setup

Deployments in application mode can be configured to auto-provision SSL/TLS for Flink. Session clusters currently don't support this.

Please refer to the Flink configuration for details on how to configure SSL manually.

Queryable State

Ververica Platform supports Flink Queryable State.

kind: SessionCluster
metadata:
  annotations:
    flink.queryable-state.enabled: "true"

This enables Queryable State and configures an additional Kubernetes service that allows access to the Queryable State server launched on each TaskManager.

You can then query Flink state with a QueryableStateClient from within the Kubernetes cluster, using sessioncluster-${sessionClusterId}-taskmanager:9069 as the address, where sessionClusterId is the ID of your session cluster.