Deployment Templates¶
Deployment Templates are specified as part of Deployment resources in spec.template. Templates are translated into Job resources when you set the desired state of a Deployment to running.
You can think of the relation between the overall Deployment specification and its template as follows:
- The template specifies which Apache Flink® job is executed and how to execute it, including its configuration.
- The Deployment specification defines how job instances are managed over time, for instance how to perform upgrades or which Savepoint to restore from.
The template has two parts, metadata and spec.
- The metadata section only accepts optional annotations. Annotations are key/value pairs used to provide additional information or configuration options.
- The spec specifies which artifact to execute and how to execute it.
```yaml
kind: Deployment
spec:
  template:
    metadata:
      ...
    spec:
      ...
```
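For instance, the full example at the end of this page uses the metadata section to set the following annotations:

```yaml
template:
  metadata:
    annotations:
      flink.queryable-state.enabled: 'false'
      flink.security.ssl.enabled: 'false'
```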
At the end of this page, you will find a full example that creates a Deployment for a running Apache Flink® job. The following sections will break down each part of the template in more detail.
The snippets in the following sections are not valid on their own; they belong under the spec section of the template, that is, under Deployment.spec.template.spec.
Artifacts¶
The artifact section of the template specifies which artifact to execute. Currently, there is a single kind of supported artifact, jar.
JAR Artifacts¶
JAR artifacts must package regular Flink programs that are executed via the main method of their entry class.
In addition to your main JAR file, you can provide additional dependencies via additionalDependencies.
All files referenced under jarUri and additionalDependencies become part of the user code classpath of the Flink Java processes.
```yaml
artifact:
  kind: jar
  jarUri: https://artifacts/flink-job.jar
  entryClass: com.ververica.myjob.EntryPoint # Optional, if no Manifest entry
  mainArgs: --arg1 1 --arg2 2                # Optional
  additionalDependencies:
    - https://artifacts/flink-connector.jar
    - https://artifacts/additional-library.jar
```
Both your main JAR under jarUri and all files under additionalDependencies are referenced by a URI.
Ververica Platform supports http(s) as well as multiple blob storage services.
Please refer to Artifact Management for more details on artifact storage and retrieval.
spec.template.spec.artifact.mainArgs holds the positional parameters passed to the main method of the entry point class. The parameters are split on whitespace. A single parameter containing whitespace can be wrapped in single or double quotes; within double-quoted values, double quotes are escaped with backslashes. For example, --json "{\"key\": \"value\"}" results in two parameters: --json and {"key": "value"}.
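As a minimal sketch of these quoting rules (the jarUri is a placeholder), a parameter containing whitespace can be passed like this:

```yaml
artifact:
  kind: jar
  jarUri: https://artifacts/flink-job.jar  # placeholder artifact location
  # Splits into four parameters: --name, hello world, --count, 3
  mainArgs: --name "hello world" --count 3
```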
Apache Flink® Docker Images¶
By default, containers created for a Deployment will use the configured default Flink Docker images. You can override these on a per Deployment basis as part of the artifact.
```yaml
artifact:
  kind: jar
  jarUri: https://artifacts/flink-job.jar
  flinkVersion: 1.11
  flinkImageRegistry: registry.ververica.com
  flinkImageRepository: v2.3/flink
  flinkImageTag: 1.11.3-stream1-scala_2.11
```
Each flink* attribute is optional. If not provided, it will fall back to the configured default of your Ververica Platform installation.
You can specify the Flink image by digest if you prefix the flinkImageTag attribute with @, for instance flinkImageTag: @sha256:....
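Note that in YAML a plain value cannot begin with @, so the digest reference needs quoting. A sketch, with the digest itself elided:

```yaml
artifact:
  kind: jar
  jarUri: https://artifacts/flink-job.jar
  flinkImageTag: "@sha256:..."  # digest elided; quoted because @ cannot start a plain YAML scalar
```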
Apache Flink® Version¶
The flinkVersion attribute specifies which Flink version the artifact is executed with. If you don't specify an explicit version, Ververica Platform defaults to the latest supported version. In addition, the provided flinkVersion is used to pick the default Flink image tag.
Note
The specified flinkVersion and the Flink version of the deployed Docker image must match. When upgrading the flinkVersion of a Deployment, make sure to also update the flinkImageTag accordingly.
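For example, using the image tag from the section above, matching attributes would look like this:

```yaml
artifact:
  kind: jar
  jarUri: https://artifacts/flink-job.jar
  flinkVersion: 1.11                        # must match the Flink version of the image tag below
  flinkImageTag: 1.11.3-stream1-scala_2.11  # a 1.11.x image, matching flinkVersion
```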
Apache Flink® Configuration (flink-conf.yaml)¶
The Flink configuration flink-conf.yaml passed to a created Flink cluster is generated from the flinkConfiguration map. Each map entry is added to the Flink configuration as is.
```yaml
flinkConfiguration:
  key1: value1
  key2: value2
```
Please see Apache Flink® Configuration for details and instructions on how to enable common Apache Flink features via Ververica Platform.
Flink configurations that are required for Ververica Platform to function correctly (such as certain ports) cannot be overwritten and will be added automatically.
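For instance, the full example at the end of this page uses the configuration map to enable checkpointing and select a state backend:

```yaml
flinkConfiguration:
  execution.checkpointing.interval: 10s
  state.backend: filesystem
```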
Parallelism, Number of Taskmanagers, and Slots¶
You can specify the parallelism of your jobs via the parallelism key. By default, as many taskmanager instances are created as the specified parallelism. You can overwrite this behavior via the numberOfTaskManagers key.
```yaml
parallelism: 1
numberOfTaskManagers: 1 # Optional, defaults to parallelism
```
Each taskmanager will have Flink's default setting for the number of task slots (currently a single slot per taskmanager). Combining parallelism and numberOfTaskManagers with Flink's taskmanager.numberOfTaskSlots option gives you full flexibility in how your jobs are executed.
In the following snippet, we specify two taskmanager instances with 4 slots each (2 × 4 = 8 slots in total), exactly matching the parallelism of 8 for our jobs.
```yaml
parallelism: 8
numberOfTaskManagers: 2
flinkConfiguration:
  taskmanager.numberOfTaskSlots: 4
```
For more details on Flink task slots, consult the official Flink documentation.
Compute Resources¶
You can configure the requested compute resources for CPU and memory via the resources key:
```yaml
resources:
  jobmanager:
    cpu: 1
    memory: 1G
  taskmanager:
    cpu: 1
    memory: 2G
```
The keys jobmanager and taskmanager configure the respective Flink components. The values shown above are the defaults; you can overwrite each value selectively.
Note that resources are configured per component instance, e.g. if you use 10 taskmanager instances, Ververica Platform will in total try to acquire 10 times the configured taskmanager resources.
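As a sketch of overwriting a single value, the following requests more taskmanager memory and leaves everything else at its default (the 4G figure is illustrative):

```yaml
resources:
  taskmanager:
    memory: 4G  # only this value is overridden; cpu and all jobmanager resources keep their defaults
```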
CPU¶
You can specify CPU as a decimal number, e.g. 1, 1.0, or 0.5, which must be greater than or equal to 0.1. We strongly encourage you not to set the CPU value below 0.5, because doing so can significantly increase Deployment startup times.
Memory¶
You can specify memory as a decimal number with an optional unit suffix (matched ignoring case):
- G for gigabytes, e.g. 1G or 1.5G
- M for megabytes, e.g. 1000M or 1500M

If no unit is specified, the provided number is interpreted as bytes. Each memory unit is interpreted as a power of ten, e.g. 1K equals 1000 bytes and 1.5G equals 1,500,000,000 bytes.
Deployments require at least 1g of memory for the TaskManager and 500m of memory for the JobManager.
Note
The CPU resources you configure for your Deployments are counted against your licensed resource quota (CPU Quota).
JVM Heap Size
Deployments running Flink 1.10 or later will forward the configuration in resources.taskmanager.memory to Flink's taskmanager.memory.process.size configuration, relying on Flink's automatic configuration of all memory. Note that the memory unit is translated to base 2 before being forwarded to Flink. In addition, the managed memory fraction taskmanager.memory.managed.fraction is reduced to 0.2 for low-memory configurations with less than 1000m of total process memory.
You can find more details about Flink’s memory configuration in the Flink documentation.
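As an illustration (the exact converted value is computed by the platform), a resource request like the following is what drives taskmanager.memory.process.size on Flink 1.10 or later:

```yaml
resources:
  taskmanager:
    memory: 2G  # forwarded to Flink's taskmanager.memory.process.size, translated to a base-2 unit
```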
For JobManager instances, and for TaskManager instances running earlier Flink versions, a cutoff is subtracted from the configured memory to determine the JVM heap size. The following table lists the default values of the minimum memory cutoff and the cutoff ratio for JobManager and TaskManager instances, respectively.
|  | Minimum cutoff | Cutoff ratio |
|---|---|---|
| JobManager | 400 M (jobmanager.heap-cutoff-min: 400) | 25 % (jobmanager.heap-cutoff-ratio: 0.25) |
| TaskManager | 600 M (containerized.heap-cutoff-min: 600) | 25 % (containerized.heap-cutoff-ratio: 0.25) |
The flinkConfiguration keys listed in the table allow you to adjust the cutoffs manually.
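For example, the taskmanager cutoff could be adjusted like this (the values shown are illustrative, not recommendations):

```yaml
flinkConfiguration:
  containerized.heap-cutoff-min: 800    # raise the minimum cutoff from the default 600
  containerized.heap-cutoff-ratio: 0.3  # raise the ratio from the default 0.25
```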
As an example, take a memory request of 2G for a taskmanager: 25% of 2G is 500 M, which is less than the 600 M minimum, so 600 M will be cut off from the configured memory, leaving roughly 1.4G for the JVM heap. Application Manager will respect the container memory cutoffs specified in flinkConfiguration, similar to Apache Flink's behavior.
Logging¶
You can customize the logging behavior of Flink jobs on a per-Deployment basis.
Loggers¶
You can configure log levels for individual loggers via the log4jLoggers attribute:
```yaml
logging:
  log4jLoggers:
    "": INFO           # Root log level
    com.company: DEBUG # Log level of com.company
```
Note that the key to configure the root log level is the empty string "".
Logging Profile¶
The logging profile determines the exact Apache Log4j 2 configuration to be used. Logging profiles are pre-configured by the Ververica Platform administrator, and a profile named default is always available.
```yaml
logging:
  loggingProfile: default
```
Please consult the Logging & Metrics documentation for more details on how to configure Logging profiles. In general, you should expect the Ververica Platform administrator to set up the default logging profile as required by your environment.
Log4j2 Template (Advanced)¶
The logging template gives you full control over the Apache Log4j 2 configuration of your job in case the logging profile does not provide enough flexibility.
```yaml
logging:
  loggingProfile: null
  log4j2ConfigurationTemplate: |
    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <Configuration xmlns="http://logging.apache.org/log4j/2.0/config" strict="true">
    ...
    </Configuration>
```
If you specify both a logging profile and a custom template, the template takes precedence. Typically, the Deployment Defaults provide a default value for the loggingProfile to use.
The configured template follows the same rules as the system level profile configuration. Please consult the Logging & Metrics documentation for more details about how to configure a template.
Apache Flink® Pod Templates¶
It is possible to configure the Kubernetes Pods created for a Deployment. Please see Apache Flink® Pod Templates for more details.
Full Example¶
```yaml
kind: Deployment
apiVersion: v1
metadata:
  name: TopSpeedWindowing Example
  labels:
    env: testing
spec:
  state: RUNNING
  deploymentTargetId: 57b4c290-73ad-11e7-8cf7-a6006ad3dba0
  restoreStrategy:
    kind: LATEST_STATE
  upgradeStrategy:
    kind: STATEFUL
  maxSavepointCreationAttempts: 4
  maxJobCreationAttempts: 5
  template:
    metadata:
      annotations:
        flink.queryable-state.enabled: 'false'
        flink.security.ssl.enabled: 'false'
    spec:
      artifact:
        kind: jar
        flinkVersion: 1.11
        jarUri: http://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.11/1.11.3/flink-examples-streaming_2.11-1.11.3-TopSpeedWindowing.jar
        additionalDependencies:
          - s3://mybucket/some_additional_library.jar
          - s3://mybucket/some_additional_resources
        mainArgs: --windowSize 10 --windowUnit minutes
        entryClass: org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
      flinkConfiguration:
        execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
        execution.checkpointing.interval: 10s
        high-availability: vvp-kubernetes
        state.backend: filesystem
      parallelism: 2
      numberOfTaskManagers: 2
      resources:
        jobmanager:
          cpu: 1
          memory: 1g
        taskmanager:
          cpu: 1.0
          memory: 2g
      logging:
        loggingProfile: default
        log4jLoggers:
          "": INFO
          org.apache.flink.streaming.examples: DEBUG
```