Deployments provide declarative management of Apache Flink® applications over time. The desired state of a Deployment specifies whether the Deployment is supposed to be running or not. Depending on this desired state, the Deployment will go through various state transitions and create derived Job resources.
A Deployment’s desired state can be one of the following:
- This indicates that the user would like to start a Flink job as defined in the
- This indicates that the user would like to terminate any currently running Flink jobs.
- This indicates that the user would like to gracefully terminate the currently running job and take a snapshot of its state.
When the desired state does not match the current state, Ververica Platform will try to perform a state transition to achieve the desired state.
The desired state will only be eventually achieved. The behavior of modifications happening concurrently to a state transition is undefined. Ververica Platform only guarantees that eventually the latest modification will be acted upon. Intermediate states may not be observed. This means that an ongoing state transition will not necessarily be immediately interrupted if the desired state changes while an operation is still ongoing.
The following sections summarize the actions taken by Ververica Platform when transitioning to the various desired states.
When transitioning to the RUNNING state, a new Flink job will be deployed and a new Job resource will be created.
A transition to the RUNNING state is triggered in the following cases:
- The desired state
spec.stateof a Deployment is changed from CANCELLED or SUSPENDED to RUNNING.
- An upgrade is invoked after modification of the Deployment template.
In both cases, the configured restore strategy will determine which state snapshot will be used to restore the Flink job state from.
Please note that due to the separation of upgrade and restore strategy, the STATEFUL upgrade strategy only works in conjunction with restore strategy set to LATEST_STATE or LATEST_SAVEPOINT.
After a successful transition to the RUNNING state, the state of the Flink job will be RUNNING and the status
status.state of the Ververica Platform Job resource will be STARTED.
spec.maxJobCreationAttempts configures how many attempts are made to transition to a running Deployment. This configuration is independent of the configured restart strategy for Flink which control job restarts after the job has started.
spec.jobFailureExpirationTime has been set, only the restarts within the expiration time count towards the
When transitioning to the CANCELLED state, running Flink jobs will be terminated via Flink’s cancellation API.
After a successful transition to the CANCELLED state, the terminal state of the Flink job will be CANCELLED and the status
status.state of Ververica Platform Job resource will be TERMINATED.
When transitioning a RUNNING Deployment to the SUSPENDED state, the Flink job will be terminated via Flink’s graceful stop job API.
Flink’s stop API guarantees that exactly-once sinks can fully persist their output to external storage systems prior to job termination and that no additional snapshots are triggered after the final termination Savepoint.
The terminal state of the Flink job as well as the Ververica Platform Job resource
status.state will be FINISHED.
The above points also apply to the STATEFUL upgrade strategy which first suspends a job before re-deploying the upgraded job.
The Deployment status (
status.state) is one of the following:
- RUNNING: A Flink job was successfully started as specified in the
deployment.specand the Flink job has not been terminated.
- CANCELLED: Any previously active Flink job was terminated via cancellation.
- SUSPENDED: The Deployment has successfully created a Savepoint and terminated the job.
- FINISHED: The Flink job was finite and finished execution successfully, e.g. a finite streaming or a batch job.
- TRANSITIONING: The Deployment is currently in transition to achieve the desired state.
- FAILED: A Deployment transition has failed. At this point a manual intervention is necessary, for example cancelling the Deployment (setting the desired state
A FAILED Deployment status only indicates that the transition attempt has failed, and not necessarily that a Flink job has failed. For example, failing to suspend might lead to a Deployment resource with a desired state suspended (in
spec.state) and a status failed (in
status.state) while a corresponding Flink job may still be running.
When a Deployment reaches status RUNNING, additional information about the Deployment is provided under
status.running. This section of the Deployment API is considered experimental and no guarantees are made about future availability.
kind: Deployment status: state: running running: # ID of the Job resource which was created for the transition to the RUNNING # state. You can find the respective Job resource via this ID. jobId: string # Time that the Deployment transitioned to the RUNNING state. transitionTime: string # A list of DeploymentConditions (see below) that provide more details about # the state of the Deployment. conditions: - # Unique type of the condition. type: string # Status of the condition (True, False, Unknown) status: string # Short message explaining the status. message: string # Reason for the transition to status. reason: string # Time of last transition of the status. lastTransitionTime: string # Time of last update to this condition. This time is different from # lastTransitionTime if the condition was updated without a change to # the status attribute. lastUpdateTime: string
There is additional information available for STARTED jobs. You can find the Job that was started for the running Deployment using the jobId available in status.running.jobId in the Deployment.
The following conditions are currently available in status.state: RUNNING.
We list each available condition with a short explanation of its True status. Please be aware that a condition with status False does not imply that the opposite of the condition is true.
- The Flink job corresponding to the running Deployment was last observed in an unknown status.
- The Flink job corresponding to the running Deployment was last observed to have restarted 3 or more times within the last 10 minutes.
- The Flink job corresponding to the running Deployment was last observed to have restarted 3 or more times within the last 60 minutes. Note that JobUnstable currently implies JobFailing.
The user interface uses these conditions to highlight Deployments that require further attention by the user.
Please note that these APIs are not meant to replace active monitoring of your Flink applications and should only be used as an additional status report in the scope of Ververica Platform.
There is a known issue in Flink that causes a ClusterUnreachable deployment condition, when the checkpoint storage is unavailable and the job is recovering. If you are facing this condition, we recommend checking the availability of the checkpoint storage. To avoid this issue, we recommend configuring a restart strategy in Flink that causes the Flink job to fail if failures are too frequent. Using such a restart strategy will eventually fail the job, which will be reported in Application Manager.
A ClusterUnreachable condition will also result from any error that causes VVP to lose its connection to the JVM, for example a crashed JVM due to an OOM condition; a hung or timed-out connection; or any network condition that makes the JVM unreachable.
Deployments running in session mode have to be terminated before the corresponding SessionCluster can be stopped. In order to avoid orphaning Flink jobs that belong to a Deployment, the Deployment can only transition to the cancelled state after the corresponding Flink job was successfully cancelled.
You may want to skip cancellation in order to stop the corresponding SessionCluster without waiting for all Deployments to be terminated. In this case, you can force cancellation of a Job by providing the following Deployment annotation:
kind: Deployment metadata: annotations: deployment.sessionmode.force-cancellation: $jobId
$jobId refers to the ID of the Job resource that should be force cancelled. The configuration will only be respected for the configured job.
In session mode, Flink jobs are submitted to a running SessionCluster. The submission has a default timeout of 5 minutes (300 seconds).
You can configure the timeout via the following annotation:
kind: Deployment metadata: annotations: deployment.sessionmode.job-submission-timeout-seconds: 300
The Ververica Platform uses Kubernetes Pods to run Apache Flink® JobManager and TaskManager processes, which are “owned” objects of Ververica Platform Deployments.
When upgrading and cancelling an Application Mode Deployment these objects will need to be removed, and replaced in the case of upgrades, to support the new desired state of the Deployment.
Kubernetes allows specifying how these owned objects should be deleted which, among various use cases, gives users tight control over the requested resource overhead in the cluster.
- Delete underlying Pods while new Pods are being created.
- This may cause resource request spikes as the cluster does not release resources until the Pods are fully removed.
- Delete the current Pods before creating new ones.
- This may lead to slower upgrades and cancellations, as resources need to be released before either process can proceed.
- Do not delete the current Pods and allow them to be manually cleaned up.
- This can be helpful in certain debugging situations where you’d like to inspect Pods but remove the Ververica Platform resources, but it is not recommended in normal use.
For more information on the policies, see the Kubernetes Garbage Collector docs.
|Process||Default Deletion Policy|
The policy for Application Mode JobManagers and TaskManagers can be set via the
deployment.taskmanagers.deletion-policy annotations, respectively. For example, the below configures both JobManagers and TaskManagers
to be deleted in the Foreground.
kind: Deployment metadata: annotations: deployment.jobmanagers.deletion-policy: Foreground deployment.taskmanagers.deletion-policy: Foreground