Deployment Configuration

Similar to regular Apache Flink® deployments, SQL deployments are configured via the Flink configuration. In addition to the configuration options of regular Flink deployments, there are several SQL-specific options to configure and fine-tune the execution of SQL queries.

SQL Options

Performance Tuning Options

Flink’s SQL optimizer and runtime feature many optimizations to improve the performance of continuous SQL queries. Not all of these features are enabled by default and some have fine-tuning options.

Mini-Batched Execution Options

By default, the GROUP BY aggregation operator processes arriving rows one-by-one. For each row, the operator reads the respective intermediate aggregate from state, updates it, and writes it back to state. Especially for workloads with skewed grouping key distributions, this can put significant pressure on the state backend resulting in poor throughput and possibly backpressure.

Mini-batched execution is a strategy to reduce the number of state accesses. When enabled, the aggregation operator collects a batch of rows, groups them by key, and performs one state access for each distinct key in the batch, i.e., not for every row. Depending on the key distribution, this can significantly reduce the number of state backend accesses and improve the throughput. However, mini-batched execution also increases the latency because rows are buffered before they are processed.

Mini-batched execution is disabled by default. When this feature is enabled, you can tweak the throughput-latency trade-off by configuring the maximum batch size and the maximum time that rows are buffered.

Option Default Type Description
table.exec.mini-batch.enabled false Boolean Switch to enable mini-batched execution.
table.exec.mini-batch.allow-latency "-1 ms" String The maximum time to buffer a row in a mini-batch. A positive value is mandatory if mini-batched execution is enabled.
table.exec.mini-batch.size -1 Long The maximum number of rows to buffer in a mini-batch. A positive value is mandatory if mini-batched execution is enabled.

Note

Mini-batched execution is only supported for non-windowed group aggregations. GROUP BY window aggregations do not benefit from enabling this feature.
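As a sketch, enabling mini-batched execution in the Flink configuration could look as follows; the 5-second latency and 5000-row batch size are illustrative values, not recommendations:

```yaml
# Enable mini-batched execution for non-windowed GROUP BY aggregations.
table.exec.mini-batch.enabled: true
# Flush a batch at the latest after 5 seconds (trades latency for throughput).
table.exec.mini-batch.allow-latency: 5 s
# Flush a batch at the latest after 5000 buffered rows.
table.exec.mini-batch.size: 5000
```

Whichever threshold is reached first triggers the batch, so both values bound the added latency.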

Two-Phase Aggregation Options

By default, Flink executes GROUP BY aggregations with a one-phase strategy. All rows with the same key are shipped to the same parallel operator instance for processing. This strategy can overload an operator instance if the distribution of grouping key values is skewed, resulting in poor performance.

A common strategy of distributed systems to address the problem of skewed grouping keys is two-phase aggregation with a local pre-aggregation operator (known as a Combiner in MapReduce terms) that ships partial aggregates to a final aggregation operator. Flink features a configuration switch to enable two-phase aggregation.

Note

Two-phase aggregation requires that mini-batched execution is enabled. The local pre-aggregation operators use the batch size and timeout values that are configured for the mini-batch feature. Since two-phase aggregation adds a buffering pre-aggregation operator for each GROUP BY operator, this feature also increases the time until the result of a query is updated, i.e., the query’s result latency.

Option Default Type Description
table.optimizer.agg-phase-strategy "AUTO" String ["AUTO", "ONE_PHASE", "TWO_PHASE"] Execution strategy for non-windowed aggregations.

Note

Two-phase aggregation is only supported for non-windowed group aggregations. GROUP BY window aggregations do not benefit from enabling this feature.
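For illustration, a configuration that enables two-phase aggregation together with the mini-batch options it requires might look like this (the latency and batch-size values are illustrative):

```yaml
# Two-phase aggregation requires mini-batched execution.
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5 s
table.exec.mini-batch.size: 5000
# Use a local pre-aggregation followed by a final aggregation.
table.optimizer.agg-phase-strategy: TWO_PHASE
```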

DISTINCT Aggregate Options

There are a few aggregation functions that need to keep track of all distinct values, such as COUNT(DISTINCT x), SUM(DISTINCT x), MIN(x), and MAX(x). These functions do not benefit from the two-phase aggregation, because each local pre-aggregation operator needs to maintain all distinct values it observes.

Flink features an optimization which assigns the aggregation values x into buckets to ensure that all identical aggregation values x are processed by the same pre-aggregation operator. Hence, the final aggregation operator receives each distinct value for each grouping key just once. The number of buckets determines how fine-granular the aggregation values are distributed and only needs to be adjusted if a query is executed with a very high parallelism.

Option Default Type Description
table.optimizer.distinct-agg.split.enabled false Boolean Switch to execute distinct aggregates with a splitting strategy.
table.optimizer.distinct-agg.split.bucket-num 1024 Integer Number of buckets for split execution of distinct aggregates.

Note

The DISTINCT aggregate optimization is only supported for non-windowed group aggregations. GROUP BY window aggregations do not benefit from enabling this feature.
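A sketch of enabling the splitting strategy; the bucket number usually only needs adjusting when a query runs with very high parallelism:

```yaml
# Split distinct aggregates into a bucketed pre-aggregation and a final aggregation.
table.optimizer.distinct-agg.split.enabled: true
# Optional: 1024 is the default; increase only for very high parallelism.
table.optimizer.distinct-agg.split.bucket-num: 1024
```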

Common Sub-Plan Options

Query execution plans are usually tree-shaped with table scan operators being the leaves and the result being produced by the root operator. When a query references the same table multiple times, it might happen that its execution plan has two or more identical sub-trees of operators, all producing exactly the same intermediate result.

By default, Flink’s SQL optimizer rewrites execution plans such that identical sub-trees are only computed once and that their results are shared. You can disable this behavior for identical sub-trees or identical table scan operators.

Option Default Type Description
table.optimizer.reuse-source-enabled true Boolean Switch to enable reuse of table source scans.
table.optimizer.reuse-sub-plan-enabled true Boolean Switch to enable reuse of common sub-plans.
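To illustrate when sub-plan reuse applies, consider a query that references the same table twice; the table name and columns below are hypothetical examples. With reuse enabled, the scan of the table and the identical aggregation sub-trees can be computed once and their result shared:

```sql
-- 'clicks' is a hypothetical example table.
-- Both derived tables are identical, so the optimizer can compute
-- the scan and the aggregation once and share the result.
SELECT l.user_id, l.cnt, r.cnt
FROM (SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id) l
JOIN (SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id) r
  ON l.user_id = r.user_id;
```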

Source Filter Push-Down Options

Filter push-down is a common optimization strategy for database systems. For data processors like Flink that do not own the data, this technique is even more important because it allows pushing a filter condition into the source connector to reduce the amount of data that is accessed (possibly read from disk) and is passed to Flink.

Source filter push-down is enabled in Flink by default. You can disable it with the following configuration option.

Option Default Type Description
table.optimizer.source.predicate-pushdown-enabled true Boolean Switch to enable predicate push-down into compatible table sources.

Note

Not all source connectors support filter push-down.
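For example, given a hypothetical table backed by a connector that supports filter push-down, the predicate below may be evaluated inside the source itself, so filtered-out rows are never read into Flink:

```sql
-- 'orders' is a hypothetical example table.
SELECT order_id, amount
FROM orders
WHERE amount > 100;  -- this predicate may be pushed into the source connector
```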

Join Order Optimization Options

Join order optimization is an important optimization for database systems because it has the potential to significantly reduce the amount of intermediate data and thereby reduce the execution time of a query by orders of magnitude. However, join order optimization requires precise cardinality estimates which can only be derived with good estimation models and detailed table and column statistics. Since precise statistics are hard to maintain for tables with continuously arriving data, VVP’s built-in catalog does not support table and column statistics yet.

Therefore, join order optimization is disabled by default. Instead, the optimizer produces execution plans that join the source tables in the order in which they are referenced in the query. We discourage enabling this feature, because the optimizer is likely to produce worse plans than users can achieve with a bit of manual tuning (see the Queries page for details).

Option Default Type Description
table.optimizer.join-reorder-enabled false Boolean Switch to enable join-order optimization.

State Management Options

Certain operations, such as non-windowed GROUP BY aggregations or joins without temporal join conditions, need to maintain potentially large amounts of state (see Queries for details on the state requirements of individual operators). Depending on the characteristics of the input data and the query, this state might be continuously growing and cannot be automatically pruned without putting the correctness of the query result at risk.

Flink features a configuration option to remove state that hasn’t been accessed (read or updated) for a certain amount of time. When enabled, state TTL removes idle state and can thereby help to control the state size of a query.

Warning

Enabling state TTL can affect the correctness of query results. Depending on its input data, an operator might need to access any piece of its state at any point in time. A query produces incorrect results as soon as an operator needs to access state that has already been removed. Whether it is safe to enable state TTL depends on the query and its input data.

Option Default Type Description
table.exec.state.ttl "0 s" (disabled) Duration The minimum time to keep idle state. State won't be removed before it has been idle for the configured duration, and it is kept for at most 1.5 times the configured minimum duration.

Note

The state TTL configuration parameter only affects operators that are not able to automatically clean up their state. Operators with temporal conditions, such as windowed GROUP BY, interval joins, OVER windows, and MATCH_RECOGNIZE patterns will only remove state that is guaranteed to not be required in the future.
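As a sketch, the following sets a minimum idle-state retention of 36 hours; the value is illustrative, and whether any such value is safe depends entirely on the query and its input data:

```yaml
# Remove state that has been idle (not read or updated) for at least 36 hours.
# WARNING: the query produces incorrect results if removed state is accessed later.
table.exec.state.ttl: 36 h
```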

Table Options

Flink provides a few configuration options to fine-tune the interaction with source and sink tables.

Source Idleness Options

Flink’s event-time processing depends on frequently updated watermarks that are emitted by table scan operators. Since watermarks are data-driven, a table scan operator can only update its watermark when it ingests rows. A source instance that does not receive any data cannot provide updated watermarks. Flink can declare a source instance as idle to exclude it from the watermark update mechanism such that it doesn’t stall the event-time processing. Once the idle source instance receives data again, it is marked as active and is considered for watermark updates.

You can configure the duration after which a source instance with no received data is marked as idle.

Option Default Type Description
table.exec.source.idle-timeout "-1 ms" (disabled) String Time after which a source that doesn't ingest rows is marked as idle.
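For illustration, marking a source instance as idle after 30 seconds without data could be configured as follows (the timeout value is illustrative):

```yaml
# Exclude a source instance from watermark updates after 30 s without rows.
table.exec.source.idle-timeout: 30 s
```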

Connector Hints Options

Connector hints allow you to override connector properties of a table within a query. This can obviate the need to register a new table. See Flink's documentation on table options for details.

Option Default Type Description
table.dynamic-table-options.enabled false Boolean Switch to enable table OPTIONS hints.
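As a sketch, with table.dynamic-table-options.enabled set to true, an OPTIONS hint can override a connector property for a single query. The table name and property key below are hypothetical examples; the actual keys depend on the table's connector:

```sql
-- 'kafka_orders' and 'scan.startup.mode' are illustrative examples.
SELECT *
FROM kafka_orders /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;
```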

NOT NULL Enforcement Options

Flink supports the definition of NOT NULL constraints on table columns. When a query writes to a table, it might happen that a NOT NULL constraint of the sink table is violated at runtime.

By default, Flink reports an error and terminates the query. You can also configure Flink to silently drop all rows that would violate the NOT NULL constraint.

Option Default Type Description
table.exec.sink.not-null-enforcer "ERROR" String ["ERROR", "DROP"] Strategy for handling rows in which NULL values would be written to columns with NOT NULL constraints.
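To silently drop violating rows instead of terminating the query, the enforcer can be switched to DROP:

```yaml
# Drop rows that would write NULL into a NOT NULL column instead of failing.
table.exec.sink.not-null-enforcer: DROP
```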

Parallelism Options

You can configure the parallelism of all parallelizable query operators using this parameter. It overrides the general parallelism configuration option parallelism.default.

By default this option is disabled (set to -1).

Warning

Setting the parallelism of query operators with this configuration option renders VVP's auto-pilot feature ineffective. Auto-pilot configures the parallelism of Flink jobs via the general parallelism configuration, which is overridden by this configuration option.

Option Default Type Description
table.exec.resource.default-parallelism -1 (disabled) Integer The parallelism of all parallelizable operators (aggregation, join, filter, etc.).
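For illustration, the following pins all parallelizable query operators to a parallelism of 16 (the value is illustrative; keep the auto-pilot warning above in mind):

```yaml
# Run all parallelizable query operators with parallelism 16.
# Note: overrides parallelism.default and defeats auto-pilot scaling.
table.exec.resource.default-parallelism: 16
```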

Code Generation Options

Flink's SQL optimizer generates Java code to evaluate expressions and call user-defined functions. For complex queries, the generated code can exceed Java's limit of 64 KB per method. Flink has a parameter to define an upper bound on the size of generated methods. Methods that would exceed the limit are split into multiple smaller methods.

Option Default Type Description
table.generated-code.max-length 64000 Integer Maximum size (in bytes) of code-generated function calls before they are split into sub-functions.

Time Zone Options

Processing values of the SQL data type TIMESTAMP_WITH_LOCAL_TIME_ZONE (see the Data Types page) depends on the definition of a local time zone. By default, Flink uses the default time zone of the JVM, but you can also set the local time zone explicitly via the Flink configuration.

Option Default Type Description
table.local-time-zone The JVM default time zone String (formatted as ZoneId) The local time zone that is used when converting to or from values of TIMESTAMP_WITH_LOCAL_TIME_ZONE type.
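For example, to pin the local time zone to UTC instead of relying on the JVM default (the zone ID is an illustrative choice; any valid ZoneId works):

```yaml
# Interpret TIMESTAMP_WITH_LOCAL_TIME_ZONE values in UTC.
table.local-time-zone: UTC
```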