Deployments

A resource central to Application Manager is the Deployment. A Deployment specifies the desired state of an application and its configuration. At the same time, Application Manager tracks and reports each Deployment’s status and derives other resources from it. Whenever the Deployment specification is modified, Application Manager will ensure that the latest Flink job reflects the new settings.

Deployments tie together a sequence of deployed Flink jobs, their accumulated state, an event log, a deployment target, policies to execute upgrades, and a template for creating Flink jobs.

In the current version of Application Manager, each Deployment has at most one active Flink job. Application Manager launches a Flink cluster for each running Deployment.

Specification

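The spec section of a Deployment controls the behavior of the Deployment. It contains the following main parts:

  • Desired State (spec.state)
  • Upgrade strategy (spec.upgradeStrategy)
  • Restore Strategy (spec.restoreStrategy)
  • Deployment Template (spec.template)
  • Limits (spec.maxSavepointCreationAttempts, spec.maxJobCreationAttempts)
  • Deployment Target (spec.deploymentTargetId)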

The following sections describe these parts in turn. At the end of this page you will find a full example of a Deployment.

When creating or modifying a Deployment, optional fields will be expanded with their default values. Therefore, every Deployment resource will always be fully specified when you request it from Application Manager.

Desired State

A Deployment resource has a desired state (specified in spec.state). A deployment state can be one of the following:

  • RUNNING - This indicates that the user would like to start a job as defined in the spec.template section.
  • CANCELLED - This indicates that the user would like to terminate the currently running Flink job, if there is one.
  • SUSPENDED - This indicates that the user would like to terminate the currently running job by first taking a savepoint (see Restore Strategy), and only cancelling the job after a successful savepoint.

The difference between cancelling and suspending a deployment is that when cancelling, the state of the Flink job controlled by the Deployment is lost. In contrast, suspending will snapshot this state before cancelling.

Both suspending and cancelling a Deployment currently deallocate all resources associated with that Deployment from the DeploymentTarget.
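As a small illustration of this difference, the fragment below requests a suspension rather than a cancellation. It is a minimal sketch that uses only the spec.state key described above; the lowercase spelling of the value follows the full example at the end of this page.

```yaml
kind: Deployment
spec:
  # Take a savepoint first and cancel the job only after it succeeded.
  # Setting this to "cancelled" instead would stop the job without a savepoint.
  state: suspended
```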

When the desired state does not match the current state, Application Manager will try to perform a state transition to achieve the desired state.

Note

The desired state is only eventually achieved. The behavior of modifications made concurrently with a state transition is undefined. Application Manager only guarantees that the latest modification will eventually be acted upon. Intermediate states may not be observed. This means that an ongoing state transition will not necessarily be interrupted immediately if the desired state changes while an operation is still in progress.

Status

The effect of updating the Deployment’s state is observed by inspecting the Deployment’s status. The deployment status (status.state) is one of the following:

  • RUNNING: A Flink job was successfully started as specified in the deployment.spec and the Flink job has not been terminated.
  • CANCELLED: Any previously active Flink job was terminated via cancellation.
  • SUSPENDED: The deployment has successfully created a savepoint and terminated the job.
  • FINISHED: The Flink job was finite and finished execution successfully, e.g. a finite streaming job or a batch job. Note: To re-run the finite job after reaching this status, you have to manually cancel the Deployment and then set it to running again.
  • TRANSITIONING: The deployment is currently in transition to achieve the desired state.
  • FAILED: A deployment transition has failed. At this point manual intervention is necessary, for example cancelling the Deployment (setting the desired state spec.state to cancelled).

Note

A FAILED deployment status only indicates that the transition attempt has failed, and not necessarily that a Flink job has failed. For example, failing to suspend might lead to a Deployment resource with a desired state suspended (in spec.state) and a status failed (in status.state) while a corresponding Flink job may still be running.
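The situation described in this note could look roughly like the following when the Deployment is requested from Application Manager. This is an illustrative sketch, not captured output: only the spec.state and status.state fields named above are shown, all other fields are omitted, and the lowercase spelling of the values is an assumption taken from the wording of the note.

```yaml
kind: Deployment
spec:
  state: suspended   # desired state: the user asked for a suspension
status:
  state: failed      # the transition attempt failed, e.g. the savepoint did not succeed
# The corresponding Flink job may still be running at this point.
```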

Upgrades

Application Manager will automatically orchestrate upgrades of running jobs (if present) depending on the configured upgrade strategy. Currently, there are three supported upgrade strategies:

  • STATELESS: Application Manager will terminate any currently running Flink job without taking a savepoint and then will start the new job.
  • STATEFUL: Application Manager will first save the state of the currently running Flink job by performing a savepoint, then terminate the currently running job, and finally will start a new job by using the savepoint taken prior to termination. To use this strategy please read the section titled Restore Strategy.
  • NONE: Application Manager will not perform an automatic upgrade of a running Flink job. If this strategy is selected, the user is expected to manually cancel or suspend the currently running job (by setting spec.state to cancelled or suspended), and then start a new job manually (by setting spec.state back to running).

When will an Upgrade Strategy be invoked? Application Manager will automatically start an upgrade in the following cases:

  1. A change in the metadata.annotations part of a Deployment.
  2. A change in any of the keys of the spec part of a Deployment.
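For example, changing the parallelism in the template is a change to a key under spec, so it would start an upgrade of a running Deployment according to its configured upgrade strategy. The fragment below is a sketch with illustrative values; the keys are taken from the full example at the end of this page.

```yaml
kind: Deployment
spec:
  template:
    spec:
      # Previously 1. Modifying this key changes the spec and
      # therefore triggers an upgrade of the running job.
      parallelism: 2
```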

As noted earlier, Application Manager will eventually achieve the desired state using the specified upgrade strategy. An upgrade strategy is executed in a fault-tolerant way. For example, if triggering a savepoint fails during a stateful upgrade, Application Manager will retry until a savepoint succeeds. The Flink job will not be terminated before a savepoint succeeds.

Application Manager creates a physical Flink cluster for each Application Manager Job on Kubernetes. Currently, each upgrade involves the release of all containers belonging to a Job and bringing up new ones for the updated Job.

Restore Strategy

Application Manager can instruct Flink to start executing a job from a specific savepoint or checkpoint when transitioning to the RUNNING state (configuration key spec.restoreStrategy).

The supported values are:

  • LATEST_STATE - Use the latest successful checkpoint (requires a High Availability setup) or savepoint known to Application Manager.
  • LATEST_SAVEPOINT - Use the latest successful savepoint known to Application Manager. It may have been previously triggered by a user request or by Application Manager (for example during a suspension or a stateful upgrade).
  • NONE - Do not start from any checkpoint or savepoint.

Note

The stateful upgrade strategy as described in Upgrades only works in conjunction with spec.restoreStrategy set to LATEST_SAVEPOINT or LATEST_STATE. If you instead set restoreStrategy to NONE, your job might unexpectedly start from an empty state after it is upgraded.
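A combination that satisfies this requirement might look like the sketch below. It uses only the upgradeStrategy and restoreStrategy keys that appear in the full example at the end of this page, with the same lowercase spelling of the values.

```yaml
kind: Deployment
spec:
  upgradeStrategy:
    kind: stateful            # take a savepoint before terminating the old job
  restoreStrategy:
    kind: latest_savepoint    # start the new job from that savepoint
```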

Note

spec.restoreStrategy was introduced in Application Manager version 1.3 and replaces the deprecated configuration key spec.startFromSavepoint. The deprecated key is no longer mandatory. It is still supported by the service but will be removed in a later version.

The service will automatically transition from the old configuration to the new one. The LATEST and NONE values of startFromSavepoint correspond to the LATEST_SAVEPOINT and NONE values of restoreStrategy, respectively.
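The correspondence between the two keys can be sketched as follows. The exact shape of the deprecated key is not shown on this page; the nested kind field under startFromSavepoint is an assumption that mirrors the shape of restoreStrategy.

```yaml
# Deprecated form (shape assumed, see the paragraph above):
# spec:
#   startFromSavepoint:
#     kind: LATEST

# Corresponding form using the new key:
kind: Deployment
spec:
  restoreStrategy:
    kind: LATEST_SAVEPOINT
```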

Allowing non-restored state

When restoring from a savepoint that is not fully compatible with a job because the savepoint contains state for a Flink task that is not present in the job, you can parameterize the restore strategies LATEST_STATE and LATEST_SAVEPOINT with the allowNonRestoredState flag:

kind: Deployment
spec:
  restoreStrategy:
    kind: LATEST_SAVEPOINT
    allowNonRestoredState: true

Deployment Templates

The spec.template section of a Deployment specifies what artifact should be executed and how to execute it, including how to configure Flink, what parallelism to use, and other important settings.

You can think of all settings in the template as being applied directly to your Job instances, whereas the rest of the Deployment spec section defines how to control these jobs (for instance, how to do upgrades, as described on this page).

Please refer to the Deployment Templates page for more details on how to configure the template.

Limits

During state transitions of a Deployment, Application Manager creates Jobs and might ask a running Flink job to create a savepoint.

These operations might fail either for transient reasons, such as a network issue, or because of a misconfiguration that can only be detected when a Flink job tries to start.

For these cases we can limit the number of attempts that Application Manager performs before transitioning to a failed status:

Key                           Description                                    Default
maxSavepointCreationAttempts  Maximum attempted savepoints before failing.   4
maxJobCreationAttempts        Maximum job creation attempts before failing.  4
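In a Deployment, these keys sit directly under spec, as in the full example at the end of this page. The sketch below keeps the savepoint limit at its default and raises the job creation limit; the values are illustrative.

```yaml
kind: Deployment
spec:
  maxSavepointCreationAttempts: 4   # the default value
  maxJobCreationAttempts: 5         # one more attempt than the default of 4
```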

Deployment Targets

The spec.deploymentTargetId key specifies which Deployment Target to use for executing your jobs.

Full Example

The following snippet is a complete example of a Deployment, including all optional keys and a Deployment Template.

kind: Deployment
apiVersion: v1
metadata:
  name: TopSpeedWindowing Example
  labels:
    env: testing
spec:
  state: running
  deploymentTargetId: 57b4c290-73ad-11e7-8cf7-a6006ad3dba0
  restoreStrategy:
    kind: latest_savepoint
  upgradeStrategy:
    kind: stateless
  maxSavepointCreationAttempts: 4
  maxJobCreationAttempts: 5
  template:
    spec:
      artifact:
        kind: jar
        flinkVersion: 1.6
        jarUri: http://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.11/1.6.0/flink-examples-streaming_2.11-1.6.0-TopSpeedWindowing.jar
        mainArgs: --windowSize 10 --windowUnit minutes
        entryClass: org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
      parallelism: 1
      numberOfTaskManagers: 2
      resources:
        jobManager:
          cpu: 1
          memory: 1g
        taskManager:
          cpu: 1.0
          memory: 2g
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: 1
        state.savepoints.dir: s3://flink/savepoints
      logging:
        log4jLoggers:
          "": INFO
          org.apache.flink.streaming.examples: DEBUG