A resource central to Application Manager is the Deployment. A Deployment specifies the desired state of an application and its configuration. At the same time, Application Manager tracks and reports each Deployment’s status and derives other resources from it. Whenever the Deployment specification is modified, Application Manager will ensure that the latest Flink job reflects the new settings.
Deployments tie together a sequence of deployed Flink jobs, their accumulated state, an event log, a deployment target, policies to execute upgrades, and a template for creating Flink jobs.
In the current version of Application Manager, each Deployment has at most one active Flink job at any time. Application Manager launches a Flink cluster for each running Deployment.
The spec section controls the behaviour of a Deployment. It contains the following main parts:
- A desired state to control the state of a Deployment.
- The upgrade strategy to apply when upgrading a running Flink job in order to meet a new specification.
- The savepoint start behavior of created Flink jobs.
- A Deployment Template that defines the necessary information needed to deploy a Flink job to a specific target.
- Limits on the allowed operations Application Manager will perform in various scenarios.
The following sections describe these parts in turn. At the end of this page you will find a full example of a Deployment.
When creating or modifying a Deployment, optional fields will be expanded with their default values. Therefore, every Deployment resource will always be fully specified when you request it from Application Manager.
A Deployment resource has a desired state (specified in spec.state). The desired state can be one of the following:
- RUNNING - This indicates that the user would like to start a Flink job as defined in the spec section of the Deployment.
- CANCELLED - This indicates that the user would like to terminate any currently running Flink job (possibly none).
- SUSPENDED - This indicates that the user would like to terminate the currently running job by first taking a savepoint (see Starting from a Savepoint), and only cancelling the job after a successful savepoint.
The difference between cancelling and suspending a deployment is that when cancelling, the state of the Flink job controlled by the Deployment is lost. In contrast, suspending will snapshot this state before cancelling.
Both suspending and cancelling a Deployment currently deallocate all resources associated with that Deployment from the DeploymentTarget.
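As a minimal sketch, requesting a suspension amounts to changing a single key of the Deployment. The fragment assumes that the lowercase spelling used for state: running in the full example at the end of this page also applies to the suspended value; all other keys of the Deployment stay as they are.

```yaml
# Fragment of a Deployment: only the desired state changes.
spec:
  state: suspended   # take a savepoint first, then cancel the job
```

Setting the value to cancelled instead would terminate the job without taking a savepoint.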
When the desired state does not match the current state, Application Manager will try to perform a state transition to achieve the desired state.
The desired state is only achieved eventually. The behaviour of modifications that happen concurrently with a state transition is undefined. Application Manager only guarantees that the latest modification will eventually be acted upon. Intermediate states may not be observed. This means that an ongoing state transition will not necessarily be interrupted immediately if the desired state changes while an operation is still in progress.
The effect of updating the Deployment’s state is observed by inspecting the Deployment’s status. The deployment status (status.state) is one of the following:
- RUNNING: A Flink job was successfully started as specified in the deployment.spec and the Flink job has not been terminated.
- CANCELLED: Any previously active Flink job was terminated via cancellation.
- SUSPENDED: The deployment has successfully created a savepoint and terminated the job.
- FINISHED: The Flink job was finite and finished execution successfully, for example a finite streaming job or a batch job. Note: When this status is reached, you have to manually cancel the Deployment and then set it to running again in order to re-run the finite job.
- TRANSITIONING: The deployment is currently in transition to achieve the desired state.
- FAILED: A deployment transition has failed. At this point manual intervention is necessary, for example cancelling the Deployment (setting the desired state to cancelled).
A FAILED deployment status only indicates that the transition attempt has failed, and not necessarily that a Flink job has failed. For example, failing to suspend might lead to a Deployment resource with a desired state suspended (in spec.state) and a status failed (in status.state) while a corresponding Flink job may still be running.
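The failed-suspension case can be pictured as the following fragment of a Deployment as returned by Application Manager. This is only an illustration: the lowercase values follow the casing of the full example at the end of this page, and any further fields that the status section may contain are omitted.

```yaml
# Desired state and observed status disagree after a failed transition.
spec:
  state: suspended   # what the user asked for
status:
  state: failed      # the transition failed; a Flink job may still be running
```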
Application Manager will automatically orchestrate upgrades of running jobs (if present) depending on the configured upgrade strategy. Currently, there are three supported upgrade strategies:
- Stateless: Application Manager will terminate any currently running Flink job without taking a savepoint and then will start the new job.
- Stateful: Application Manager will first save the state of the currently running Flink job by performing a savepoint, then terminate the currently running job, and finally will start a new job by using the savepoint taken prior to termination. To use this strategy please read the section titled Starting from a Savepoint.
- None: Application Manager will not perform an automatic upgrade of a running Flink job. If this strategy is selected, the user is expected to manually cancel or suspend the currently running job (by setting spec.state to cancelled or suspended), and to start a new job manually (by setting spec.state back to running).
When will an Upgrade Strategy be invoked? Application Manager will automatically start an upgrade in the following cases:
- A change in the metadata.annotations part of a Deployment.
- A change in any of the keys of the spec part of a Deployment.
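For illustration, the fragment below changes one key under spec, which according to the list above is enough to start an upgrade. The key names are taken from the full example at the end of this page; the new parallelism value is arbitrary.

```yaml
# Fragment of a Deployment after an edit that triggers an upgrade.
spec:
  upgradeStrategy:
    kind: stateless      # strategy used to replace the running job
  template:
    spec:
      parallelism: 2     # was 1 before the edit
```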
As noted earlier, Application Manager will eventually achieve the desired state using the specified upgrade strategy. An upgrade strategy is executed in a fault-tolerant way. For example, if triggering a savepoint fails during a stateful upgrade, Application Manager will retry until a savepoint succeeds. The Flink job will not be terminated before a savepoint succeeds.
Application Manager creates a physical Flink cluster for each Application Manager Job on Kubernetes. Currently, each upgrade involves the release of all containers belonging to a Job and bringing up new ones for the updated Job.
Starting from a Savepoint
Application Manager can instruct Flink to start executing a job from a specific savepoint when transitioning to the RUNNING state (configuration key spec.startFromSavepoint).
The following values are supported:
- LATEST - Use the latest successful savepoint known to Application Manager. It may have been previously triggered by a user request or by Application Manager (for example during a suspension or a stateful upgrade).
- NONE - Do not start from any savepoint.
The stateful upgrade strategy as described in Upgrades only works in conjunction with spec.startFromSavepoint set to LATEST. If you instead set startFromSavepoint to NONE, you might run into an unexpected situation: your job starts from an empty state after it is upgraded.
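A sketch of a spec fragment that combines the two settings is shown below. The value stateful for upgradeStrategy.kind is an assumption made by analogy with the stateless value in the full example at the end of this page; the savepoint directory is copied from that example.

```yaml
# Fragment of a Deployment configured for stateful upgrades.
spec:
  upgradeStrategy:
    kind: stateful        # assumed spelling, by analogy with "stateless"
  startFromSavepoint:
    kind: latest          # new jobs resume from the latest successful savepoint
  template:
    spec:
      flinkConfiguration:
        state.savepoints.dir: s3://flink/savepoints   # where Flink writes savepoints
```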
The spec.template section of a Deployment specifies which artifact should be executed and how to execute it, including how to configure Flink, what parallelism to use, and other important settings.
You can think of all settings in the template as being applied directly to your Job instances, whereas the Deployment spec section defines how to control these jobs (for instance, how to do upgrades, as described on this page).
Please refer to the Deployment Templates page for more details on how to configure the template.
During state transitions of a Deployment, Application Manager creates Jobs and might ask a running Flink job to create a savepoint.
These operations might fail either for transient reasons, such as a network issue, or because of a misconfiguration that can only be verified when a Flink job is trying to start.
For these cases you can limit the number of attempts that Application Manager makes before transitioning to a failed status:
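| Key | Description | Default |
|---|---|---|
| maxSavepointCreationAttempts | Maximum attempted savepoints before failing. | 4 |
| maxJobCreationAttempts | Maximum job creation attempts before failing. | 4 |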
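In a Deployment, both limits sit directly under spec, as in the full example at the end of this page. The fragment below simply spells out the default values.

```yaml
# Fragment of a Deployment with both limits set to their defaults.
spec:
  maxSavepointCreationAttempts: 4   # savepoint attempts before the status becomes failed
  maxJobCreationAttempts: 4         # job creation attempts before the status becomes failed
```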
Limits and Flink restart strategies
This section is about the interplay of Flink’s restart strategies and Application Manager’s maxJobCreationAttempts. Both of these configuration settings control how failures are handled.
maxJobCreationAttempts configures how many attempts Application Manager makes to transition to a running job. The attempt count is reset after a job has transitioned to running.
After a job is running (in Flink and Application Manager) the Flink restart strategy covers all failures. Such failures include exceptions from the user code, machine failures or network disruptions. If the restart strategy is exhausted, the job enters a terminal Flink state such as FAILED. Application Manager detects the terminal state and will recover the job (by bringing up a new job cluster).
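The two mechanisms are configured in different places, which the following sketch tries to make visible. The restart-strategy keys are standard Flink configuration options; the chosen strategy and its values are arbitrary examples, not recommendations.

```yaml
# Fragment of a Deployment showing both layers of failure handling.
spec:
  maxJobCreationAttempts: 4   # Application Manager: attempts to reach a running job
  template:
    spec:
      flinkConfiguration:
        # Flink: handles failures once the job is running
        restart-strategy: fixed-delay
        restart-strategy.fixed-delay.attempts: 3
        restart-strategy.fixed-delay.delay: 10 s
```

With settings like these, Flink would restart a failing job up to three times on its own before the job enters a terminal state and Application Manager steps in.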
The spec.deploymentTargetId key specifies which Deployment Target to use for executing your jobs.
The following snippet is a complete example of a Deployment, including all optional keys and a Deployment Template.
    kind: Deployment
    apiVersion: v1
    metadata:
      name: TopSpeedWindowing Example
      labels:
        env: testing
    spec:
      state: running
      deploymentTargetId: 57b4c290-73ad-11e7-8cf7-a6006ad3dba0
      startFromSavepoint:
        kind: latest
      upgradeStrategy:
        kind: stateless
      maxSavepointCreationAttempts: 4
      maxJobCreationAttempts: 5
      template:
        spec:
          artifact:
            kind: jar
            flinkVersion: 1.6
            jarUri: http://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.11/1.6.0/flink-examples-streaming_2.11-1.6.0-TopSpeedWindowing.jar
            mainArgs: --windowSize 10 --windowUnit minutes
            entryClass: org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
          parallelism: 1
          numberOfTaskManagers: 2
          resources:
            jobManager:
              cpu: 0.5
              memory: 500m
            taskManager:
              cpu: 1.0
              memory: 2g
          flinkConfiguration:
            taskmanager.numberOfTaskSlots: 1
            state.savepoints.dir: s3://flink/savepoints
          logging:
            log4jLoggers:
              "": INFO
              org.apache.flink.streaming.examples: DEBUG