Apache Flink® Deployments¶
Deployments are the core resource abstraction within Ververica Platform. A Deployment specifies the desired state of an application and its configuration. At the same time, Ververica Platform tracks and reports each Deployment’s status and derives other resources from it. Whenever the Deployment specification is modified, Ververica Platform will ensure the running application will eventually reflect this change.
Deployments tie together a sequence of deployed Flink Jobs, their accumulated state, an event log, a deployment target, policies to execute upgrades, and a template for creating Flink jobs.
A Deployment will always be backed by zero or one active Flink Jobs. It launches a separate Flink cluster for each Flink job.
A Deployment resource consists of a metadata section and a spec section. The metadata contains auxiliary information to manage your Deployments. The spec describes your application and defines its behaviour and configuration. It consists of the following sections:
- A desired state to control the state of a Deployment.
- Which strategy to apply when upgrading a running Flink job in order to meet a new specification.
- A Deployment Target binding the Deployment to a Kubernetes namespace.
- A Deployment Template that defines the necessary information needed to deploy a Flink job to a specific target.
- Limits on the allowed operations Ververica Platform will perform in various scenarios.
The following sections describe these parts in turn. At the end of this page you will find a full example of a Deployment.
When creating or modifying a Deployment, optional fields will be expanded with their default values. Therefore, every Deployment resource will always be fully specified when you request it from Ververica Platform.
A Deployment resource has a desired state (specified in spec.state). A Deployment’s desired state can be one of the following:
- RUNNING - This indicates that the user would like to start a job as defined in the spec.template section.
- CANCELLED - This indicates that the user would like to terminate any currently running Flink jobs.
- SUSPENDED - This indicates that the user would like to gracefully terminate the currently running job and take a snapshot of its state.
Both suspending and cancelling a Deployment deallocate all resources associated with that Deployment from the Deployment Target.
When the desired state does not match the current state, Ververica Platform will try to perform a state transition to achieve the desired state.
The desired state will only be eventually achieved. The behavior of modifications happening concurrently to a state transition is undefined. Ververica Platform only guarantees that eventually the latest modification will be acted upon. Intermediate states may not be observed. This means that an ongoing state transition will not necessarily be immediately interrupted if the desired state changes while an operation is still ongoing.
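As a minimal sketch, requesting a graceful shutdown only requires changing spec.state in the Deployment resource. The lowercase value below is an assumption that mirrors the `state: running` notation used in the full example on this page; all other fields are omitted for brevity.

```yaml
kind: Deployment
apiVersion: v1
metadata:
  name: TopSpeedWindowing Example
spec:
  # Desired state. Ververica Platform will eventually transition the
  # Deployment to match it. Spelling assumed by analogy with "running".
  state: suspended
```

Because the desired state is only eventually achieved, status.state may pass through TRANSITIONING before it reports the requested state.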
Transitions to RUNNING¶
When transitioning to the RUNNING state, a new Flink job will be deployed. This will result in a new VVP Job resource being created and new cluster resources being allocated.
A transition to the RUNNING state is triggered in the following cases:
- The desired state (spec.state) of a Deployment is changed from CANCELLED or SUSPENDED to RUNNING.
- An upgrade strategy is invoked (typically after modification of the Deployment template).
In both cases, the configured restore strategy will determine which state snapshot will be used to restore the Flink job state from. Please note that due to the separation of upgrade and restore strategy, the STATEFUL upgrade strategy only works in conjunction with restore strategy set to LATEST_STATE or LATEST_SAVEPOINT.
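The following fragment sketches one combination that satisfies this constraint. The keys are taken from the full example on this page; the lowercase values `stateful` and `latest_state` are assumed spellings, by analogy with `stateless` and `latest_savepoint` in that example.

```yaml
spec:
  upgradeStrategy:
    # Suspend the running job (taking a snapshot) before re-deploying.
    kind: stateful
  restoreStrategy:
    # Restore from the most recent snapshot; latest_savepoint would
    # also be compatible with the stateful upgrade strategy.
    kind: latest_state
```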
After a successful transition to the RUNNING state, the state of the Flink job will be RUNNING and the state of the VVP Job resource (status.state) will be STARTED.
Transitions to CANCELLED¶
When transitioning to the CANCELLED state, the Flink job will be terminated as follows:
Running Flink jobs will be terminated via Flink’s cancellation API.
The cluster resources will be cleaned up when any of the following conditions holds:
- Cancellation has completed successfully within the deadline (3 minutes).
- Cancellation fails multiple times (5 times).
- Cancellation takes longer than the deadline (3 minutes).
The terminal state of the Flink job will be CANCELLED and the terminal state of the VVP Job resource (status.state) will be TERMINATED.
Transitions to SUSPENDED¶
When transitioning a RUNNING Deployment to the SUSPENDED state, the Flink job will be terminated as follows, depending on the Flink version:
- Apache Flink® 1.9 series and later
  - Running Flink jobs will be terminated via Flink’s graceful stop job API.
  - Flink’s stop API guarantees that exactly-once sinks can fully persist their output to external storage systems prior to job termination and that no additional snapshots are triggered after the final termination Savepoint.
  - The terminal state of the Flink job as well as the VVP Job resource (status.state) will be FINISHED.
- Apache Flink® 1.8 series
  - Running Flink jobs will be terminated in two steps: first a Savepoint will be triggered and then the job will be cancelled if the triggered Savepoint completes successfully.
  - In contrast to Flink 1.9 and later, this does not guarantee persistence to external systems prior to job termination nor that the termination Savepoint is the last state snapshot.
  - The terminal state of the Flink job will be CANCELLED and the terminal state of the VVP Job resource (status.state) will be TERMINATED.
The above points also apply to the STATEFUL upgrade strategy which first suspends a job before re-deploying the upgraded job.
The spec.template section of a Deployment specifies what artifact should be executed and how to execute it, including how to configure Flink, what parallelism to use, and other important settings.
You can think of all settings in the template as being directly applied to your job instances, whereas the overall Deployment specification section defines how to control these jobs (for instance, how to do upgrades).
Please refer to the Deployment Templates page for more details.
During state transitions of a Deployment, Ververica Platform creates Jobs and might ask a running Flink job to create a savepoint.
These operations might fail either for transient reasons, such as a network issue, or because of a misconfiguration that can only be verified when a Flink job is trying to start.
For these cases we can limit the number of attempts that Ververica Platform performs before transitioning to a “FAILED” status:
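| Key | Description | Default |
|---|---|---|
| maxSavepointCreationAttempts | Maximum attempted Savepoints before failing. | 4 |
| maxJobCreationAttempts | Maximum Job creation attempts before failing. | 4 |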
Both of these limits are reset to zero after a successful Savepoint/Job creation.
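As an illustration, both limits can be set explicitly in the Deployment specification. The key names and their position under spec follow the full example on this page; the values shown are simply the defaults listed above.

```yaml
spec:
  # Give up on a transition after 4 failed Savepoint attempts.
  maxSavepointCreationAttempts: 4
  # Give up on a transition after 4 failed Job creation attempts.
  maxJobCreationAttempts: 4
```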
Limits and Apache Flink®’s RestartStrategies¶
This section is about the interplay of Flink’s RestartStrategies and Ververica Platform’s maxJobCreationAttempts. Both of these configuration settings control how failures are handled.
maxJobCreationAttempts configures how many attempts Ververica Platform makes to transition to a running job. The attempt counter is reset after a job has transitioned to “RUNNING”.
After a job is running (in Flink and Ververica Platform), the Flink RestartStrategy covers all failures. Such failures include exceptions from the user code, machine failures, or network disruptions. If the restart strategy is exhausted, the job enters a terminal Flink state such as FAILED. Ververica Platform detects the terminal state and will recover the job according to the RestoreStrategy.
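The sketch below shows where each of the two mechanisms would be configured in a Deployment. The restart-strategy keys are standard Flink configuration options; the chosen strategy and its values (3 attempts, 10 seconds delay) are illustrative assumptions, not recommendations.

```yaml
spec:
  # Ververica Platform: attempts to reach a running job during a transition.
  maxJobCreationAttempts: 4
  template:
    spec:
      flinkConfiguration:
        # Flink: how failures are handled once the job is running.
        restart-strategy: fixed-delay
        restart-strategy.fixed-delay.attempts: 3
        restart-strategy.fixed-delay.delay: 10 s
```

With such a configuration, Flink would restart a failing job up to three times on its own; only after that would the job reach a terminal state that Ververica Platform reacts to.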
When Ververica Platform discovers that the running Flink job deviates from the Flink job specified in spec.template, it will perform an upgrade of the Flink job to reconcile the situation. The behaviour of Ververica Platform is defined by an UpgradeStrategy (spec.upgradeStrategy) and a RestoreStrategy (spec.restoreStrategy). Please refer to the Lifecycle Management page for details.
The spec.deploymentTargetId key specifies which Deployment Target the Flink jobs of this Deployment should be deployed into.
The referenced Deployment Target must be in the same Namespace.
When a Deployment resource is created, default values for attributes in the spec section will be filled in as specified via the Deployment Defaults API.
The following snippet is a complete example of a Deployment, including all optional keys and a Deployment Template.
    kind: Deployment
    apiVersion: v1
    metadata:
      name: TopSpeedWindowing Example
      labels:
        env: testing
    spec:
      state: running
      deploymentTargetId: 57b4c290-73ad-11e7-8cf7-a6006ad3dba0
      restoreStrategy:
        kind: latest_savepoint
      upgradeStrategy:
        kind: stateless
      maxSavepointCreationAttempts: 4
      maxJobCreationAttempts: 5
      template:
        spec:
          artifact:
            kind: jar
            flinkVersion: 1.11
            jarUri: http://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.11/1.11.2/flink-examples-streaming_2.11-1.11.2-TopSpeedWindowing.jar
            mainArgs: --windowSize 10 --windowUnit minutes
            entryClass: org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
          parallelism: 1
          numberOfTaskManagers: 2
          resources:
            jobManager:
              cpu: 1
              memory: 1g
            taskManager:
              cpu: 1.0
              memory: 2g
          flinkConfiguration:
            taskmanager.numberOfTaskSlots: 1
            state.savepoints.dir: s3://flink/savepoints
          logging:
            log4jLoggers:
              "": INFO
              org.apache.flink.streaming.examples: DEBUG
The metadata and the spec section are managed by the user. The status section of a Deployment is managed by Ververica Platform and reflects the current status of the Deployment.
The Deployment status (status.state) is one of the following:
- RUNNING: A Flink job was successfully started as specified in the deployment.spec and the Flink job has not been terminated.
- CANCELLED: Any previously active Flink job was terminated via cancellation.
- SUSPENDED: The Deployment has successfully created a Savepoint and terminated the job.
- FINISHED: The Flink job was finite and finished execution successfully, e.g. a finite streaming or a batch job.
- TRANSITIONING: The Deployment is currently in transition to achieve the desired state.
- FAILED: A Deployment transition has failed. At this point a manual intervention is necessary, for example cancelling the Deployment (setting the desired state to CANCELLED).
A FAILED Deployment status only indicates that the transition attempt has failed, and not necessarily that a Flink job has failed. For example, failing to suspend might lead to a Deployment resource with a desired state suspended (in spec.state) and a status failed (in status.state) while a corresponding Flink job may still be running.
When a Deployment reaches status RUNNING, additional information about the Deployment is provided under status.running. This section of the Deployment API is considered experimental and no guarantees are made about future availability.
    kind: Deployment
    status:
      state: running
      running:
        # ID of the Job resource which was created for the transition to the RUNNING
        # state. You can find the respective Job resource via this ID.
        jobId: string
        # Time that the Deployment transitioned to the RUNNING state.
        transitionTime: string
        # A list of DeploymentConditions (see below) that provide more details about
        # the state of the Deployment.
        conditions:
          - # Unique type of the condition.
            type: string
            # Status of the condition (True, False, Unknown)
            status: string
            # Short message explaining the status.
            message: string
            # Reason for the transition to status.
            reason: string
            # Time of last transition of the status.
            lastTransitionTime: string
            # Time of last update to this condition. This time is different from
            # lastTransitionTime if the condition was updated without a change to
            # the status attribute.
            lastUpdateTime: string
There is additional information available for STARTED jobs. You can find the Job that was started for the running Deployment using the jobId available in status.running.jobId in the Deployment.
The following conditions are currently available when status.state is RUNNING.
We list each available condition with a short explanation of its True status. Please be aware that a condition with status False does not imply that the opposite of the condition is true.
- The Flink job corresponding to the running Deployment was last observed in an unknown status.
- The Flink job corresponding to the running Deployment was last observed to have restarted 3 or more times within the last 10 minutes.
- The Flink job corresponding to the running Deployment was last observed to have restarted 3 or more times within the last 60 minutes. Note that JobUnstable currently implies JobFailing.
The user interface uses these conditions to highlight Deployments that require further attention by the user.
Please note that these APIs are not meant to replace active monitoring of your Flink applications and should only be used as an additional status report in the scope of Ververica Platform.
Ververica Platform use will be limited by the number of CPU cores used by running, production Flink Deployments controlled by Ververica Platform on Kubernetes.
- CPU Core
- A CPU core (or “core” for short) is the central billing unit in Ververica Platform. A CPU core refers to “cpu units” in Kubernetes. One CPU is, for example, equivalent to one AWS vCPU, one GCP core, one Azure vCore (or similar concepts for other cloud providers), or one hyperthread on a bare-metal processor with Hyperthreading (see also the meaning of CPU in the Kubernetes documentation).
Prior to launching a Flink Deployment, Ververica Platform will perform a quota check. This means that a Deployment is not rejected for quota overuse when it is created or updated, but only when its desired state is changed to “RUNNING”. Ververica Platform will never stop a running Deployment because of a quota limitation. If a Deployment can’t be launched due to the quota, there will be a message in the event log.