Deployments
Deployments are the core resource abstraction within Application Manager. A Deployment specifies the desired state of an application and its configuration. At the same time, Application Manager tracks and reports each Deployment’s status and derives other resources from it. Whenever the Deployment specification is modified, Application Manager will ensure that the latest Flink job reflects the new settings.
Deployments tie together a sequence of deployed Flink jobs, their accumulated state, an event log, a deployment target, policies to execute upgrades, and a template for creating Flink jobs.
In the current version of Application Manager, each Deployment has at most one active Flink job at any time. Application Manager launches a Flink cluster for each running Deployment.
Specification
The spec of a Deployment is the section that controls its behavior. It contains the following main parts:
- A desired state that controls the state of the Deployment.
- The strategy to apply when upgrading a running Flink job to meet a new specification.
- The state (savepoint or checkpoint) to use when restarting or restoring Flink jobs.
- A Deployment Template that defines the information needed to deploy a Flink job to a specific target.
- Limits on the operations Application Manager performs in various scenarios.
The following sections describe these parts in turn. At the end of this page you will find a full example of a Deployment.
When you create or modify a Deployment, any optional fields you omit are filled in with their default values. As a result, every Deployment resource is fully specified when you request it from Application Manager.
Desired State
A Deployment resource has a desired state (specified in spec.state). The desired state can be one of the following:
- RUNNING - The user would like to start a job as defined in the spec.template section.
- CANCELLED - The user would like to terminate any currently running Flink jobs (possibly none).
- SUSPENDED - The user would like to terminate the currently running job by first taking a savepoint (see Restore Strategy), and to cancel the job only after the savepoint succeeds.
The difference between cancelling and suspending a deployment is that when cancelling, the state of the Flink job controlled by the Deployment is lost. In contrast, suspending will snapshot this state before cancelling.
Both suspending and cancelling a Deployment currently deallocate all resources associated with that Deployment from the Deployment Target.
When the desired state does not match the current state, Application Manager will try to perform a state transition to achieve the desired state.
Note
The desired state is only achieved eventually. The behavior of modifications made concurrently with a state transition is undefined. Application Manager only guarantees that the latest modification will eventually be acted upon, and intermediate states may never be observed. In particular, an ongoing state transition is not necessarily interrupted immediately if the desired state changes while it is still in progress.
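The "latest modification wins" behavior can be illustrated with a small model. The following Java sketch is hypothetical: the `DeploymentReconciler` class and `State` enum are illustrative names and not part of Application Manager. Each reconciliation pass reads only the most recent desired state. A desired state that is overwritten before the next pass therefore never triggers a transition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; not Application Manager's implementation.
enum State { RUNNING, CANCELLED, SUSPENDED }

final class DeploymentReconciler {
    // Latest desired state written by the user; each write replaces the previous one.
    private volatile State desired;
    // State reached by the last completed transition.
    private State observed;
    // Transitions actually performed, recorded for illustration.
    final List<String> performed = new ArrayList<>();

    DeploymentReconciler(State initial) {
        this.desired = initial;
        this.observed = initial;
    }

    void setDesired(State state) {
        desired = state;
    }

    State observed() {
        return observed;
    }

    // One reconciliation pass: act only on the most recent desired state.
    void reconcileOnce() {
        State target = desired;
        if (target == observed) {
            return; // already in the desired state
        }
        performed.add(observed + " -> " + target);
        observed = target;
    }
}
```

In this sketch, if a user requests SUSPENDED and then RUNNING before a pass runs, only the CANCELLED to RUNNING transition is performed. The intermediate SUSPENDED request is never observed.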
Status
The effect of updating a Deployment’s desired state is observed by inspecting the Deployment’s status. The deployment status (status.state) is one of the following:
- RUNNING: A Flink job was successfully started as specified in deployment.spec, and the Flink job has not been terminated.
- CANCELLED: Any previously active Flink job was terminated via cancellation.
- SUSPENDED: The deployment successfully created a savepoint and terminated the job.
- FINISHED: The Flink job was finite and finished execution successfully, e.g. a finite streaming job or a batch job. Note: Once this status is reached, you have to manually cancel the Deployment and set it to running again to re-run the finite job.
- TRANSITIONING: The deployment is currently transitioning to achieve the desired state.
- FAILED: A deployment transition has failed. At this point, manual intervention is necessary, for example cancelling the Deployment (setting the desired state spec.state to cancelled).
Note
A FAILED deployment status only indicates that the transition attempt has failed, not necessarily that a Flink job has failed. For example, failing to suspend might leave a Deployment resource with desired state suspended (in spec.state) and status failed (in status.state) while the corresponding Flink job may still be running.
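The relationship between desired state and status described above can be summarized in code. The Java sketch below is a hypothetical model: `SpecState`, `StatusState`, and `DeploymentStatusCheck` are illustrative names, not an Application Manager API. It encodes which statuses are a stable outcome for a given desired state, and it treats FAILED as a signal for manual intervention rather than as proof that the Flink job failed.

```java
// Hypothetical model of spec.state and status.state; not an Application Manager API.
enum SpecState { RUNNING, CANCELLED, SUSPENDED }

enum StatusState { RUNNING, CANCELLED, SUSPENDED, FINISHED, TRANSITIONING, FAILED }

final class DeploymentStatusCheck {
    // True when the status is a stable outcome for the desired state,
    // i.e. no further transition is expected on its own.
    static boolean isSettled(SpecState desired, StatusState status) {
        switch (status) {
            case RUNNING:
                return desired == SpecState.RUNNING;
            case CANCELLED:
                return desired == SpecState.CANCELLED;
            case SUSPENDED:
                return desired == SpecState.SUSPENDED;
            case FINISHED:
                // A finite job stays FINISHED; re-running it needs a manual cancel and run.
                return desired == SpecState.RUNNING;
            default:
                return false; // TRANSITIONING or FAILED
        }
    }

    // FAILED means a transition attempt failed; a Flink job may still be running.
    static boolean needsManualIntervention(StatusState status) {
        return status == StatusState.FAILED;
    }
}
```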
Upgrades
Application Manager will automatically orchestrate upgrades of running jobs (if present) depending on the configured upgrade strategy. Currently, there are three supported upgrade strategies:
- STATELESS: Application Manager will terminate any currently running Flink job without taking a savepoint and then will start the new job.
- STATEFUL: Application Manager will first save the state of the currently running Flink job by taking a savepoint, then terminate the currently running job, and finally start a new job from the savepoint taken prior to termination. Before using this strategy, please read the section titled Restore Strategy.
- NONE: Application Manager will not perform an automatic upgrade of a running Flink job. If this strategy is selected, the user is expected to manually cancel or suspend the currently running job (by setting spec.state to cancelled or suspended), and then start a new job manually (by setting spec.state back to running).
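The ordering of operations in each upgrade strategy can be sketched as follows. This is a hypothetical model, not Application Manager's code. The `JobOperations` interface stands in for whatever Application Manager actually does to a Flink job. The key property shown is that a stateful upgrade never cancels the job unless the savepoint succeeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the three upgrade strategies; not Application Manager's code.
enum UpgradeStrategy { STATELESS, STATEFUL, NONE }

// Hypothetical stand-in for the operations performed on a running Flink job.
interface JobOperations {
    Optional<String> triggerSavepoint(); // empty if the savepoint failed
    void cancel();
    void start(Optional<String> savepoint);
}

final class Upgrader {
    // Returns true if the upgrade was carried out in this call.
    static boolean upgrade(UpgradeStrategy strategy, JobOperations job) {
        switch (strategy) {
            case STATELESS:
                job.cancel(); // no savepoint: the job's state is discarded
                job.start(Optional.empty());
                return true;
            case STATEFUL: {
                Optional<String> savepoint = job.triggerSavepoint();
                if (!savepoint.isPresent()) {
                    return false; // never terminate the job before a savepoint succeeds
                }
                job.cancel();
                job.start(savepoint);
                return true;
            }
            default:
                return false; // NONE: the user cancels or suspends and restarts manually
        }
    }
}
```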
When will an Upgrade Strategy be invoked? Application Manager will automatically start an upgrade in the following cases:
- A change in the metadata.annotations part of a Deployment.
- A change in any of the keys of the spec part of a Deployment.
As noted earlier, Application Manager will eventually achieve the desired state using the specified upgrade strategy. Upgrade strategies are executed in a fault-tolerant way. For example, if triggering a savepoint fails during a stateful upgrade, Application Manager retries until a savepoint succeeds, up to the limit configured in maxSavepointCreationAttempts (see Limits). The Flink job is not terminated before a savepoint succeeds.
Application Manager creates a separate Flink cluster on Kubernetes for each Application Manager Job. Currently, each upgrade releases all containers belonging to a Job and brings up new ones for the updated Job.
Restore Strategy
Application Manager can instruct Flink to start executing a job from a specific savepoint or checkpoint when transitioning to the RUNNING state (configuration key spec.restoreStrategy).
The supported values are:
- LATEST_STATE - Use the latest successful checkpoint or savepoint known to Application Manager (restoring from checkpoints requires a High Availability setup).
- LATEST_SAVEPOINT - Use the latest successful savepoint known to Application Manager. It may have been previously triggered by a user request or by Application Manager (for example during a suspension or a stateful upgrade).
- NONE - Do not start from any checkpoint or savepoint.
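The selection logic of the three values can be illustrated with a small Java sketch. It is hypothetical: `Snapshot`, `SnapshotKind`, and `RestorePointSelector` are illustrative names. It assumes that "latest" means the most recently completed snapshot among those known to Application Manager. LATEST_STATE considers checkpoints and savepoints, LATEST_SAVEPOINT considers only savepoints, and NONE starts from empty state.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical model of spec.restoreStrategy; names are illustrative only.
enum RestoreStrategy { LATEST_STATE, LATEST_SAVEPOINT, NONE }

enum SnapshotKind { CHECKPOINT, SAVEPOINT }

final class Snapshot {
    final SnapshotKind kind;
    final long completedAtMillis;
    final String location;

    Snapshot(SnapshotKind kind, long completedAtMillis, String location) {
        this.kind = kind;
        this.completedAtMillis = completedAtMillis;
        this.location = location;
    }
}

final class RestorePointSelector {
    // Picks the snapshot a new job would start from, among successful snapshots.
    static Optional<Snapshot> select(RestoreStrategy strategy, List<Snapshot> known) {
        Comparator<Snapshot> byCompletionTime = Comparator.comparingLong(s -> s.completedAtMillis);
        switch (strategy) {
            case LATEST_STATE:
                // Newest checkpoint or savepoint (checkpoints require High Availability).
                return known.stream().max(byCompletionTime);
            case LATEST_SAVEPOINT:
                return known.stream()
                        .filter(s -> s.kind == SnapshotKind.SAVEPOINT)
                        .max(byCompletionTime);
            default:
                return Optional.empty(); // NONE: start from empty state
        }
    }
}
```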
Note
The stateful upgrade strategy described in Upgrades only works in conjunction with spec.restoreStrategy set to LATEST_SAVEPOINT or LATEST_STATE.
If you set restoreStrategy to NONE instead, your job may unexpectedly start from an empty state after it is upgraded.
Note
spec.restoreStrategy was introduced in Application Manager 1.3 and replaces the deprecated configuration key spec.startFromSavepoint.
The deprecated key is no longer mandatory and is still supported by the service, but it will be removed in a later version.
The service automatically migrates the old configuration to the new one: the values LATEST and NONE of startFromSavepoint correspond to LATEST_SAVEPOINT and NONE of restoreStrategy, respectively.
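The documented correspondence between the old and new keys can be written as a small mapping. The Java sketch below is hypothetical: the enum and class names are illustrative and do not mirror how the service implements the migration. It only encodes the two value pairs stated in the note.

```java
// Hypothetical illustration of the documented mapping from the deprecated
// spec.startFromSavepoint values to spec.restoreStrategy values.
enum StartFromSavepoint { LATEST, NONE }

enum RestoreStrategyKind { LATEST_STATE, LATEST_SAVEPOINT, NONE }

final class LegacyRestoreMapping {
    static RestoreStrategyKind migrate(StartFromSavepoint legacy) {
        switch (legacy) {
            case LATEST:
                return RestoreStrategyKind.LATEST_SAVEPOINT;
            case NONE:
                return RestoreStrategyKind.NONE;
            default:
                throw new IllegalArgumentException("Unknown startFromSavepoint value: " + legacy);
        }
    }
}
```

Note that LATEST_STATE has no counterpart among the two legacy values.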
Requirements for Latest State restore strategy
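The LATEST_STATE restore strategy only works with applications running on Flink 1.7 or later. Additionally, you need to set up Flink High Availability and configure the Flink job to retain checkpoints on cancellation:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoints are only taken when checkpointing is enabled (60 s is just an example interval).
env.enableCheckpointing(60_000);
// Keep externalized checkpoints when the job is cancelled so that they can be restored later.
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```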
This lets your Flink application resume from the latest checkpointed state available in ZooKeeper instead of solely relying on savepoints.
Without this setup, LATEST_STATE has the same effect as the LATEST_SAVEPOINT restore strategy.
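The requirements above can be expressed as a single check. The Java sketch below is a hypothetical helper; Application Manager does not expose such a function. It assumes Flink versions of the form `major.minor[.patch]` and compares them numerically, so that for example 1.10 counts as newer than 1.7.

```java
// Hypothetical helper; Application Manager does not expose such a function.
final class LatestStateRequirements {
    // True if LATEST_STATE can restore from checkpoints; otherwise it
    // behaves like LATEST_SAVEPOINT.
    static boolean canRestoreFromCheckpoints(String flinkVersion,
                                             boolean highAvailability,
                                             boolean retainOnCancellation) {
        String[] parts = flinkVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        // Compare numerically: a string comparison would rank "1.10" below "1.7".
        boolean atLeastFlink17 = major > 1 || (major == 1 && minor >= 7);
        return atLeastFlink17 && highAvailability && retainOnCancellation;
    }
}
```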
Note
Deployment upgrades still respect the configured upgrade strategy and are independent of the configured restore strategy. A stateful upgrade will still trigger a savepoint.
Allowing non-restored state
When restoring from a savepoint that is not fully compatible with a job because the savepoint contains state for a Flink task that is not present in the job, you can parameterize the LATEST_STATE and LATEST_SAVEPOINT restore strategies with the allowNonRestoredState flag:
```yaml
kind: Deployment
spec:
  restoreStrategy:
    kind: LATEST_SAVEPOINT
    allowNonRestoredState: true
```
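Flink itself performs the compatibility check when a job is restored. The Java sketch below is a simplified, hypothetical model of that check, with tasks identified by plain strings. State in the savepoint that has no matching task in the job makes the restore fail, unless allowNonRestoredState is set. In that case, the unmatched state is skipped.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of the restore compatibility check.
final class RestoreCompatibility {
    static void checkRestorable(Set<String> tasksInSavepoint,
                                Set<String> tasksInJob,
                                boolean allowNonRestoredState) {
        // State that cannot be mapped to any task of the new job.
        Set<String> unmatched = new HashSet<>(tasksInSavepoint);
        unmatched.removeAll(tasksInJob);
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                    "Savepoint contains state for tasks not present in the job: " + unmatched);
        }
        // With allowNonRestoredState = true, the unmatched state is simply not restored.
    }
}
```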
Deployment Templates
The spec.template section of a Deployment specifies which artifact to execute and how to execute it, including how to configure Flink, what parallelism to use, and other important settings.
You can think of all settings in the template as being applied directly to your Job instances, whereas the Deployment spec section defines how to control these jobs (for instance, how to perform upgrades, as described on this page).
Please refer to the Deployment Templates page for more details on how to configure the template.
Limits
During state transitions of a Deployment, Application Manager creates Jobs and might ask a running Flink job to create a savepoint.
These operations might fail either for transient reasons, like a network issue, or because of a misconfiguration that can only be detected when a Flink job is trying to start.
For these cases, you can limit the number of attempts Application Manager makes before transitioning to a failed status:
| Key | Description | Default |
|---|---|---|
| maxSavepointCreationAttempts | Maximum number of savepoint attempts before failing. | 4 |
| maxJobCreationAttempts | Maximum number of job creation attempts before failing. | 4 |
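Both limits bound a retry loop in the same way. The Java sketch below is hypothetical; `BoundedRetry` is an illustrative name, not Application Manager code. It shows how a limit such as maxJobCreationAttempts caps the number of attempts before the Deployment would transition to FAILED. The sketch retries immediately and does not model any delay between attempts.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of how an attempt limit bounds retries.
final class BoundedRetry {
    // Returns true as soon as one attempt succeeds, false after maxAttempts failures.
    static boolean run(int maxAttempts, BooleanSupplier attempt) {
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
        }
        return false; // the caller would transition the Deployment to FAILED
    }
}
```

For example, `BoundedRetry.run(4, ...)` with an attempt that succeeds on its third call returns `true` after three calls. An attempt that never succeeds is called exactly four times.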
Limits and Flink restart strategies
This section is about the interplay of Flink’s restart strategies and Application Manager’s maxJobCreationAttempts. Both of these settings control how failures are handled.
maxJobCreationAttempts configures how many attempts Application Manager makes to transition to a running job. The attempt count is reset once the job has transitioned to running.
After a job is running (in Flink and Application Manager) the Flink restart strategy covers all failures. Such failures include exceptions from the user code, machine failures or network disruptions. If the restart strategy is exhausted, the job enters a terminal Flink state such as FAILED. Application Manager detects the terminal state and will recover the job (by bringing up a new job cluster).
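The counter semantics described above can be modelled in a few lines. The Java sketch below is hypothetical; `JobCreationTracker` is an illustrative name. It covers only the creation-attempt counter: failed attempts accumulate until the limit is reached, and reaching RUNNING resets the count. It does not model Flink's restart strategy, which handles failures while the job is running.

```java
// Hypothetical model of the maxJobCreationAttempts counter; not Application Manager's code.
final class JobCreationTracker {
    private final int maxJobCreationAttempts;
    private int failedAttempts = 0;

    JobCreationTracker(int maxJobCreationAttempts) {
        this.maxJobCreationAttempts = maxJobCreationAttempts;
    }

    // A creation attempt failed before the job reached RUNNING.
    // Returns true if the Deployment should now transition to FAILED.
    boolean onCreationFailed() {
        failedAttempts++;
        return failedAttempts >= maxJobCreationAttempts;
    }

    // The job reached RUNNING: from here on, Flink's restart strategy handles
    // failures, and the creation-attempt count starts over.
    void onRunning() {
        failedAttempts = 0;
    }
}
```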
Deployment Targets
The spec.deploymentTargetId key specifies which Deployment Target to use for executing your jobs.
Full Example
The following snippet is a complete example of a Deployment, including all optional keys and a Deployment Template.
```yaml
kind: Deployment
apiVersion: v1
metadata:
  name: TopSpeedWindowing Example
  labels:
    env: testing
spec:
  state: running
  deploymentTargetId: 57b4c290-73ad-11e7-8cf7-a6006ad3dba0
  restoreStrategy:
    kind: latest_savepoint
  upgradeStrategy:
    kind: stateless
  maxSavepointCreationAttempts: 4
  maxJobCreationAttempts: 5
  template:
    spec:
      artifact:
        kind: jar
        flinkVersion: 1.6
        jarUri: http://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.11/1.6.0/flink-examples-streaming_2.11-1.6.0-TopSpeedWindowing.jar
        mainArgs: --windowSize 10 --windowUnit minutes
        entryClass: org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
      parallelism: 1
      numberOfTaskManagers: 2
      resources:
        jobManager:
          cpu: 1
          memory: 1g
        taskManager:
          cpu: 1.0
          memory: 2g
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: 1
        state.savepoints.dir: s3://flink/savepoints
      logging:
        log4jLoggers:
          "": INFO
          org.apache.flink.streaming.examples: DEBUG
```