Deployment Lifecycle
Deployments provide declarative management of Apache Flink® applications over time. The desired state of a Deployment specifies whether the Deployment is supposed to be running or not. Depending on this desired state, the Deployment will go through various state transitions and create derived Job resources.
Desired State
A Deployment's desired state can be one of the following:
- RUNNING: This indicates that the user would like to start a Flink job as defined in the spec.template section.
- CANCELLED: This indicates that the user would like to terminate any currently running Flink jobs.
- SUSPENDED: This indicates that the user would like to gracefully terminate the currently running job and take a snapshot of its state.
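For example, a minimal sketch of a Deployment (all fields other than spec.state omitted) that asks Ververica Platform to run the job:

kind: Deployment
spec:
  # Desired state of the Deployment: RUNNING, CANCELLED, or SUSPENDED.
  state: RUNNING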
When the desired state does not match the current state, Ververica Platform will try to perform a state transition to achieve the desired state.
The desired state is only achieved eventually. The behavior of modifications made while a state transition is in progress is undefined; Ververica Platform only guarantees that the latest modification will eventually be acted upon, and intermediate states may not be observed. This means that an ongoing state transition will not necessarily be interrupted immediately if the desired state changes while an operation is still ongoing.
State Transitions
The following sections summarize the actions taken by Ververica Platform when transitioning to the various desired states.
Transitions to RUNNING
When transitioning to the RUNNING state, a new Flink job will be deployed and a new Job resource will be created.
A transition to the RUNNING state is triggered in the following cases:
- The desired state spec.state of a Deployment is changed from CANCELLED or SUSPENDED to RUNNING.
- An upgrade is invoked after modification of the Deployment template.
In both cases, the configured restore strategy will determine which state snapshot will be used to restore the Flink job state.
Please note that due to the separation of upgrade and restore strategy, the STATEFUL upgrade strategy only works in conjunction with the restore strategy set to LATEST_STATE or LATEST_SAVEPOINT.
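As an illustrative sketch of such a combination (assuming the upgradeStrategy and restoreStrategy sections of the Deployment spec, with all other fields omitted), a STATEFUL upgrade paired with the LATEST_STATE restore strategy might look like this:

kind: Deployment
spec:
  # Suspend the running job with a final snapshot before re-deploying the new version.
  upgradeStrategy:
    kind: STATEFUL
  # Restore the upgraded job from the most recent state snapshot.
  restoreStrategy:
    kind: LATEST_STATE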
After a successful transition to the RUNNING state, the state of the Flink job will be RUNNING and the status (status.state) of the Ververica Platform Job resource will be STARTED.
Note that spec.maxJobCreationAttempts configures how many attempts are made to transition to a running Deployment. This configuration is independent of the configured restart strategy for Flink, which controls job restarts after the job has started.
If spec.jobFailureExpirationTime has been set, only the restarts within the expiration time count towards spec.maxJobCreationAttempts.
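For illustration, a hedged sketch with assumed example values (the accepted duration format of jobFailureExpirationTime is defined by the Deployment API reference):

kind: Deployment
spec:
  # Give up after this many failed attempts to start the Flink job (example value).
  maxJobCreationAttempts: 4
  # Only job failures within this window count towards maxJobCreationAttempts
  # (example value; see the API reference for the accepted duration format).
  jobFailureExpirationTime: 5m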
Transitions to CANCELLED
When transitioning to the CANCELLED state, running Flink jobs will be terminated via Flink's cancellation API.
After a successful transition to the CANCELLED state, the terminal state of the Flink job will be CANCELLED and the status (status.state) of the Ververica Platform Job resource will be TERMINATED.
Transitions to SUSPENDED
When transitioning a RUNNING Deployment to the SUSPENDED state, the Flink job will be terminated via Flink's graceful stop job API.
Flink's stop API guarantees that exactly-once sinks can fully persist their output to external storage systems prior to job termination and that no additional snapshots are triggered after the final termination Savepoint.
The terminal state of the Flink job as well as the status.state of the Ververica Platform Job resource will be FINISHED.
The above points also apply to the STATEFUL upgrade strategy, which first suspends a job before re-deploying the upgraded job.
Deployment Status
The Deployment status (status.state) is one of the following:
- RUNNING: A Flink job was successfully started as specified in the deployment.spec and the Flink job has not been terminated.
- CANCELLED: Any previously active Flink job was terminated via cancellation.
- SUSPENDED: The Deployment has successfully created a Savepoint and terminated the job.
- FINISHED: The Flink job was finite and finished execution successfully, e.g. a finite streaming or a batch job.
- TRANSITIONING: The Deployment is currently in transition to achieve the desired state.
- FAILED: A Deployment transition has failed. At this point manual intervention is necessary, for example cancelling the Deployment (setting the desired state spec.state to CANCELLED).
A FAILED Deployment status only indicates that the transition attempt has failed, not necessarily that a Flink job has failed. For example, failing to suspend might leave a Deployment resource with a desired state of SUSPENDED (in spec.state) and a status of FAILED (in status.state) while the corresponding Flink job may still be running.
RUNNING
When a Deployment reaches status RUNNING, additional information about the Deployment is provided under status.running. This section of the Deployment API is considered experimental and no guarantees are made about future availability.
kind: Deployment
status:
  state: running
  running:
    # ID of the Job resource which was created for the transition to the RUNNING
    # state. You can find the respective Job resource via this ID.
    jobId: string
    # Time that the Deployment transitioned to the RUNNING state.
    transitionTime: string
    # A list of DeploymentConditions (see below) that provide more details about
    # the state of the Deployment.
    conditions:
    - # Unique type of the condition.
      type: string
      # Status of the condition (True, False, Unknown).
      status: string
      # Short message explaining the status.
      message: string
      # Reason for the transition to status.
      reason: string
      # Time of last transition of the status.
      lastTransitionTime: string
      # Time of last update to this condition. This time is different from
      # lastTransitionTime if the condition was updated without a change to
      # the status attribute.
      lastUpdateTime: string
There is additional information available for STARTED jobs. You can find the Job that was started for the running Deployment using the jobId available in status.running.jobId of the Deployment.
Deployment Conditions
The following conditions are currently available in status.state: RUNNING.
We list each available condition with a short explanation of its True status. Please be aware that a condition with status False does not imply that the opposite of the condition is true.
- ClusterUnreachable: The Flink job corresponding to the running Deployment was last observed in an unknown status.
- JobFailing: The Flink job corresponding to the running Deployment was last observed to have restarted 3 or more times within the last 10 minutes.
- JobUnstable: The Flink job corresponding to the running Deployment was last observed to have restarted 3 or more times within the last 60 minutes. Note that JobUnstable currently implies JobFailing.
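As a purely hypothetical illustration (all values below are made up), a Deployment whose job has restarted repeatedly might report a condition such as:

status:
  state: running
  running:
    conditions:
    - type: JobFailing
      status: "True"
      # Illustrative message and reason; the actual wording is produced by Ververica Platform.
      message: Job restarted 4 times within the last 10 minutes.
      reason: JobRestarting
      lastTransitionTime: "2024-01-01T12:00:00Z"
      lastUpdateTime: "2024-01-01T12:05:00Z"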
The user interface uses these conditions to highlight Deployments that require further attention by the user.
Please note that these APIs are not meant to replace active monitoring of your Flink applications and should only be used as an additional status report in the scope of Ververica Platform.
There is a known issue in Flink that causes a ClusterUnreachable deployment condition when the checkpoint storage is unavailable and the job is recovering.
If you are facing this condition, we recommend checking the availability of the checkpoint storage.
To avoid this issue, we recommend configuring a restart strategy in Flink that causes the Flink job to fail if failures are too frequent. Using such a restart strategy will eventually fail the job, which will be reported in Application Manager.
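A sketch of such a configuration, assuming Flink's failure-rate restart strategy and that the Flink configuration is passed via spec.template.spec.flinkConfiguration (threshold values are illustrative):

kind: Deployment
spec:
  template:
    spec:
      flinkConfiguration:
        # Fail the job permanently once it restarts more than 3 times within 10 minutes.
        restart-strategy: failure-rate
        restart-strategy.failure-rate.max-failures-per-interval: 3
        restart-strategy.failure-rate.failure-rate-interval: 10 min
        restart-strategy.failure-rate.delay: 10 s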
A ClusterUnreachable condition will also result from any error that causes Ververica Platform to lose its connection to the JVM, for example a crashed JVM due to an OOM condition, a hung or timed-out connection, or any network condition that makes the JVM unreachable.
Session Mode
Force Cancellation
Deployments running in session mode have to be terminated before the corresponding SessionCluster can be stopped. In order to avoid orphaning Flink jobs that belong to a Deployment, the Deployment can only transition to the CANCELLED state after the corresponding Flink job was successfully cancelled.
You may want to skip cancellation in order to stop the corresponding SessionCluster without waiting for all Deployments to be terminated. In this case, you can force cancellation of a Job by providing the following Deployment annotation:
kind: Deployment
metadata:
  annotations:
    deployment.sessionmode.force-cancellation: $jobId
$jobId refers to the ID of the Job resource that should be force-cancelled. The configuration will only be respected for the configured job.
Job Submission Timeout
In session mode, Flink jobs are submitted to a running SessionCluster. The submission has a default timeout of 5 minutes (300 seconds).
You can configure the timeout via the following annotation:
kind: Deployment
metadata:
  annotations:
    deployment.sessionmode.job-submission-timeout-seconds: 300
Kubernetes Garbage Collection
The Ververica Platform uses Kubernetes Pods to run Apache Flink® JobManager and TaskManager processes, which are "owned" objects of Ververica Platform Deployments.
When upgrading or cancelling an Application Mode Deployment, these objects need to be removed (and, in the case of upgrades, replaced) to support the new desired state of the Deployment.
Kubernetes allows specifying how these owned objects should be deleted which, among various use cases, gives users tight control over the requested resource overhead in the cluster.
Available Deletion Policies
- Background
  - Delete underlying Pods while new Pods are being created.
  - This may cause resource request spikes as the cluster does not release resources until the Pods are fully removed.
- Foreground
  - Delete the current Pods before creating new ones.
  - This may lead to slower upgrades and cancellations, as resources need to be released before either process can proceed.
- Orphan
  - Do not delete the current Pods and allow them to be manually cleaned up.
  - This can be helpful in certain debugging situations where you'd like to inspect Pods but remove the Ververica Platform resources, but it is not recommended in normal use.
For more information on the policies, see the Kubernetes Garbage Collector docs.
Default Deletion Policies
| Process      | Default Deletion Policy |
|--------------|-------------------------|
| JobManagers  | Foreground              |
| TaskManagers | Background              |
Customize Deletion Policies
The policy for Application Mode JobManagers and TaskManagers can be set via the deployment.jobmanagers.deletion-policy and deployment.taskmanagers.deletion-policy annotations, respectively. For example, the below configures both JobManagers and TaskManagers to be deleted in the Foreground.
kind: Deployment
metadata:
  annotations:
    deployment.jobmanagers.deletion-policy: Foreground
    deployment.taskmanagers.deletion-policy: Foreground