Version: 2.14

Flink Configuration

The Apache Flink® Configuration contains both Flink cluster-level and Flink job-specific options. On this page, we describe how the Flink configuration is applied to your Deployment and highlight important configuration options.

Overview

The Flink configuration is specified as part of the Deployment Template.

kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        key: value

Please consult the official Flink documentation for a listing of available configuration options.

Depending on the Deployment Mode, the provided configuration is applied either at the Flink cluster level (application mode) or at the Flink job level (session mode).

  • Application Mode: The Flink configuration is applied at the cluster level.

  • Session Mode: The Flink configuration is applied at the job level.

In session mode, cluster-level configuration has to be provided in the SessionCluster resource referenced by the Deployment. The effective configuration at runtime is a merge of the SessionCluster's and the Deployment's Flink configuration, with job-level configuration taking precedence over cluster-level configuration.
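The following sketch illustrates the merge rule for session mode. It assumes a SessionCluster named `my-session-cluster` (hypothetical) and assumes that the Deployment references it through a `sessionClusterName` field; please check the Deployment resource reference for the exact field in your version. The option values are arbitrary examples.

```yaml
# Cluster-level configuration, provided by the SessionCluster.
kind: SessionCluster
metadata:
  name: my-session-cluster                 # hypothetical name
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    execution.checkpointing.interval: 60s
---
# Job-level configuration, provided by a Deployment running in session mode.
kind: Deployment
spec:
  sessionClusterName: my-session-cluster   # assumption: name of the reference field
  template:
    spec:
      flinkConfiguration:
        execution.checkpointing.interval: 10s   # takes precedence over the SessionCluster's 60s
```

With these two resources, the job would run with a checkpoint interval of 10s (the job-level value wins), while `taskmanager.numberOfTaskSlots` is inherited unchanged from the SessionCluster.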

Job-level Configuration

Environment Variables

You can reference environment variables inside flinkConfiguration through shell format strings:

kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        s3.access-key: ${S3_ACCESS_KEY}

This allows you to store sensitive information such as passwords, keys and tokens as Kubernetes secrets and reference them by setting up an environment variable via Kubernetes Pod Templates or a custom Flink Docker image.
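As a sketch, the configuration below reads S3 credentials from a Kubernetes Secret via environment variables. It assumes a Secret named `s3-credentials` with the keys `access-key` and `secret-key` (hypothetical names), and it assumes the `kubernetes.pods.envVars` field of the Deployment template for injecting environment variables; consult the Kubernetes Resources page for the fields your version supports.

```yaml
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        # Resolved from the environment variables defined below.
        s3.access-key: ${S3_ACCESS_KEY}
        s3.secret-key: ${S3_SECRET_KEY}
      kubernetes:
        pods:
          # Assumption: these variables are set on all Flink pods of the Deployment.
          envVars:
            - name: S3_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: s3-credentials   # hypothetical Secret name
                  key: access-key
            - name: S3_SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: s3-credentials
                  key: secret-key
```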

note

This uses the same syntax as Secret Values. If a Secret Value with a given name exists, it will be preferred over an environment variable with the same name.

Restart Strategies

Flink job restart strategies control how Flink jobs are restarted after a failure, such as exceptions from user code, machine failures, or network disruptions. By default, if checkpointing is enabled, the Flink job will be restarted with a fixed delay after every failure. Please consult the official Flink documentation on restart strategies for more details about available options.
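For example, the following sketch replaces the default with a bounded fixed-delay strategy, using the standard Flink options `restart-strategy`, `restart-strategy.fixed-delay.attempts`, and `restart-strategy.fixed-delay.delay`. The attempt count and delay are illustrative values, not recommendations.

```yaml
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        # Retry at most 3 times, waiting 10 seconds before each restart;
        # the job is declared failed if it keeps failing after that.
        restart-strategy: fixed-delay
        restart-strategy.fixed-delay.attempts: "3"
        restart-strategy.fixed-delay.delay: 10 s
```

A failure-rate strategy (`restart-strategy: failure-rate`) can be configured analogously with its own `restart-strategy.failure-rate.*` options.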

In addition, Ververica Platform provides a configuration assistant for Flink restart strategies in its web interface and reports observed Flink job restarts in the status section of the corresponding Job resource. If a Job is observed to fail multiple times over a short period of time, the Deployment will indicate this in its status. Please refer to the Deployment Lifecycle page for more details.

Cluster-level Configuration

Cluster-level configuration is only applicable to Deployments executed in application mode. For Deployments executed in session mode, you have to configure cluster-level options in the SessionCluster resource referenced by the Deployment.

File Systems

The default Flink Docker images provided by Ververica Platform include FileSystem implementations for popular blob storage providers.

Available FileSystem Implementations

| Blob Storage Provider | Scheme | FileSystem Implementation |
|---|---|---|
| File System | file | LocalFileSystem |
| AWS S3 | s3, s3p | PrestoS3FileSystem |
| AWS S3 | s3a | S3AFileSystem |
| Microsoft ABS | wasbs | NativeAzureFileSystem |
| Apache Hadoop® HDFS | hdfs | HadoopFileSystem |

If you use Universal Blob Storage, all relevant Flink options, including credentials, are configured at the Flink cluster level. Please consult the official Flink documentation for details about configuring file systems manually.
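As an illustration of the schemes listed above, the sketch below points checkpoints and savepoints at S3 via the Presto-based `s3p` scheme, using the standard Flink options `state.checkpoints.dir` and `state.savepoints.dir`. The bucket name `my-bucket` is hypothetical, and without Universal Blob Storage the S3 credentials would still have to be configured manually (for example via `s3.access-key` and `s3.secret-key`).

```yaml
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        # s3p:// is served by PrestoS3FileSystem (see the table above).
        state.checkpoints.dir: s3p://my-bucket/checkpoints   # hypothetical bucket
        state.savepoints.dir: s3p://my-bucket/savepoints
```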

Note that additional file system implementations have to be loaded as Flink plugins, which requires a custom Flink Docker image.

High-Availability (Jobmanager Failover)

Flink requires an external high-availability service to recover the internal state of the Flink Jobmanager process (including checkpoint metadata) after failures.

By default, Flink Jobmanager failover is not enabled. For production installations, we highly recommend configuring Flink with such a service. Please refer to the official Flink documentation for details on configuring high-availability services manually.

caution

For Deployments executed in session mode, you have to configure high-availability in the SessionCluster resource referenced by the Deployment.

Kubernetes


By default, the main method of your Flink job is re-executed on Jobmanager failover. Alternatively, you can enable JobGraph storage with the config option high-availability.vvp-kubernetes.job-graph-store.enabled: true, which stores the JobGraph in blob storage and restores it on failover without re-executing your main method. JobGraph storage is disabled by default in application mode and always enabled in session mode.

kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        high-availability: kubernetes

If Universal Blob Storage is not configured, you have to additionally provide the high-availability.storageDir configuration.
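A fuller sketch of this setup without Universal Blob Storage might combine the options named in this section as follows. The storage path is hypothetical, and the JobGraph storage line assumes, as this section states, that the option applies to the high-availability service configured here; it only matters in application mode, since JobGraph storage is always enabled in session mode.

```yaml
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        high-availability: kubernetes
        # Required if Universal Blob Storage is not configured.
        high-availability.storageDir: s3p://my-bucket/flink-ha   # hypothetical path
        # Optional: restore from the stored JobGraph on failover
        # instead of re-executing the main method.
        high-availability.vvp-kubernetes.job-graph-store.enabled: "true"
```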

Access Control

When Kubernetes-based high availability is enabled, your Flink application requires access to the API of your Kubernetes cluster.

ConfigMap resources are created automatically for high-availability metadata and for Jobmanager leader election.

For Flink Kubernetes:

  • job-$jobId-$jobIdWithoutDash-config-map: high availability metadata
  • job-$jobId-cluster-config-map: leader election

Flink Kubernetes requires permissions to get, create, edit, and delete ConfigMaps in the namespace of the Flink cluster.

The permissions are applied via Kubernetes RBAC resources that are created per Flink cluster. The created roles are bound to a custom ServiceAccount that is assigned to each Flink pod of the cluster.
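Ververica Platform creates these RBAC resources for you. For illustration only, a namespaced Role and RoleBinding granting roughly the ConfigMap permissions described above might look like the sketch below. All names are hypothetical, and mapping "edit" to the `update` and `patch` verbs is an assumption, not the exact role the platform generates.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmaps          # hypothetical name
  namespace: my-flink-namespace      # hypothetical namespace
rules:
  - apiGroups: [""]                  # "" is the core API group, which contains ConfigMaps
    resources: ["configmaps"]
    verbs: ["get", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmaps
  namespace: my-flink-namespace
subjects:
  - kind: ServiceAccount
    name: flink-ha-service-account   # hypothetical ServiceAccount assigned to the Flink pods
    namespace: my-flink-namespace
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-ha-configmaps
```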

Limitations
  • This high-availability service accesses your Kubernetes cluster and puts additional load on the Kubernetes API server. As a rule of thumb, we recommend this service only for small to medium-sized installations with tens of Flink clusters and up to 100 Taskmanager instances.
  • If the default ServiceAccount in the target Kubernetes namespace has additional configuration (such as image pull secrets) or other access control restrictions, these are not reflected in the created ServiceAccount resources. Therefore, you have to provide any additional configuration manually on the Deployment level (see Kubernetes Resources).
  • If your cluster does not use Kubernetes RBAC, you cannot use this high-availability service.
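For example, if your default ServiceAccount normally supplies image pull secrets, a sketch of providing them on the Deployment level instead could look like this. It assumes the `kubernetes.pods.imagePullSecrets` field of the Deployment template and a Secret named `my-registry-secret` (hypothetical); check the Kubernetes Resources page for the fields your version supports.

```yaml
kind: Deployment
spec:
  template:
    spec:
      kubernetes:
        pods:
          # Assumption: applied to all Flink pods of the Deployment.
          imagePullSecrets:
            - name: my-registry-secret   # hypothetical Secret name
```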

ZooKeeper

Apache Flink® ships with a ZooKeeper-based high-availability service that is described in the official Flink documentation on high availability.

The following options are required to enable ZooKeeper-based high availability:

kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        high-availability: zookeeper
        high-availability.zookeeper.quorum: zk-node1:2181,zk-node2:2181,zk-node3:2181

Ververica Platform automatically scopes all state by setting high-availability.cluster-id to the Deployment ID. You currently cannot override this behavior, because doing so could lead to undefined side effects between jobs. Therefore, the options above are sufficient to configure Flink Jobmanager failover with Ververica Platform.

If Universal Blob Storage is not configured, you have to additionally provide the high-availability.storageDir configuration.
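Putting these together, a sketch of ZooKeeper-based high availability without Universal Blob Storage could look as follows. The host names and the storage path are hypothetical; `high-availability.cluster-id` is deliberately left out because Ververica Platform sets it.

```yaml
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        high-availability: zookeeper
        high-availability.zookeeper.quorum: zk-node1:2181,zk-node2:2181,zk-node3:2181
        # Required if Universal Blob Storage is not configured:
        # Flink persists high-availability metadata under this path.
        high-availability.storageDir: s3p://my-bucket/flink-ha   # hypothetical path
        # Do not set high-availability.cluster-id; it is set to the Deployment ID.
```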

SSL/TLS Setup

Ververica Platform can be configured to auto-provision SSL/TLS for Flink via the following annotation:

kind: Deployment
spec:
  template:
    metadata:
      annotations:
        security.ssl.enabled: "true"

This enables SSL with mutual authentication for Flink's internal network communication as well as for Flink's REST API and web user interface. As a consequence, requests to Flink's REST API have to flow through Ververica Platform's Flink proxy, which has access to the trusted client certificate.

By default, SSL is disabled. Enabling this option will set the required Flink SSL configuration parameters, overwriting any existing configuration.

caution

For Deployments executed in session mode, you have to configure SSL/TLS in the SessionCluster resource referenced by the Deployment.

SSL Provider

Currently, Flink supports two SSL providers:

  • security.ssl.provider: OPENSSL: (default) OpenSSL-based SSL engine using system libraries
  • security.ssl.provider: JDK: pure Java-based SSL engine

Note that OpenSSL might not support all SSL algorithms that you can set via security.ssl.algorithms. To allow other algorithms, you can switch back to the native JDK implementation with security.ssl.provider: JDK.
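As a sketch, switching to the JDK provider to use a cipher suite that OpenSSL may not support could look as follows. The cipher suite is only an example value for the comma-separated list that `security.ssl.algorithms` accepts, and it assumes these two options are not among those overwritten when SSL is auto-provisioned. For session mode, the same settings would go into the SessionCluster resource.

```yaml
kind: Deployment
spec:
  template:
    metadata:
      annotations:
        security.ssl.enabled: "true"
    spec:
      flinkConfiguration:
        # Use the pure Java SSL engine instead of OpenSSL.
        security.ssl.provider: JDK
        # Example cipher suite; choose suites supported by your JDK.
        security.ssl.algorithms: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
```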

Implementation Details

To provision Flink clusters with the key stores and trust stores required for transport security, Ververica Platform generates the following once:

  • a self-signed certificate, whose public key is shared with Flink Jobmanager instances to facilitate SSL client authentication for the Flink REST API and web user interface;
  • a self-signed signing certificate (CA) for signing SSL certificates for Flink Jobmanagers.

Both certificates (with private keys) are stored in a key store under the persisted directory of Ververica Platform.

Before starting a Deployment with SSL enabled, Ververica Platform generates

  • a self-signed certificate, used to enable secure Flink-internal connectivity;
  • a certificate signed by the signing certificate, used to enable HTTPS on the Flink Jobmanager's REST API and web UI.

Both certificates (with private keys) are saved in a Kubernetes Secret, which is later mounted into each Deployment's Flink nodes.

The implementation can be summarized in the following diagram:

[Diagram: SSL/TLS implementation overview]

Queryable State

Ververica Platform supports Flink Queryable State, which you can enable via the following annotation:

kind: Deployment
spec:
  template:
    metadata:
      annotations:
        flink.queryable-state.enabled: "true"

This enables Queryable State and configures an additional Kubernetes service that allows access to the Queryable State server launched on each Taskmanager.

This allows you to query Flink state with a QueryableStateClient from within the Kubernetes cluster, using job-${jobId}-taskmanager:9069 as the address, where jobId is the ID of the current Job resource of your Deployment.

caution

For Deployments executed in session mode, you have to configure queryable state in the SessionCluster resource referenced by the Deployment.