Configure Apache Flink
In this page we will cover how to configure common Flink features with Application Manager.
The Deployment Template section of your Deployment provides a
flinkConfiguration attribute that allows you to specify the Flink configuration for created jobs. The following examples should be nested under
All stateful life-cycle operations (such as suspending a deployment or executing a stateful upgrade) require savepoints to be configured. Please provide an entry in the
flinkConfiguration map with the key
The provided directory needs to be accessible by all nodes of your cluster. For instance, you can use a custom volume mount (Volume Mounts) or S3 bucket (State in S3).
Please consult the official Flink documentation on savepoints for more details.
In order to store Flink application state such as checkpoints or savepoints in AWS S3, you have to provide credentials as part of the
The default Flink images of Application Manager ship with
PrestoS3FileSystem. Please refer to the Flink documentation on AWS deployment for more details.
Application Manager supports Flink SSL/TLS setup in auto provisioned manner. For details, please refer to Flink SSL/TLS.
High availability (HA) of Flink applications requires a ZooKeeper installation and a persistent storage backend as described in the official Flink documentation on High Availability.
For convenience, we repeat the required options here as part of the
high-availability.zookeeper.quorum: zk-node1:2181, zk-node2:2181, zk-node3:2181
Application Manager will automatically scope all state by setting the
high-availability.cluster-id. Currently, you cannot overwrite this behavior as it can lead to undefined side effects between jobs. Therefore, the above options are sufficient to configure HA with Application Manager.
For Flink 1.6 jobs we use the Job ID as the cluster ID. Starting from Flink 1.7, we use the Deployment ID as the cluster ID.
Application Manager 1.3 introduces a new restore strategy called LATEST_STATE that allows you restore your Flink 1.7 (and greater) applications from the latest available state, both checkpoints and savepoints. Note that earlier versions of Application Manager and Flink don’t support this feature and fall back to the same behavior as the LATEST_SAVEPOINT restore strategy.
High Availability (HA): This feature currently requires a working HA setup as decribed above.
Checkpoint Retention: You need to manually configure your Flink application to retain checkpoints on cancellation as follows.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Deployment.spec.restoreStrategy.kind: LATEST_STATE in your Deployment in order to enable the restore strategy. This lets your Flink application resume from the latest checkpointed state available in ZooKeeper instead of solely relying on savepoints.
If no checkpointed state is available, the Flink application will fall back to the latest available savepoint as with the LATEST_SAVEPOINT restore strategy.
Deployment upgrades still respect the configured upgrade strategy and are independent of the configured restore strategy. A stateful upgrade will still trigger a savepoint.