Deployment Configuration
Similar to regular Apache Flink® deployments, SQL deployments are configured via the Flink configuration. In addition to the configuration options of regular Flink deployments, there are several SQL-specific options to configure and fine-tune the execution of SQL queries.
SQL Options
Performance Tuning Options
Flink's SQL optimizer and runtime feature many optimizations to improve the performance of continuous SQL queries. Not all of these features are enabled by default and some have fine-tuning options.
Mini-Batched Execution Options
By default, the `GROUP BY` aggregation operator processes arriving rows one-by-one. For each row, the operator reads the respective intermediate aggregate from state, updates it, and writes it back to state. Especially for workloads with skewed grouping key distributions, this can put significant pressure on the state backend, resulting in poor throughput and possibly backpressure.
Mini-batched execution is a strategy to reduce the number of state accesses. When enabled, the aggregation operator collects a batch of rows, groups them by key, and performs one state access for each distinct key in the batch, i.e., not for every row. Depending on the key distribution, this can significantly reduce the number of state backend accesses and improve the throughput. However, mini-batched execution also increases the latency because rows are buffered before they are processed.
Mini-batched execution is disabled by default. When this feature is enabled, you can tweak the throughput-latency trade-off by configuring the maximum batch size and the maximum time that rows are buffered.
Option | Default | Type | Description |
---|---|---|---|
table.exec.mini-batch.enabled | false | Boolean | Switch to enable mini-batched execution. |
table.exec.mini-batch.allow-latency | “-1 ms” | String | The maximum time to buffer a row in a mini batch. A positive value (> 0) is mandatory if mini-batched execution is enabled. |
table.exec.mini-batch.size | -1 | Long | The maximum number of rows to buffer in a mini batch. A positive value (> 0) is mandatory if mini-batched execution is enabled. |
Mini-batched execution is only supported for non-windowed group aggregations. GROUP BY window aggregations do not benefit from enabling this feature.
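As a sketch, the throughput-latency trade-off described above could be configured as follows. The values are illustrative and workload-dependent; the keys can be set in the deployment's Flink configuration or, as shown here, via Flink SQL's `SET` syntax.

```sql
-- Enable mini-batched execution: buffer rows for at most 2 seconds
-- or until 5000 rows have been collected, whichever comes first.
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '2 s';
SET 'table.exec.mini-batch.size' = '5000';
```

Larger batches reduce state accesses further but increase result latency, so both limits should be tuned together.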
Two-Phase Aggregation Options
By default, Flink executes `GROUP BY` aggregations with a one-phase strategy: all rows with the same key are shipped to the same parallel operator instance for processing. If the distribution of grouping key values is skewed, this strategy can overload individual operator instances, resulting in poor performance.
A common strategy of distributed systems to address the problem of skewed grouping keys is two-phase aggregation with a local pre-aggregation operator (in MapReduce terms known as Combiners) that ships partial aggregates to a final aggregation operator. Flink features a configuration switch to enable two-phase aggregations.
Two-phase aggregation requires that mini-batched execution is enabled.
The local pre-aggregation operators use the batch size and timeout values that are configured for the mini-batch feature.
Since two-phase aggregation adds a buffering pre-aggregation operator for each `GROUP BY` operator, this feature also increases the time until the result of a query is updated, i.e., the query's result latency.
Option | Default | Type | Description |
---|---|---|---|
table.optimizer.agg-phase-strategy | “AUTO” | String [“AUTO”, “ONE_PHASE”, “TWO_PHASE”] | Execution strategy for non-windowed aggregations. |
Two-phase aggregation is only supported for non-windowed group aggregations. GROUP BY window aggregations do not benefit from enabling this feature.
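The following sketch enables two-phase aggregation together with the mini-batch settings it requires (illustrative values; the keys can equally be set in the deployment's Flink configuration):

```sql
-- Two-phase aggregation requires mini-batched execution to be enabled.
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '2 s';
SET 'table.exec.mini-batch.size' = '5000';
-- Let the optimizer choose (AUTO), or force the two-phase strategy.
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
```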
DISTINCT Aggregate Options
There are a few aggregation functions that need to keep track of all distinct values, such as `COUNT(DISTINCT x)`, `SUM(DISTINCT x)`, `MIN(x)`, and `MAX(x)`.
These functions do not benefit from the two-phase aggregation, because each local pre-aggregation operator needs to maintain all distinct values it observes.
Flink features an optimization which assigns the aggregation values `x` to buckets to ensure that all identical aggregation values `x` are processed by the same pre-aggregation operator. Hence, the final aggregation operator receives each distinct value for each grouping key just once. The number of buckets determines how finely the aggregation values are distributed and only needs to be adjusted if a query is executed with a very high parallelism.
Option | Default | Type | Description |
---|---|---|---|
table.optimizer.distinct-agg.split.enabled | false | Boolean | Switch to execute distinct aggregates with splitting strategy. |
table.optimizer.distinct-agg.split.bucket-num | 1024 | Integer | Number of buckets for split execution of distinct aggregates. |
The DISTINCT aggregate optimization is only supported for non-windowed group aggregations. GROUP BY window aggregations do not benefit from enabling this feature.
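A minimal sketch of enabling the splitting strategy (the bucket number shown is the default and usually only needs adjusting at very high parallelism):

```sql
-- Split distinct aggregates (e.g., COUNT(DISTINCT x)) across buckets so
-- that pre-aggregation operators share the work of tracking distinct values.
SET 'table.optimizer.distinct-agg.split.enabled' = 'true';
SET 'table.optimizer.distinct-agg.split.bucket-num' = '1024';
```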
Common Sub-Plan Options
Query execution plans are usually tree-shaped, with table scan operators being the leaves and the result being produced by the root operator. When a query references the same table multiple times, it might happen that its execution plan has two or more identical sub-trees of operators, all producing exactly the same intermediate result.
By default, Flink's SQL optimizer rewrites execution plans such that identical sub-trees are only computed once and that their results are shared. You can disable this behavior for identical sub-trees or identical table scan operators.
Option | Default | Type | Description |
---|---|---|---|
table.optimizer.reuse-source-enabled | true | Boolean | Switch to enable reuse of table source scans. |
table.optimizer.reuse-sub-plan-enabled | true | Boolean | Switch to enable reuse of common sub-plans. |
Source Filter Push-Down Options
Filter push-down is a common optimization strategy for database systems. For data processors like Flink that do not own the data, this technique is even more important because it allows pushing a filter condition into the source connector to reduce the amount of data that is accessed (possibly read from disk) and is passed to Flink.
Source filter push-down is enabled in Flink by default. You can disable it with the following configuration option.
Option | Default | Type | Description |
---|---|---|---|
table.optimizer.source.predicate-pushdown-enabled | true | Boolean | Switch to enable predicate push-down into compatible table sources. |
Not all source connectors support filter push-down.
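As an illustration, assume a hypothetical table `orders` whose connector supports filter push-down. The filter below can then be evaluated by the source itself, so non-matching rows are never handed to Flink:

```sql
-- The WHERE condition may be pushed into the source connector, reducing
-- the data that is read and passed to Flink (connector-dependent).
SELECT order_id, price
FROM orders
WHERE currency = 'EUR';

-- The optimization can be disabled globally if necessary:
SET 'table.optimizer.source.predicate-pushdown-enabled' = 'false';
```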
Join Order Optimization Options
Join order optimization is an important optimization for database systems because it has the potential to significantly reduce the amount of intermediate data and thereby reduce the execution time of a query by orders of magnitude. However, join order optimization requires precise cardinality estimates which can only be derived with good estimation models and detailed table and column statistics. Since precise statistics are hard to maintain for tables with continuously arriving data, Ververica Platform's built-in VVP catalog does not support table and column statistics yet.
Therefore, join order optimization is disabled by default. Instead, the optimizer produces execution plans that join the source tables in the order in which they are referenced in the query. We discourage enabling this feature, because the optimizer is likely to produce worse plans than what users can achieve with a bit of manual tuning (see the Queries page for details).
Option | Default | Type | Description |
---|---|---|---|
table.optimizer.join-reorder-enabled | false | Boolean | Switch to enable join-order optimization. |
State Management Options
Certain operations, such as non-windowed `GROUP BY` aggregations or joins without temporal join conditions, need to maintain potentially large amounts of state (see Queries for details on the state requirements of individual operators).
Depending on the characteristics of the input data and the query, this state might be continuously growing and cannot be automatically pruned without putting the correctness of the query result at risk.
Flink features a configuration option to remove state that hasn't been accessed (read or updated) for a certain amount of time. When enabled, state TTL removes idle state and can thereby help to control the state size of a query.
Enabling state TTL can affect the correctness of query results. Depending on its input data, an operator might need to access any piece of its state at any point in time. A query will produce incorrect results as soon as an operator needs to access state that has already been removed. Whether it is safe to enable state TTL depends on the query and its input data.
Option | Default | Type | Description |
---|---|---|---|
table.exec.state.ttl | 0 s (disabled) | Duration | The minimum time to keep idle state. State is not removed until it has been idle for the configured minimum duration, and is kept for at most 1.5 times that duration. |
The state TTL configuration parameter only affects operators that are not able to automatically clean up their state.
Operators with temporal conditions, such as windowed `GROUP BY` aggregations, interval joins, `OVER` windows, and `MATCH_RECOGNIZE` patterns, only remove state that is guaranteed to not be required in the future.
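For example, the following sketch keeps idle state for at least 24 hours (the value is illustrative; whether any TTL is safe depends on the query and its input data, as noted above):

```sql
-- State that has not been read or updated for 24 hours may be removed
-- (at the latest after roughly 1.5x the configured duration, i.e. ~36 h).
SET 'table.exec.state.ttl' = '24 h';
```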
Table Options
Flink provides a few configuration options to fine-tune the interaction with source and sink tables.
Source Idleness Options
Flink's event-time processing depends on frequently updated watermarks that are emitted by table scan operators. Since watermarks are data-driven, a table scan operator can only update its watermark when it ingests rows. A source instance that does not receive any data cannot provide updated watermarks. Flink can declare a source instance as idle to exclude it from the watermark update mechanism such that it doesn't stall the event-time processing. Once the idle source instance receives data again, it is marked as active and is considered for watermark updates.
You can configure the duration after which a source instance with no received data is marked as idle.
Option | Default | Type | Description |
---|---|---|---|
table.exec.source.idle-timeout | “-1 ms” (disabled) | String | Time after which a source that doesn’t ingest rows is marked as idle. |
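A minimal sketch of enabling the idle-timeout mechanism with an illustrative value:

```sql
-- Mark a source instance as idle after 30 seconds without data so it
-- does not hold back watermark progress; '-1 ms' (the default) disables this.
SET 'table.exec.source.idle-timeout' = '30 s';
```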
Connector Hint Options
Connector hint options allow you to override the connector properties of a table within a query. This can obviate the need to register a new table. See the Flink documentation for table options for details.
Option | Default | Type | Description |
---|---|---|---|
table.dynamic-table-options.enabled | true | Boolean | Switch to enable table OPTIONS hints. |
In contrast to Apache Flink®, connector hint options are enabled by default in Ververica Platform.
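As a hypothetical example, assume a Kafka-backed table `clicks`. An `OPTIONS` hint can override a connector property for a single query instead of registering a new table:

```sql
-- Read the (hypothetical) table 'clicks' from the earliest Kafka offset
-- for this query only, overriding the table's registered startup mode.
SELECT * FROM clicks /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;
```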
NOT NULL Enforcement Options
Flink supports the definition of `NOT NULL` constraints on table columns. When a query writes to a table, it might happen that a `NOT NULL` constraint of the sink table is violated at runtime. By default, Flink reports an error and terminates the query. You can also configure Flink to silently drop all rows that would violate the `NOT NULL` constraint.
Option | Default | Type | Description |
---|---|---|---|
table.exec.sink.not-null-enforcer | “ERROR” | String [“ERROR”, “DROP”] | Strategy for handling rows in which NULL values would be written to columns with NOT NULL constraints. |
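A sketch of switching from the default failing behavior to silently dropping offending rows:

```sql
-- Drop rows that would write NULL into a NOT NULL sink column
-- instead of terminating the query (default strategy: ERROR).
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
```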
Parallelism Options
You can configure the parallelism of all parallelizable query operators using this parameter. It overrides the general parallelism configuration option `parallelism.default`. By default, this option is disabled (set to `-1`).
Setting the parallelism of query operators with this configuration option renders VVP's auto-pilot feature ineffective. Auto-pilot configures the parallelism of Flink jobs via the general parallelism configuration, which is overridden by this configuration option.
Option | Default | Type | Description |
---|---|---|---|
table.exec.resource.default-parallelism | -1 (disabled) | Integer | The parallelism of all parallelizable operators (aggregation, join, filter, etc.). |
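For illustration, the following sets an explicit operator parallelism (the value is arbitrary; remember that this overrides `parallelism.default` and disables auto-pilot's control):

```sql
-- Run all parallelizable query operators with parallelism 4.
SET 'table.exec.resource.default-parallelism' = '4';
```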
Code Generation Options
Flink's SQL optimizer generates Java code to evaluate expressions and call user-defined functions. For complex queries, the generated code can exceed Java's limit of 64 KB per method. Flink provides a parameter to define an upper bound on the size of generated methods; methods that would exceed the limit are split into multiple smaller methods.
Option | Default | Type | Description |
---|---|---|---|
table.generated-code.max-length | 64000 | Integer | Maximum size (in bytes) of code-generated function calls before they are split into sub-functions. |
Time Zone Options
Processing values of the SQL data type `TIMESTAMP_WITH_LOCAL_TIME_ZONE` (see the Data Types page) depends on the definition of a local time zone. By default, Flink uses the default time zone of the JVM, but you can also set the local time zone explicitly via the Flink configuration.
Option | Default | Type | Description |
---|---|---|---|
table.local-time-zone | The JVM default time zone | String (formatted as ZoneId) | The local time zone that is used when converting to or from values of TIMESTAMP_WITH_LOCAL_TIME_ZONE type. |
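A minimal sketch of setting the time zone explicitly (the zone shown is an arbitrary example; any valid `ZoneId` works):

```sql
-- Interpret TIMESTAMP_WITH_LOCAL_TIME_ZONE values in Berlin time
-- instead of the JVM's default time zone.
SET 'table.local-time-zone' = 'Europe/Berlin';
```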
Common Apache Flink® Options
SQL deployments can be configured with all options of regular Flink deployments. The most common options are discussed in the deployments documentation.
For a complete list of all configuration options have a look at the official Flink documentation.