Apache Flink® Configuration¶
On this page we cover how to configure common Flink features with Ververica Platform.
The Deployment Template section of your Deployment provides a flinkConfiguration attribute that allows you to specify the Flink configuration for created jobs. The following examples should be nested under Deployment.spec.template.spec.
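For orientation, here is a minimal sketch of where flinkConfiguration sits within a Deployment resource; the name and the option shown are placeholders and the surrounding fields are abbreviated:
kind: Deployment
metadata:
  name: example-deployment        # placeholder name
spec:
  template:
    spec:
      flinkConfiguration:
        # Flink options from the examples on this page go here
        state.savepoints.dir: s3://flink/savepoints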
Environment Variables¶
You can reference environment variables inside flinkConfiguration through shell format strings:
flinkConfiguration:
  s3.access-key: ${S3_ACCESS_KEY}
This allows you to store sensitive information such as passwords, keys, and tokens as Kubernetes secrets and reference them by setting up an environment variable in Deployment.spec.template.spec or by using a custom Docker image:
kubernetes:
  pods:
    envVars:
      - name: S3_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: s3
            key: accessKey
Note
This uses the same syntax as Secret Values. If a Secret Value with a given name exists, it will be preferred over an environment variable with the same name.
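Putting both pieces together, the relevant part of a Deployment spec might look like the following sketch; the secret name s3 and the key accessKey are assumed to already exist in your Kubernetes namespace:
spec:
  template:
    spec:
      flinkConfiguration:
        s3.access-key: ${S3_ACCESS_KEY}
      kubernetes:
        pods:
          envVars:
            - name: S3_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: s3            # Kubernetes secret holding the credentials
                  key: accessKey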
Savepoints¶
All stateful lifecycle operations (such as suspending a Deployment or executing a stateful upgrade) require a path under which to store savepoints. If Ververica Platform was configured with blob storage, it will preconfigure each Deployment for checkpoints, savepoints and High-Availability. Otherwise, please provide an entry in the flinkConfiguration map with the key state.savepoints.dir:
flinkConfiguration:
  state.savepoints.dir: s3://flink/savepoints
The provided directory needs to be accessible by all nodes of your cluster. If Ververica Platform was configured with blob storage, the platform will handle the credentials distribution transparently and no further action is required. Otherwise, you can, for instance, use a custom volume mount or filesystem configurations.
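If you manage this yourself, the savepoint directory is a regular entry in the Deployment's flinkConfiguration, typically configured alongside a checkpoint directory; the paths below are placeholders:
flinkConfiguration:
  state.savepoints.dir: s3://flink/savepoints
  state.checkpoints.dir: s3://flink/checkpoints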
Please consult the official Flink documentation on savepoints for more details.
State in S3¶
To use S3 (or any other blob storage service) for savepoints and checkpoints, configure Ververica Platform with blob storage.
If you choose not to do this, you can provide S3 credentials as part of the flinkConfiguration on the Deployment level.
flinkConfiguration:
  s3.access-key: yourAccessKey
  s3.secret-key: yourSecretKey
The default Flink images of Ververica Platform ship with the PrestoS3FileSystem, which is the default for the s3 and s3p schemes.
Note
All default Flink images from 1.10 onwards also ship with the HadoopS3Filesystem, which is only available via the s3a scheme. For more information, please consult Flink S3 File Systems.
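As a sketch, the following configuration would route savepoints and checkpoints through the Hadoop-based file system by using the s3a scheme; the credentials and bucket are placeholders:
flinkConfiguration:
  s3.access-key: yourAccessKey
  s3.secret-key: yourSecretKey
  state.savepoints.dir: s3a://flink/savepoints
  state.checkpoints.dir: s3a://flink/checkpoints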
Queryable State¶
Ververica Platform supports Flink Queryable State.
To enable it, set flink.queryable-state.enabled: true in the Deployment template's annotations. Alternatively, use the toggle in the “Advanced” section of the Deployment configuration form in the web user interface.
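As a sketch, the annotation can be added to the Deployment resource as follows (this assumes the annotation is placed under spec.template.metadata.annotations):
spec:
  template:
    metadata:
      annotations:
        flink.queryable-state.enabled: 'true'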
When you run your Deployment, this will enable and configure Queryable State in the newly created Flink cluster and create a Kubernetes service for Flink’s taskmanagers.
This will allow you to query Flink job state with a QueryableStateClient from within the Kubernetes cluster, using job-${jobId}-taskmanager:9069 as the address, where jobId corresponds to the current job ID of your Deployment.
SSL/TLS Setup¶
Ververica Platform can be configured to use SSL/TLS for intra-cluster as well as client communication.
Ververica Platform supports Flink SSL/TLS setup in an auto-provisioned manner.
To enable it, set security.ssl.enabled: true in the Deployment template's annotations.
This switches on SSL mutual authentication for Flink's internal network communication and serves the Flink REST API and web user interface via HTTPS.
Additionally, this enables SSL mutual authentication for clients of the Flink REST API and web user interface.
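As with the Queryable State annotation above, a minimal sketch of the relevant part of the Deployment (assuming the annotation is placed under spec.template.metadata.annotations):
spec:
  template:
    metadata:
      annotations:
        security.ssl.enabled: 'true'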
By default, SSL is disabled (the same as explicitly setting security.ssl.enabled: false).
Note
Enabling this option sets the required Flink SSL configuration parameters; matching user settings in the Apache Flink® Configuration (flink-conf.yaml) are effectively ignored.
Attention
Direct requests to Flink’s REST API will require a trusted client certificate when SSL mutual authentication is enabled. Access to the Flink web user interface (and REST API) is still possible through Ververica Platform’s Flink proxy.
SSL Provider¶
Currently, Flink supports two different SSL providers:
- security.ssl.provider: OPENSSL (default): OpenSSL-based SSL engine using system libraries
- security.ssl.provider: JDK: pure Java-based SSL engine
Beginning with Flink 1.9, our images support using OpenSSL as an SSL provider. The OpenSSL provider has a significantly lower performance overhead than the JDK implementation. For Flink versions 1.10 and above, it is the default provider.
Note
For Flink 1.9, make sure you use at least version 1.9.2-stream2 before setting the provider to OPENSSL. Otherwise, the Flink job will fail.
Attention
Beware that not all SSL algorithms that you can set via security.ssl.algorithms are supported by OpenSSL.
You can always set the provider back to the native JDK implementation (security.ssl.provider: JDK) to allow other algorithms.
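For example, to switch a single Deployment back to the JDK provider, you could add the option to its flinkConfiguration; the algorithm list below is only an illustration:
flinkConfiguration:
  security.ssl.provider: JDK
  security.ssl.algorithms: TLS_RSA_WITH_AES_128_CBC_SHA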
Implementation details¶
To provision Flink clusters with key stores and trust stores, which are required for transport security, Ververica Platform generates once:
- a self-signed certificate: the public key is shared with Flink jobmanager instances to facilitate SSL client authentication for the Flink REST API and web user interface;
- a self-signed signing certificate (CA) for signing SSL certificates for the Flink jobmanager.
Both certificates (with private keys) are stored in a key store under the persisted directory of Ververica Platform.
Before starting a Deployment with this configuration enabled, Ververica Platform will generate:
- a self-signed certificate, used to enable secure Flink internal connectivity;
- a certificate signed by the signing certificate, to enable HTTPS on the Flink jobmanager's REST API and web UI.
Both certificates (with private keys) are saved in a Kubernetes secret, which is later mounted into each Deployment's Flink pods.
Flink Jobmanager Failover (“HA”)¶
Apache Flink requires a so-called HighAvailabilityService in order to be able to recover the internal state of the Flink Jobmanager process (including metadata about the latest checkpoint) on failures.
By default, Flink Jobmanager failover is not enabled. For production installations it is highly recommended to configure Flink with such a service.
Kubernetes (built-in)¶
The Apache Flink distribution of Ververica Platform ships with a HighAvailabilityService that does not have any external service dependencies on top of Kubernetes. The service is enabled via the following configuration:
flinkConfiguration:
  high-availability: vvp-kubernetes
  high-availability.storageDir: s3://vvp/flink-ha
If Universal Blob Storage is enabled, Flink's high-availability.storageDir will be configured automatically. In this case, it is not necessary to provide any high-availability configuration in the Flink configuration manually.
Access Control¶
When enabled, your Flink application will require access to the API of your Kubernetes cluster. In particular, two additional ConfigMap resources will be created:
- job-$jobId-flink-ha-jobmanager: high availability metadata
- job-$jobId-flink-ha-jobmanager-leader-election: leader election
We will automatically create these config maps and restrict access to them via Kubernetes RBAC, granting the Jobmanager and Taskmanager components only the permissions on these resources that they require.
The above access restrictions are applied via Kubernetes RBAC resources that are created per Flink job. If enabled, a ServiceAccount will be assigned to each Flink pod that applies the respective authorization.
Limitations¶
- Since this HighAvailabilityService requires access to the Kubernetes API, it puts additional load on your Kubernetes cluster. As a rule of thumb, we recommend this HighAvailabilityService for small to medium-sized installations of tens of jobs with parallelism <= 100.
- If your default service account in the target Kubernetes namespace has additional configuration (such as image pull secrets) or other access control restrictions, this will not be reflected in the created ServiceAccount resources. Therefore, you have to provide any additional configuration manually on the Deployment level (see Apache Flink® Pod Templates).
- If your cluster does not use Kubernetes RBAC, you can't use this high availability service.
ZooKeeper¶
Apache Flink ships with a ZooKeeper-based HighAvailabilityService that is described in the official Flink documentation on high availability.
The following options are required to enable ZooKeeper-based high availability:
flinkConfiguration:
  high-availability: zookeeper
  high-availability.storageDir: s3://vvp/flink-ha
  high-availability.zookeeper.quorum: zk-node1:2181, zk-node2:2181, zk-node3:2181
Ververica Platform will automatically scope all state by setting the high-availability.cluster-id to the Deployment ID. Currently, you cannot override this behavior as it can lead to undefined side effects between jobs. Therefore, the above options are sufficient to configure Flink Jobmanager failover with Ververica Platform.
If Universal Blob Storage is enabled, Flink's high-availability.storageDir will be configured automatically. In this case, you can skip the entry for high-availability.storageDir.
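In that case, the ZooKeeper configuration reduces to the following sketch; the quorum addresses are placeholders:
flinkConfiguration:
  high-availability: zookeeper
  high-availability.zookeeper.quorum: zk-node1:2181,zk-node2:2181,zk-node3:2181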