
Apache Flink Configuration

See also

Placeholders in Flink Configuration

This page covers how to configure common Flink features with Ververica Platform.

The Deployment Template section of your Deployment provides a flinkConfiguration attribute that allows you to specify the Flink configuration for created jobs. The following examples should be nested under Deployment.spec.template.spec.

  • Savepoints
    • State in S3
  • Queryable State
  • SSL/TLS Setup
    • SSL Provider
    • Implementation details
  • Flink Master Failover (“HA”)
    • Kubernetes (built-in)
      • Access Control
      • Limitations
    • ZooKeeper

Savepoints

All stateful lifecycle operations (such as suspending a Deployment or executing a stateful upgrade) require a path under which to store savepoints. If Ververica Platform was configured with blob storage, it will preconfigure each Deployment for checkpoints, savepoints, and high availability. Otherwise, please provide an entry in the flinkConfiguration map with the key state.savepoints.dir:

flinkConfiguration:
  state.savepoints.dir: s3://flink/savepoints

The provided directory needs to be accessible by all nodes of your cluster. If Ververica Platform was configured with blob storage, the platform will handle credentials distribution transparently and no further action is required. Otherwise, you can, for instance, use a custom volume mount or filesystem configuration.

Please consult the official Flink documentation on savepoints for more details.

State in S3

To use S3 (or any other blob storage service) for savepoints and checkpoints, configure Ververica Platform with blob storage. If you choose not to do this, you will need to provide S3 credentials as part of the flinkConfiguration on the Deployment level:

flinkConfiguration:
  s3.access-key: yourAccessKey
  s3.secret-key: yourSecretKey

The default Flink images of Ververica Platform ship with the PrestoS3FileSystem. Please refer to the Flink documentation on AWS deployment for more details.
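If you use an S3-compatible service other than AWS, you may additionally need to point Flink at its endpoint. The following sketch assumes such a service; the endpoint hostname is a placeholder, and s3.endpoint and s3.path.style.access are Flink's S3 filesystem options:

```yaml
flinkConfiguration:
  s3.access-key: yourAccessKey
  s3.secret-key: yourSecretKey
  # Endpoint of the S3-compatible service (placeholder hostname)
  s3.endpoint: https://s3.example.com:9000
  # Path-style access is typically required for non-AWS S3 services
  s3.path.style.access: "true"
```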

Queryable State

Ververica Platform supports Flink Queryable State.

To enable it, set flink.queryable-state.enabled: true as an annotation in the Deployment template. Alternatively, use the toggle in the “Advanced” section of the Deployment configuration form in the web user interface. When you run your Deployment, this enables and configures Queryable State in the newly created Flink cluster and creates a Kubernetes service for Flink’s taskmanagers.

This allows you to query Flink job state with a QueryableStateClient from within the Kubernetes cluster, using job-${jobId}-taskmanager:9069 as the address, where jobId is the current job ID of your Deployment.
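As a sketch, the annotation could be placed in the Deployment template like this (the nesting under spec.template.metadata.annotations is an assumption based on the Deployment resource layout; annotation values are strings):

```yaml
kind: Deployment
spec:
  template:
    metadata:
      annotations:
        flink.queryable-state.enabled: "true"
```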

SSL/TLS Setup

Ververica Platform can be configured to use SSL/TLS for intra-cluster as well as client communication, and supports Flink’s SSL/TLS setup in an auto-provisioned manner. To enable it, set security.ssl.enabled: true as an annotation in the Deployment template. This switches on SSL mutual authentication for Flink’s internal network communication and serves the Flink REST API and web user interface via HTTPS. Additionally, it enables SSL mutual authentication for clients of the Flink REST API and web user interface.

By default, SSL is disabled (same as explicit setting security.ssl.enabled: false).
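Analogous to Queryable State above, the annotation might be set in the Deployment template as follows (the nesting under spec.template.metadata.annotations is an assumption based on the Deployment resource layout):

```yaml
kind: Deployment
spec:
  template:
    metadata:
      annotations:
        security.ssl.enabled: "true"
```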

Note

Enabling this option sets the required Flink SSL configuration parameters; any matching user-provided settings in the Flink configuration (flink-conf.yaml) are ignored.

Attention

Direct requests to Flink’s REST API will require a trusted client certificate when SSL mutual authentication is enabled. Access to the Flink web user interface (and REST API) is still possible through Ververica Platform’s Flink proxy.

SSL Provider

Flink currently supports two SSL providers:

  • security.ssl.provider: OPENSSL: (default) OpenSSL-based SSL engine using system libraries
  • security.ssl.provider: JDK: pure Java-based SSL engine

Beginning with Flink 1.9, our images support OpenSSL as an SSL provider. The OpenSSL provider has significantly lower performance overhead than the JDK implementation, and it is the default provider for Flink 1.10 and above.

Note

For Flink 1.9, you need to ensure that you at least use version 1.9.2-stream2 before setting the provider to OPENSSL. Otherwise, the Flink job will fail.

Attention

Beware that not all SSL algorithms that can be set via security.ssl.algorithms are supported by OpenSSL. You can always switch back to the native JDK implementation with security.ssl.provider: JDK to allow other algorithms.
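For example, falling back to the JDK engine while pinning a cipher suite could look like the following (the cipher suite shown is purely illustrative; choose one supported by your environment):

```yaml
flinkConfiguration:
  security.ssl.provider: JDK
  # Example cipher suite only; pick one your environment supports
  security.ssl.algorithms: TLS_RSA_WITH_AES_128_CBC_SHA
```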

Implementation details

To provision Flink clusters with the key stores and trust stores required for transport security, Ververica Platform generates the following once:

  • a self-signed certificate: the public key is shared with Flink jobmanager instances to facilitate SSL client authentication for the Flink REST API and web user interface;
  • a self-signed signing certificate (CA) for signing the SSL certificates of Flink jobmanagers.

Both certificates (with private keys) are stored in a key store under the persisted directory of Ververica Platform.

Before starting a Deployment with this configuration enabled, Ververica Platform will generate

  • a self-signed certificate, used to secure Flink’s internal connectivity;
  • a certificate signed by the signing certificate (CA), used to enable HTTPS on the Flink jobmanager’s REST API and web UI.

Both certificates (with private keys) are saved in a Kubernetes secret, which is later mounted into each Deployment’s Flink nodes.

The implementation can be summarized in the following diagram:

[Diagram: ssl_certificates.png — generation and distribution of the SSL certificates]

Flink Master Failover (“HA”)

Apache Flink requires a so-called HighAvailabilityService in order to be able to recover the internal state of the Flink Master node (including metadata about the latest checkpoint) on failures.

By default, Flink Master high availability is not enabled. For production installations it is highly recommended to configure Flink with such a service.

Kubernetes (built-in)

The Apache Flink distribution of Ververica Platform ships with a HighAvailabilityService that does not have any external service dependencies on top of Kubernetes.

The service is enabled via the following configuration:

flinkConfiguration:
  high-availability: vvp-kubernetes
  high-availability.storageDir: s3://vvp/flink-ha

If Universal Blob Storage is enabled, Flink’s high-availability.storageDir will be configured automatically. In this case, it is not necessary to provide any high-availability configuration in the Flink configuration manually.
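Assuming Universal Blob Storage is enabled, the configuration therefore reduces to a single entry:

```yaml
flinkConfiguration:
  high-availability: vvp-kubernetes
  # high-availability.storageDir is set automatically by the platform
```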

Access Control

When enabled, your Flink application will require access to the API of your Kubernetes cluster. In particular, two additional ConfigMap resources will be created:

  • job-$jobId-flink-ha-jobmanager: high availability metadata
  • job-$jobId-flink-ha-jobmanager-leader-election: leader election

We automatically create these ConfigMaps and restrict access to them via Kubernetes RBAC. The following permissions are granted to Flink if enabled:

Jobmanager
  • Read/write access to ConfigMap job-$jobId-flink-ha-jobmanager
  • Read/write access to ConfigMap job-$jobId-flink-ha-jobmanager-leader-election
  • Create ConfigMaps in the namespace
Taskmanager
  • Read-only access to ConfigMap job-$jobId-flink-ha-jobmanager

The above access restrictions are applied via Kubernetes RBAC resources that are created per Flink job. If enabled, a ServiceAccount with the respective authorization is assigned to each Flink pod.

Limitations

  • Since this HighAvailabilityService requires access to the Kubernetes API, it puts additional load on your Kubernetes cluster. As a rule of thumb, we recommend it for small to medium-sized installations of tens of jobs with parallelism <= 100.
  • If the default service account in the target Kubernetes namespace has additional configuration (such as image pull secrets) or other access control restrictions, this is not reflected in the created ServiceAccount resources. You therefore have to provide any additional configuration manually on the Deployment level (see Flink Pod Templates).
  • If your cluster does not use Kubernetes RBAC, you cannot use this high availability service.

ZooKeeper

Apache Flink ships with a ZooKeeper-based HighAvailabilityService that is described in the official Flink documentation on high availability.

The following options are required to enable ZooKeeper-based high availability:

flinkConfiguration:
  high-availability: zookeeper
  high-availability.storageDir: s3://vvp/flink-ha
  high-availability.zookeeper.quorum: zk-node1:2181,zk-node2:2181,zk-node3:2181

Ververica Platform will automatically scope all state by setting the high-availability.cluster-id to the Deployment ID. Currently, you cannot override this behavior as it can lead to undefined side effects between jobs. Therefore, the above options are sufficient to configure Flink Master failover with Ververica Platform.

If Universal Blob Storage is enabled, Flink’s high-availability.storageDir will be configured automatically. In this case, you can skip the entry for high-availability.storageDir.
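With Universal Blob Storage enabled, the minimal ZooKeeper configuration therefore reduces to (the quorum hostnames are placeholders, as in the example above):

```yaml
flinkConfiguration:
  high-availability: zookeeper
  # high-availability.storageDir is set automatically by the platform
  high-availability.zookeeper.quorum: zk-node1:2181,zk-node2:2181,zk-node3:2181
```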


© Copyright 2020, Ververica GmbH.

Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.
