Flink Configuration

Flink configuration options provided on the SessionCluster resource are applied at the Flink cluster level. This page describes how the Flink configuration is applied to your session cluster and highlights important configuration options.

  • Overview
  • Environment Variables
  • File Systems
  • High-Availability (JobManager Failover)
    • Kubernetes
  • SSL/TLS Setup
  • Queryable State

Overview

The Flink configuration is specified as part of the SessionCluster spec.

kind: SessionCluster
spec:
  flinkConfiguration:
    key: value

Please consult the official Flink documentation for a listing of available configuration options.
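
For example, a few commonly used options could be set as follows; the keys are standard Flink options and the values shown are purely illustrative:

kind: SessionCluster
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"       # slots per TaskManager
    state.backend: rocksdb                   # state backend used by jobs on this cluster
    execution.checkpointing.interval: 60s    # checkpoint once per minute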

Environment Variables

You can reference environment variables inside flinkConfiguration using shell-style format strings:

kind: SessionCluster
spec:
  flinkConfiguration:
    s3.access-key: ${S3_ACCESS_KEY}

This allows you to store sensitive information such as passwords, keys, and tokens as Kubernetes secrets and reference them by setting up an environment variable via Kubernetes Pod Templates or a custom Flink Docker image.
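
As a sketch, assuming a Kubernetes secret named flink-secrets with a key s3-access-key (both hypothetical names), the environment variable could be populated from that secret through a pod template; the exact placement of the envVars field is described under Kubernetes Pod Templates, and the entry itself follows the standard Kubernetes env syntax:

kind: SessionCluster
spec:
  kubernetes:
    pods:
      envVars:                       # pod template field, see Kubernetes Pod Templates
        - name: S3_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: flink-secrets    # hypothetical secret name
              key: s3-access-key     # hypothetical key inside the secret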

File Systems

The default Flink Docker images provided by Ververica Platform include FileSystem implementations for popular blob storage providers.

Available FileSystem Implementations

Blob Storage Provider   Scheme    FileSystem Implementation
File System             file      LocalFileSystem
AWS S3                  s3, s3p   PrestoS3FileSystem
AWS S3                  s3a       S3AFileSystem
Microsoft ABS           wasbs     NativeAzureFileSystem
Apache Hadoop® HDFS     hdfs      HadoopFileSystem

If you use Universal Blob Storage, all relevant Flink options, including credentials, will be configured at the Flink cluster level. Please consult the official Flink documentation for details about the manual configuration of file systems.

Note that additional file system implementations have to be loaded as Flink plugins, which requires a custom Flink Docker image.
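
If Universal Blob Storage is not used, a manual configuration of the bundled S3 file system could look roughly like this; the endpoint is a placeholder and the credentials are taken from environment variables as shown above:

kind: SessionCluster
spec:
  flinkConfiguration:
    s3.endpoint: https://s3.eu-central-1.amazonaws.com   # placeholder endpoint
    s3.access-key: ${S3_ACCESS_KEY}                      # referenced from an environment variable
    s3.secret-key: ${S3_SECRET_KEY}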

High-Availability (JobManager Failover)

Flink requires an external service for high availability in order to recover the internal state of the Flink JobManager process (including metadata about checkpoints) after a failure.

By default, Flink JobManager failover is not enabled. For production installations it is highly recommended to configure Flink with such a service. Please refer to the Flink configuration for details on how to configure high-availability services manually.
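
For reference, a manually configured ZooKeeper-based high-availability service would look roughly like the following; the quorum address and storage directory are placeholders:

kind: SessionCluster
spec:
  flinkConfiguration:
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zookeeper-0:2181,zookeeper-1:2181   # placeholder quorum
    high-availability.storageDir: s3://my-bucket/flink-ha                   # placeholder storage location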

Kubernetes

The Flink distribution of Ververica Platform ships with a high-availability service that does not have any external service dependencies on top of Kubernetes.

The service is enabled via the following configuration:

kind: SessionCluster
spec:
  flinkConfiguration:
    high-availability: vvp-kubernetes

If Universal Blob Storage is not configured, you additionally have to provide the high-availability.storageDir option, as in the sketch below.
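
A complete sketch without Universal Blob Storage could therefore look like this; the storage path is a placeholder and can point to any file system supported by the cluster:

kind: SessionCluster
spec:
  flinkConfiguration:
    high-availability: vvp-kubernetes
    high-availability.storageDir: s3://my-bucket/flink-ha   # placeholder storage location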

Access Control

When enabled, your Flink application will require access to your Kubernetes cluster’s API. In particular, two additional ConfigMap resources will be created:

  • sessioncluster-$sessionClusterId-flink-ha-jobmanager: high availability metadata
  • sessioncluster-$sessionClusterId-flink-ha-jobmanager-leader-election: leader election

We automatically create these ConfigMaps and restrict access to them via Kubernetes RBAC. When the high-availability service is enabled, the following permissions are granted to Flink:

Component     Permissions
JobManager    • Read/write for ConfigMap sessioncluster-$sessionClusterId-flink-ha-jobmanager
              • Read/write for ConfigMap sessioncluster-$sessionClusterId-flink-ha-jobmanager-leader-election
              • Create ConfigMaps in the namespace
TaskManager   • Read-only for ConfigMap sessioncluster-$sessionClusterId-flink-ha-jobmanager

The above access restrictions are applied via Kubernetes RBAC resources that are created per Flink cluster. The created roles are bound to a custom ServiceAccount that is assigned to each Flink pod of the cluster.

Limitations

  • This high-availability service accesses your Kubernetes cluster and puts additional load on the Kubernetes API server. As a rule of thumb, we recommend this service only for small to medium-sized installations of tens of Flink clusters with up to 100 TaskManager instances.

  • If your default service account in the target Kubernetes namespace has additional configuration (such as image pull secrets) or other access control restrictions, this will not be reflected in the created ServiceAccount resources. Therefore, you have to provide any additional configuration manually on the Deployment level (see Kubernetes Pod Templates).

  • If your cluster does not use Kubernetes RBAC, you can’t use this high-availability service.

  • The JobManager ConfigMap sessioncluster-$sessionClusterId-flink-ha-jobmanager is used to store failover metadata. ConfigMaps and other API resources in Kubernetes have a maximum resource size that limits how much metadata can be stored for recovery. The metadata size grows with the number of Flink jobs running on the session cluster. The number of jobs that can be executed concurrently per cluster varies depending on your configuration (e.g. how many checkpoints are retained, storage location, etc.), but in a typical installation you can expect to be able to run 100+ jobs per session cluster without running into the size limit.

  • The latest state restore strategy requires the Kubernetes high-availability service. Checkpoint metadata is cleaned up periodically in order to avoid hitting the maximum resource size limit mentioned above. If Ververica Platform cannot create a Savepoint resource for each retained checkpoint before the metadata is cleaned up, retained checkpoints might be missed for Deployments running in session mode.

    By default, checkpoint metadata is cleaned up 15 minutes after the job has been unregistered. If you experience unexpected delays during Deployment termination, you can increase this delay via the high-availability.vvp-kubernetes.checkpoint-store.gc.cleanup-after-minutes option (default: 15), as sketched after this list.

    This limitation does not apply to Deployments running in application mode.
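
As referenced above, the cleanup delay could for example be raised to one hour as follows; the value of 60 is illustrative:

kind: SessionCluster
spec:
  flinkConfiguration:
    high-availability: vvp-kubernetes
    high-availability.vvp-kubernetes.checkpoint-store.gc.cleanup-after-minutes: "60"   # default: 15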

SSL/TLS Setup

Deployments in application mode can be configured to auto-provision SSL/TLS for Flink. Session clusters currently don’t support this.

Please refer to the Flink configuration for details on how to configure SSL manually.
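
As a sketch of a manual setup, Flink's internal SSL options could be set on the session cluster as shown below; the keystore and truststore paths and passwords are placeholders, and the referenced files have to be made available to the Flink pods, for example via a custom Flink Docker image or a mounted volume:

kind: SessionCluster
spec:
  flinkConfiguration:
    security.ssl.internal.enabled: "true"
    security.ssl.internal.keystore: /opt/flink/ssl/internal.keystore.jks       # placeholder path
    security.ssl.internal.truststore: /opt/flink/ssl/internal.truststore.jks   # placeholder path
    security.ssl.internal.keystore-password: ${SSL_KEYSTORE_PASSWORD}
    security.ssl.internal.key-password: ${SSL_KEY_PASSWORD}
    security.ssl.internal.truststore-password: ${SSL_TRUSTSTORE_PASSWORD}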

Queryable State

Ververica Platform supports Flink Queryable State:

kind: SessionCluster
metadata:
  annotations:
    flink.queryable-state.enabled: "true"

This enables Queryable State and configures an additional Kubernetes service that allows access to the Queryable State server launched on each TaskManager.

This allows you to query Flink state with a QueryableStateClient from within the Kubernetes cluster, using sessioncluster-${sessionClusterId}-taskmanager:9069 as the address, where sessionClusterId corresponds to the ID of your session cluster.
