Flink Configuration¶
Flink configuration options provided on the SessionCluster resource are applied at the Flink cluster level. This page describes how the Flink configuration is applied to your session cluster and highlights important configuration options.
Overview¶
The Flink configuration is specified as part of the SessionCluster spec.
kind: SessionCluster
spec:
  flinkConfiguration:
    key: value
Please consult the official Flink documentation for a listing of available configuration options.
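As an illustration, the fragment below sets a few commonly tuned cluster-level options. The keys are standard Flink configuration options; the values are only examples and should be adjusted for your workload:

```yaml
kind: SessionCluster
spec:
  flinkConfiguration:
    # Number of slots each TaskManager offers (example value).
    taskmanager.numberOfTaskSlots: "4"
    # State backend for all jobs on this cluster (example value).
    state.backend: rocksdb
    # Default checkpointing interval (example value).
    execution.checkpointing.interval: 60s
```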
Environment Variables¶
You can reference environment variables inside flinkConfiguration
through shell format strings:
kind: SessionCluster
spec:
  flinkConfiguration:
    s3.access-key: ${S3_ACCESS_KEY}
This allows you to store sensitive information such as passwords, keys and tokens as Kubernetes secrets and reference them by setting up an environment variable via Kubernetes Pod Templates or a custom Flink Docker image.
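For example, the `S3_ACCESS_KEY` variable referenced above could be populated from a Kubernetes secret. The `env` entry below uses the standard Kubernetes `secretKeyRef` syntax; the secret name `my-s3-secret` and its key are hypothetical, and where exactly the entry goes depends on how you define your pod template or Docker image:

```yaml
# Standard Kubernetes env entry; add it to the container spec of your
# pod template. Secret name and key below are placeholders.
env:
  - name: S3_ACCESS_KEY
    valueFrom:
      secretKeyRef:
        name: my-s3-secret   # hypothetical Secret holding the credential
        key: access-key      # hypothetical key within that Secret
```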
File Systems¶
The default Flink Docker images provided by Ververica Platform include FileSystem implementations for popular blob storage providers.
Blob Storage Provider | Scheme | FileSystem Implementation |
---|---|---|
File System | `file` | `LocalFileSystem` |
AWS S3 | `s3`, `s3p` | `PrestoS3FileSystem` |
AWS S3 | `s3a` | `S3AFileSystem` |
Microsoft ABS | `wasbs` | `NativeAzureFileSystem` |
Apache Hadoop® HDFS | `hdfs` | `HadoopFileSystem` |
If you use Universal Blob Storage, all relevant Flink options, including credentials, are configured at the Flink cluster level. Please consult the official Flink documentation for details about configuring file systems manually.
Note that additional file system implementations have to be loaded as Flink plugins, which requires a custom Flink Docker image.
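If you configure a file system manually instead of using Universal Blob Storage, the relevant options can be set directly in `flinkConfiguration`. A minimal sketch for S3, using standard Flink option keys with placeholder values:

```yaml
kind: SessionCluster
spec:
  flinkConfiguration:
    # Placeholder endpoint; point this at your S3-compatible storage.
    s3.endpoint: https://s3.example.com
    # Credentials injected via environment variables (see above).
    s3.access-key: ${S3_ACCESS_KEY}
    s3.secret-key: ${S3_SECRET_KEY}
```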
High-Availability (JobManager Failover)¶
Flink requires an external service for high-availability in order to be able to recover the internal state of the Flink JobManager process (including metadata about checkpoints) on failures.
By default, Flink JobManager failover is not enabled. For production installations, it is highly recommended to configure Flink with such a service. Please refer to the Flink configuration documentation for details on how to configure high-availability services manually.
Kubernetes¶
The Flink distribution of Ververica Platform ships with a high-availability service that does not have any external service dependencies on top of Kubernetes.
The service is enabled via the following configuration:
kind: SessionCluster
spec:
  flinkConfiguration:
    high-availability: vvp-kubernetes
If Universal Blob Storage is not configured, you additionally have to provide the `high-availability.storageDir` configuration option.
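In that case, the high-availability configuration might look as follows. The bucket path below is a placeholder; the storage directory must point at durable blob storage reachable from all Flink pods:

```yaml
kind: SessionCluster
spec:
  flinkConfiguration:
    high-availability: vvp-kubernetes
    # Placeholder bucket/path; use your own durable storage location.
    high-availability.storageDir: s3://my-bucket/flink-ha
```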
Access Control¶
When enabled, your Flink application will require access to your Kubernetes cluster's API. In particular, two additional ConfigMap resources will be created:

- `sessioncluster-$sessionClusterId-flink-ha-jobmanager`: high-availability metadata
- `sessioncluster-$sessionClusterId-flink-ha-jobmanager-leader-election`: leader election
We will automatically create these ConfigMaps and restrict access to them via Kubernetes RBAC. The following permissions are granted for Flink if enabled:
Component | Permissions |
---|---|
JobManager | |
TaskManager | |
The above access restrictions are applied via Kubernetes RBAC resources that are created per Flink cluster. The created roles are bound to a custom `ServiceAccount` that is assigned to each Flink pod of the cluster.
Limitations¶
- This high-availability service accesses your Kubernetes cluster and will put additional load on the Kubernetes API server. As a rule of thumb, we recommend this service only for small to medium-sized installations of tens of Flink clusters with up to 100 TaskManager instances.
- If the default service account in the target Kubernetes namespace has additional configuration (such as image pull secrets) or other access control restrictions, this will not be reflected in the created `ServiceAccount` resources. Therefore, you have to manually provide any additional configuration on the Deployment level (see Kubernetes Resources).
- If your cluster does not use Kubernetes RBAC, you can't use this high-availability service.
- The JobManager ConfigMap `sessioncluster-$sessionClusterId-flink-ha-jobmanager` is used to store failover metadata. ConfigMaps and other API resources in Kubernetes have a maximum resource size that limits how much metadata can be stored for recovery. The metadata size grows with the number of Flink jobs running on the session cluster. The number of jobs that can be executed concurrently per cluster varies depending on your configuration (e.g. how many checkpoints are retained, storage location, etc.), but in a typical installation you can expect to be able to run 100+ jobs per session cluster without running into the size limit.
- The latest state restore strategy requires the Kubernetes high-availability service. Checkpoint metadata is cleaned up periodically in order to avoid hitting the maximum resource size limit mentioned above. If Ververica Platform cannot create a Savepoint resource for each retained checkpoint before the metadata is cleaned up, retained checkpoints might be missed for Deployments running in session mode. By default, checkpoint metadata is cleaned up 15 minutes after the job has been unregistered. If you experience unexpected delays during Deployment termination, you can increase this delay via the `high-availability.vvp-kubernetes.checkpoint-store.gc.cleanup-after-minutes` option (default: `15`). This limitation does not apply to Deployments running in application mode.
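For example, if the default 15-minute cleanup delay is too short for your termination workflow, you could double it. The value `30` below is purely illustrative:

```yaml
kind: SessionCluster
spec:
  flinkConfiguration:
    # Delay (in minutes) before checkpoint metadata is garbage-collected
    # after a job is unregistered; 30 is an example value (default: 15).
    high-availability.vvp-kubernetes.checkpoint-store.gc.cleanup-after-minutes: "30"
```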
SSL/TLS Setup¶
Deployments in application mode can be configured to auto-provision SSL/TLS for Flink. Session clusters currently don’t support this.
Please refer to the Flink configuration for details on how to configure SSL manually.
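A manual setup for internal (intra-cluster) SSL could be sketched as follows. The option keys are standard Flink SSL options; the keystore/truststore paths are placeholders, and the passwords are assumed to be injected via environment variables as described above. The keystores themselves must be provisioned into the Flink pods by you (e.g. via a custom image or pod template):

```yaml
kind: SessionCluster
spec:
  flinkConfiguration:
    security.ssl.internal.enabled: "true"
    # Placeholder paths; mount or bake your own keystore/truststore.
    security.ssl.internal.keystore: /path/to/keystore.jks
    security.ssl.internal.truststore: /path/to/truststore.jks
    # Passwords injected via environment variables (placeholders).
    security.ssl.internal.keystore-password: ${KEYSTORE_PASSWORD}
    security.ssl.internal.key-password: ${KEY_PASSWORD}
    security.ssl.internal.truststore-password: ${TRUSTSTORE_PASSWORD}
```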
Queryable State¶
Ververica Platform supports Flink Queryable State:
kind: SessionCluster
metadata:
  annotations:
    flink.queryable-state.enabled: "true"
This enables Queryable State and configures an additional Kubernetes service that allows access to the Queryable State server launched on each TaskManager.
This will allow you to query Flink state with a `QueryableStateClient` from within the Kubernetes cluster, using `sessioncluster-${sessionClusterId}-taskmanager:9069` as the address, where `sessionClusterId` corresponds to the ID of your session cluster.