Flink Configuration
The Apache Flink® Configuration contains both Flink cluster-level and Flink job-specific options. On this page, we describe how the Flink configuration is applied to your Deployment and highlight important configuration options.
Overview
The Flink configuration is specified as part of the Deployment Template.
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        key: value
Please consult the official Flink documentation for a listing of available configuration options.
Depending on the Deployment Mode, the provided configuration is applied either on the Flink cluster-level (application mode) or on the Flink job-level (session mode).
- Application Mode: the Flink configuration is applied on the cluster level.
- Session Mode: the Flink configuration is applied on the job level.
Cluster-level configuration has to be provided in the SessionCluster resource referenced by the Deployment. The effective configuration during runtime is a merge of both the SessionCluster's and the Deployment's Flink configuration with job-level configuration having precedence over cluster-level configuration.
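As an illustration of this merge in session mode, assume the same key is set on both resources (the SessionCluster field layout and the option values below are examples for illustration):

```yaml
# Cluster-level configuration on the SessionCluster (sketch; field layout may differ)
kind: SessionCluster
spec:
  flinkConfiguration:
    state.backend: rocksdb
    taskmanager.numberOfTaskSlots: "4"
---
# Job-level configuration on the Deployment referencing that SessionCluster
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        state.backend: hashmap
```

The effective runtime configuration would be state.backend: hashmap (the job level wins) together with taskmanager.numberOfTaskSlots: "4" inherited from the cluster level.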
Job-level Configuration
Environment Variables
You can reference environment variables inside flinkConfiguration through shell format strings:
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        s3.access-key: ${S3_ACCESS_KEY}
This allows you to store sensitive information such as passwords, keys and tokens as Kubernetes secrets and reference them by setting up an environment variable via Kubernetes Pod Templates or a custom Flink Docker image.
This uses the same syntax as Secret Values. If a Secret Value with a given name exists, it will be preferred over an environment variable with the same name.
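As a sketch of the environment-variable route, the variable can be populated from a Kubernetes Secret via a pod template; the kubernetes.pods.envVars field layout, Secret name, and key below are assumptions for illustration:

```yaml
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        s3.access-key: ${S3_ACCESS_KEY}
      kubernetes:
        pods:
          envVars:
            # Populates S3_ACCESS_KEY, referenced above as ${S3_ACCESS_KEY}
            - name: S3_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: s3-credentials   # hypothetical Secret name
                  key: access-key        # hypothetical key within the Secret
```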
Restart Strategies
Flink job restart strategies control how Flink jobs are restarted after a failure, such as exceptions from user code, machine failures, or network disruptions. By default, if checkpointing is enabled, the Flink job will be restarted with a fixed delay after every failure. Please consult the official Flink documentation on restart strategies for more details about available options.
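For example, the default fixed-delay behavior can be made explicit (and tuned) through flinkConfiguration; the attempt count and delay below are arbitrary example values:

```yaml
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        restart-strategy: fixed-delay
        restart-strategy.fixed-delay.attempts: "10"   # example value
        restart-strategy.fixed-delay.delay: 10 s      # example value
```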
In addition, Ververica Platform provides a configuration assistant for Flink restart strategies in its web interface and reports observed Flink job restarts in the status section of the corresponding Job resource. If a Job is observed to fail multiple times over a short period of time, the Deployment will indicate this in its status. Please refer to the Deployment Lifecycle page for more details.
Cluster-level Configuration
Cluster-level configuration is only applicable to Deployments executed in application mode. For Deployments executed in session mode, you have to configure cluster-level options in the SessionCluster resource referenced by the Deployment.
File Systems
The default Flink Docker images provided by Ververica Platform include FileSystem implementations for popular blob storage providers.
Available FileSystem Implementations
Blob Storage Provider | Scheme | FileSystem Implementation
---|---|---
File System | file | LocalFileSystem
AWS S3 | s3, s3p | PrestoS3FileSystem
AWS S3 | s3a | S3AFileSystem
Microsoft ABS | wasbs | NativeAzureFileSystem
Apache Hadoop® HDFS | hdfs | HadoopFileSystem
If you use Universal Blob Storage, all relevant Flink options, including credentials, will be configured on the Flink cluster-level. Please consult the official Flink documentation for details about manual configuration of file systems.
Note that additional file system implementations have to be loaded as Flink plugins which requires a custom Flink Docker image.
High-Availability (Jobmanager Failover)
Flink requires an external service for high-availability in order to be able to recover the internal state of the Flink Jobmanager process (including metadata about checkpoints) on failures.
By default, Flink Jobmanager failover is not enabled. For production installations it is highly recommended to configure Flink with such a service. Please refer to the Flink configuration for details on how to configure high-availability services manually.
For Deployments executed in session mode, you have to configure high-availability in the SessionCluster resource referenced by the Deployment.
Kubernetes
If Universal Blob Storage is not configured, you have to additionally provide the high-availability.storageDir configuration.
By default, the main method of your Flink job is re-executed on Jobmanager failover. Alternatively, you can enable JobGraph storage by setting the high-availability.vvp-kubernetes.job-graph-store.enabled: true config option, which stores the JobGraph in blob storage and restores it on failover without re-executing your main method. JobGraph storage is disabled by default in application mode and always enabled in session mode.
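Putting these options together, a Deployment using the VVP Kubernetes high-availability service with JobGraph storage enabled would look roughly like the following sketch (the service value vvp-kubernetes is assumed from the option prefix above):

```yaml
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        high-availability: vvp-kubernetes
        high-availability.vvp-kubernetes.job-graph-store.enabled: "true"
```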
Flink Kubernetes
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        high-availability: kubernetes
If Universal Blob Storage is not configured, you have to additionally provide the high-availability.storageDir configuration.
Access Control
When enabled, your Flink application will require access to the API of your Kubernetes cluster.
ConfigMap resources will be automatically created for high-availability metadata and Jobmanager leader election.
For VVP Kubernetes:
- job-$jobId-flink-ha-jobmanager: high-availability metadata
- job-$jobId-flink-ha-jobmanager: leader election

For Flink Kubernetes:
- job-$jobId-$jobIdWithoutDash-config-map: high-availability metadata
- job-$jobId-cluster-config-map: leader election
Both VVP Kubernetes and Flink Kubernetes require permission to get/create/edit/delete ConfigMaps in the namespace. The permissions are applied via Kubernetes RBAC resources that are created per Flink cluster. The created roles are bound to a custom ServiceAccount that is assigned to each Flink pod of the cluster.
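These RBAC resources are created by the platform automatically; purely as an illustration of the granted permissions, an equivalent Role would look roughly like this (the name is hypothetical, and "edit" corresponds to the update/patch verbs):

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmap-access   # hypothetical name; actual roles are created per Flink cluster
rules:
  - apiGroups: [""]                 # core API group, where ConfigMaps live
    resources: ["configmaps"]
    verbs: ["get", "create", "update", "patch", "delete"]
```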
Limitations
- This high-availability service accesses your Kubernetes cluster and will put additional load on the Kubernetes API server. As a rule of thumb, we recommend this service only for small to medium-sized installations of tens of Flink clusters with up to 100 Taskmanager instances.
- If your default service account in the target Kubernetes namespace has additional configuration (such as image pull secrets) or other access control restrictions, this will not be reflected in the created ServiceAccount resources. Therefore, you have to provide any additional configuration manually on the Deployment level (see Kubernetes Resources).
- If your cluster does not use Kubernetes RBAC, you cannot use this high-availability service.
ZooKeeper
Apache Flink® ships with a ZooKeeper-based high-availability service that is described in the official Flink documentation on high availability.
The following options are required to enable ZooKeeper-based high availability:
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        high-availability: zookeeper
        high-availability.zookeeper.quorum: zk-node1:2181,zk-node2:2181,zk-node3:2181
Ververica Platform will automatically scope all state by setting high-availability.cluster-id to the Deployment ID. Currently, you cannot override this behavior, as doing so can lead to undefined side effects between jobs. Therefore, the above options are sufficient to configure Flink Jobmanager failover with Ververica Platform.
If Universal Blob Storage is not configured, you have to additionally provide the high-availability.storageDir configuration.
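Without Universal Blob Storage, a complete ZooKeeper-based configuration therefore also needs a storage directory; the bucket path below is a hypothetical example:

```yaml
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        high-availability: zookeeper
        high-availability.zookeeper.quorum: zk-node1:2181,zk-node2:2181,zk-node3:2181
        high-availability.storageDir: s3://my-bucket/flink-ha   # hypothetical bucket path
```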
SSL/TLS Setup
Ververica Platform can be configured to auto-provision SSL/TLS for Flink via the following annotation:
kind: Deployment
spec:
  template:
    metadata:
      annotations:
        security.ssl.enabled: "true"
This enables SSL with mutual authentication for Flink's internal network communication as well as for Flink's REST API and web user interface. Therefore, requests to Flink's REST API will have to flow through Ververica Platform's Flink proxy, which has access to the trusted client certificate.
By default, SSL is disabled. Enabling this option will set the required Flink SSL configuration parameters, overwriting any existing configuration.
For Deployments executed in session mode, you have to configure SSL/TLS in the SessionCluster resource referenced by the Deployment.
SSL Provider
Currently, Flink supports two different SSL providers:
- security.ssl.provider: OPENSSL (default): OpenSSL-based SSL engine using system libraries
- security.ssl.provider: JDK: pure Java-based SSL engine
Note that not all SSL algorithms that you can set via security.ssl.algorithms are supported by OpenSSL. You can switch the provider back to the native JDK implementation (security.ssl.provider: JDK) to allow other algorithms.
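For example, to use a cipher suite that OpenSSL may not support, you can pin the JDK provider; the algorithm shown is one example of a JSSE cipher suite name:

```yaml
kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        security.ssl.provider: JDK
        security.ssl.algorithms: TLS_RSA_WITH_AES_128_CBC_SHA   # example JSSE cipher suite
```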
Implementation Details
To provision Flink clusters with the key stores and trust stores required for transport security, Ververica Platform generates, once:
- a self-signed certificate: the public key is shared with Flink Jobmanager instances to facilitate SSL client authentication for the Flink REST API and web user interface;
- a self-signed signing certificate (CA) for signing SSL certificates for the Flink Jobmanager.
Both certificates (with private keys) are stored in a key store under the persisted directory of Ververica Platform.
Before starting a Deployment with this configuration enabled, Ververica Platform generates:
- a self-signed certificate, used to secure Flink's internal connectivity;
- a certificate signed by the signing certificate, to enable HTTPS on the Flink Jobmanager's REST API and web UI.

Both certificates (with private keys) are saved in a Kubernetes secret, which is later mounted into each Deployment's Flink pods.
The implementation can be summarized in the following diagram:
Queryable State
Ververica Platform supports Flink Queryable State:
kind: Deployment
spec:
  template:
    metadata:
      annotations:
        flink.queryable-state.enabled: "true"
This enables Queryable State and configures an additional Kubernetes service that allows access to the Queryable State server launched on each Taskmanager.
This allows you to query Flink state with a QueryableStateClient from within the Kubernetes cluster, using job-${jobId}-taskmanager:9069 as the address, where jobId corresponds to the ID of the current Job resource of your Deployment.
For Deployments executed in session mode, you have to configure queryable state in the SessionCluster resource referenced by the Deployment.