SQL Scripts & Deployments
SQL Scripts
SQL Scripts are resources which are used to save and load the content of the SQL Editor. They can be edited, renamed, cloned, and deleted.
When a SQL Script is loaded into the UI editor, you can create a SQL Deployment from it. However, SQL Scripts are not linked to SQL Deployments: changing a SQL Script after a Deployment was created from it does not affect the Deployment.
SQL Deployments
SQL Deployments are a special type of Apache Flink® Deployment. Instead of a JAR file and a configuration, SQL Deployments consist of SQL statements and a Deployment configuration.
SQL Statements
A SQL Deployment executes a sequence of one or more SQL statements that must follow the pattern below:
# zero, one, or more CREATE TEMPORARY statements
{ CREATE TEMPORARY { TABLE | VIEW } ... ; }*
# followed by
{
# a single INSERT INTO statement
INSERT INTO <sinkTable> SELECT ... FROM <sourceTable>;
| # OR
# a single STATEMENT SET block with
BEGIN STATEMENT SET;
# one or more INSERT INTO statements
{ INSERT INTO <sinkTable> SELECT ... FROM <sourceTable>; }+
END;
}
A Deployment can define one or more temporary tables (CREATE TEMPORARY TABLE <temporary_table>) or temporary views (CREATE TEMPORARY VIEW <temporary_view>). These temporary objects only exist in the context of the Deployment and must be defined before the actual query (or queries) to execute.
A Deployment executes either a single INSERT INTO query or a single STATEMENT SET block that encloses one or more INSERT INTO queries. All queries of a STATEMENT SET block are holistically optimized and executed as a single Flink job. Joint optimization and execution allows for reusing common intermediate results and can therefore significantly improve the efficiency of executing multiple queries.
The statements of a Deployment must be separated by a semicolon (;).
Example
The following example shows a self-contained script with a data-generating source table metrics and two data-swallowing sink tables minuteAvg and hourAvg.
All tables are defined as TEMPORARY TABLE. The TEMPORARY VIEW filteredMetrics filters rows from the source table metrics.
The STATEMENT SET consists of two INSERT INTO queries. The queries aggregate rows from the filteredMetrics view by minute and hour and write their results into the respective sink table.
Both queries are optimized together and share the table scan and the filter operator. After the shared scan and filter, each query computes its own aggregation and writes the result into its sink table.
CREATE TEMPORARY TABLE metrics (id INT, val INT, cat INT, procTime AS PROCTIME())
WITH (
'connector' = 'datagen', 'rows-per-second' = '10',
'fields.id.min' = '1', 'fields.id.max' = '100',
'fields.val.min' = '1', 'fields.val.max' = '1000',
'fields.cat.min' = '1', 'fields.cat.max' = '32'
);
CREATE TEMPORARY VIEW filteredMetrics AS
SELECT id, val, procTime
FROM metrics
WHERE cat IN (1, 2, 4, 7, 16);
CREATE TEMPORARY TABLE minuteAvg (id INT, valAvg INT, t TIMESTAMP(3))
WITH ('connector' = 'blackhole');
CREATE TEMPORARY TABLE hourAvg (id INT, valAvg INT, t TIMESTAMP(3))
WITH ('connector' = 'blackhole');
BEGIN STATEMENT SET;
INSERT INTO minuteAvg
SELECT id, AVG(val), TUMBLE_END(procTime, INTERVAL '1' MINUTE)
FROM filteredMetrics
GROUP BY id, TUMBLE(procTime, INTERVAL '1' MINUTE);
INSERT INTO hourAvg
SELECT id, AVG(val), TUMBLE_END(procTime, INTERVAL '1' HOUR)
FROM filteredMetrics
GROUP BY id, TUMBLE(procTime, INTERVAL '1' HOUR);
END;
Deployment Configuration
A SQL Deployment is configured just like any other Flink Deployment. In addition to the regular Flink options, there are several SQL-specific options which are discussed in detail on the Deployment Configuration page.
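For illustration, Apache Flink® Table/SQL options can be passed via flinkConfiguration in the Deployment specification. The excerpt below is a minimal sketch only; table.exec.state.ttl is just one example of a SQL-related Flink option, and the actual set of supported SQL-specific keys is listed on the Deployment Configuration page.
spec:
  template:
    spec:
      flinkConfiguration:
        # Example Flink Table/SQL option: retain idle query state for at most one hour.
        table.exec.state.ttl: 1 h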
Creating SQL Deployments
You can create a SQL Deployment in two ways:
- Running an INSERT INTO sinkTable SELECT * FROM sourceTable; statement from the UI Editor.
- Creating a Deployment resource via the REST API.
Below is an example of a SQL Deployment specification, followed by a sketch of how such a resource can be submitted via the REST API:
kind: Deployment
apiVersion: v1
metadata:
  name: my-sql-query
  labels: {}
spec:
  state: running
  deploymentTargetName: myDeploymentTarget
  restoreStrategy:
    kind: latest_state
  upgradeStrategy:
    kind: stateful
  template:
    spec:
      artifact:
        kind: sqlscript
        flinkVersion: 1.13
        sqlScript: INSERT INTO sinkTable SELECT * FROM sourceTable;
      parallelism: 1
      resources:
        taskManager:
          memory: 1.5g
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: 1
        state.savepoints.dir: s3://flink/savepoints
      logging:
        log4jLoggers:
          org.apache.flink.streaming.examples: DEBUG
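As a rough sketch, a specification like the one above can be submitted to Ververica Platform with a plain HTTP request. The host, port, namespace, and file name below are assumptions that depend on your installation; please refer to the REST API documentation for the exact endpoint and authentication.
# Assumes the specification above is saved as deployment.yaml, the platform is
# reachable at localhost:8080, and the Deployment is created in the "default" namespace.
curl -X POST \
  -H "Content-Type: application/yaml" \
  --data-binary @deployment.yaml \
  http://localhost:8080/api/v1/namespaces/default/deployments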
Managing SQL Deployments
SQL Deployments can be managed just like any other Apache Flink® Deployment (see Application Operations). Specifically, all the lifecycle management features work for SQL Deployments as they do for regular Deployments:
- Your SQL Deployments use the same highly-available, fault-tolerant Apache Flink® runtime as any other Deployment.
- Ververica Platform detects changes in your SQL Deployment's resource and automatically adjusts your running Deployment as configured in the Upgrade and Restore Strategy.
- You can SUSPEND a SQL Deployment and later resume it from where it left off (see the sketch following this list).
- If enabled, Autopilot automatically rescales your SQL Deployments based on the incoming load.
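Suspending and resuming is driven by the desired state of the Deployment resource. As a minimal sketch in terms of the specification shown above, setting spec.state to suspended gracefully stops the query, and setting it back to running resumes it according to the configured restore strategy:
spec:
  state: suspended   # set back to "running" to resume the query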
There are a few topics that require special attention.
Catalog & User-Defined Function Changes
Catalog changes and updated user-defined functions are neither automatically propagated to running SQL Deployments, nor are they picked up incidentally when the Deployment goes through a recovery loop after, for example, a TaskManager failure. This was a deliberate design decision in order to protect deployed, running applications from ongoing catalog changes.
In order to "forward" such a change to a running Deployment you need to manually restart it. This will trigger a re-translation of the JobGraph and update all user-defined function and connector dependencies.
Note that this does not work for immutable SQL Deployments, i.e. the Deployment must be running on the current Apache Flink® SQL version of your release of Ververica Platform.
Apache Flink® Version
In contrast to regular Deployments, the Apache Flink® version and Apache Flink® image tag cannot be configured by the user for SQL Deployments. Please see Ververica Platform Components for the Apache Flink® SQL version of your release of Ververica Platform.
When you upgrade Ververica Platform (e.g. 2.3 to 2.4) the supported Apache Flink® SQL version might also change. In that case all SQL Deployments still running on the old, previously supported version become immutable. Such Deployments can be suspended, canceled and resumed, but the SQL script cannot be changed until you update it to the current Apache Flink® SQL version of Ververica Platform. The same applies to the Apache Flink® configuration, with the exception of an allow-list of keys which do not affect the Job Graph.
As of Apache Flink® 1.13, such Deployments eventually need to be manually transferred to new Deployments running on the supported Apache Flink® version. This implies that all application state needs to be rebuilt as part of the upgrade. Please see the next section for an example of how to perform such an upgrade without producing duplicates or reprocessing the full history. We are working on making the upgrade of the Apache Flink® version of SQL Deployments much easier in the future.
SQL Script Changes
Generally, the SQL script (spec.template.spec.artifact[kind=sqlScript]) of a Deployment is mutable, and changing this field will trigger an upgrade of your Deployment for both the STATEFUL and STATELESS upgrade strategies.
If you have configured a STATEFUL upgrade strategy and a restore strategy of LATEST_STATE or LATEST_SAVEPOINT (see the excerpt below), Ververica Platform will take a snapshot of all application state during shutdown and try to restart your changed query from that state.
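In terms of the Deployment specification shown earlier, this corresponds to the following excerpt; latest_savepoint is an equally valid restore strategy here:
spec:
  restoreStrategy:
    kind: latest_state
  upgradeStrategy:
    kind: stateful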
Whether this works, i.e. whether the state snapshot is compatible, depends on the nature of the change. In the following, you will find a few examples of changes that tend to be compatible:
- Changes to the WHERE or HAVING clause
--before
INSERT INTO large_orders SELECT order_id FROM orders WHERE amount > 10000;
--after
INSERT INTO large_orders SELECT order_id FROM orders WHERE amount > 20000 AND item_count > 10;
- Adding a field to a stateless query
--before
INSERT INTO large_orders SELECT order_id FROM orders WHERE amount > 10000;
--after
INSERT INTO large_orders SELECT order_id, order_time FROM orders WHERE amount > 10000;
We are working on features to verify whether a planned change is compatible with existing query state or not.
If you would like to evolve your query in an incompatible way, you generally need to SUSPEND your query (with or without Draining), change the SQL script, and restart the Deployment from a clean state. There are a few tricks and patterns that allow you to go through such an upgrade without producing duplicates or reprocessing the full history.
Example
Let's say you have a query that counts orders with a tumbling event time window on a Kafka-backed orders table, and you need to change the duration of the window: instead of counting orders per ten minutes, you would like to count orders per minute in the future.
This is an incompatible upgrade.
The latest window that has been written by your existing query is 2020-09-21 12:00:00 - 12:09:59 and you would like the new query to only produce output starting from window 2020-09-21 12:10:00 - 12:10:59.
You can achieve this by suspending your old query (without draining, to avoid emitting results of an incomplete window), and then starting the Deployment with the new query.
When you start the new query, you can add a WHERE clause to filter out events that have already been processed and emitted by the previous query. Since the last emitted window is 2020-09-21 12:00:00 - 12:09:59, these are all events before 2020-09-21 12:10:00.
Moreover, you can add a table option to adjust the reading offset in your Kafka topic to some time before the end time of the last window instead of the latest committed group offset.
- Before
INSERT INTO order_count
SELECT TUMBLE_ROWTIME(`time`, INTERVAL '10' MINUTE) AS window_time, COUNT(*)
FROM orders
GROUP BY TUMBLE(`time`, INTERVAL '10' MINUTE);
- After
INSERT INTO order_count
SELECT TUMBLE_ROWTIME(`time`, INTERVAL '1' MINUTE) AS window_time, COUNT(*)
-- 1600689600000 ms = 2020-09-21 12:00:00 UTC, shortly before the end of the last emitted window
FROM orders /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp-millis' = '1600689600000') */
WHERE order_time >= TO_TIMESTAMP('2020-09-21 12:10:00')
GROUP BY TUMBLE(`time`, INTERVAL '1' MINUTE);
The same strategy can be applied to add an additional aggregate value to the query.
- Before
INSERT INTO order_count
SELECT TUMBLE_ROWTIME(`time`, INTERVAL '10' MINUTE) AS window_time, COUNT(*)
FROM orders
GROUP BY TUMBLE(`time`, INTERVAL '10' MINUTE);
- After
INSERT INTO order_count
SELECT TUMBLE_ROWTIME(`time`, INTERVAL '10' MINUTE) AS window_time, COUNT(*), SUM(amount)
FROM orders /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp-millis' = '1600689600000') */
WHERE order_time >= TO_TIMESTAMP('2020-09-21 12:10:00')
GROUP BY TUMBLE(`time`, INTERVAL '10' MINUTE);