
Tables & Views

Tables define data sources and sinks in SQL; each table may be backed by a different system and format. Once created, a table's metadata is stored in a catalog and can be referenced in queries.

Create Table

CREATE TABLE creates a new table in the default database of the built-in catalog. If a table with the same name already exists in the catalog, the statement will fail. The statement contains both the logical schema and physical properties that define the underlying system and format.

CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]

<column_definition>:
  column_name column_type [ <column_constraint> ]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<computed_column_definition>:
  column_name AS computed_column_expression

<watermark_definition>:
  WATERMARK FOR eventtime_column_name AS watermark_strategy_expression

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]

Schema Mapping

The body clause of a SQL CREATE TABLE statement defines the names and types of columns, constraints, and watermarks.

CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  'format' = 'csv'
)

Connector and Format Properties

The WITH clause configures the backing data storage, such as Apache Kafka®, and the serialization format. Please review the full list of supported connectors and formats.

CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  'format' = 'csv'
)

Primary Key

Primary key constraints declare that a column or a set of columns of a table is unique and contains no nulls. The primary key columns uniquely identify a row in the table.

The primary key of a source table is used as metadata information for applying optimizations. For tables used as sinks, the underlying connector usually uses the primary key for upserting.

The SQL standard specifies that a primary key constraint can either be ENFORCED or NOT ENFORCED. This controls whether the constraint checks are performed on incoming and outgoing data. Because Apache Flink® does not control the external systems that actually store the data, the only supported mode is NOT ENFORCED, and it is up to the external system to ensure key integrity.

CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  'format' = 'csv'
)
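
For a table used as a sink, a connector that supports upserts can use the primary key to update rows in place. A minimal sketch, assuming the upsert-kafka connector is available in your environment (the table and topic names are illustrative):

CREATE TABLE OrdersPerId (
  order_id BIGINT,
  quantity INT,
  -- The upsert-kafka connector upserts records by the primary key
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders-per-id',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
)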

Computed Column

A computed column is a virtual column generated from a non-query expression and not physically stored within the table. For example, a computed column could be defined as cost AS price * quantity. The expression may contain any combination of physical columns, constants, function calls, or variables but cannot include a subquery.

CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT,
  cost AS price * quantity,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  'format' = 'csv'
)

Computed columns are only relevant for tables used as sources; you cannot insert into a computed column. If a table with computed columns is used as a sink table, the computed columns are removed from its schema.

CREATE TABLE MyTable (
  a BIGINT,
  b AS a + 10
) WITH (...)

-- This statement will fail because you cannot
-- insert into a computed column
INSERT INTO MyTable SELECT * FROM MyTable

-- This statement will succeed because it is
-- only inserting into physical columns
INSERT INTO MyTable SELECT a FROM MyTable

Metadata Column

Some connectors and formats expose metadata of their external system or format as data that can be ingested or emitted by SQL queries. This metadata is not part of the regular payload data and can be, depending on the data's characteristics, read-only or writable. For example, the Kafka connector exposes the read-only message offset and the writable message timestamp.

In Flink SQL, such metadata can be accessed by declaring the column as a METADATA column, with the additional VIRTUAL keyword for read-only columns:

CREATE TEMPORARY TABLE Orders (
  order_id BIGINT,
  offset BIGINT METADATA VIRTUAL,
  headers MAP<STRING, BYTES> METADATA,

  -- Or if you want to rename the column
  record_offset BIGINT METADATA FROM 'offset' VIRTUAL
) WITH (
  'connector' = 'kafka',
  /* … */
);

SELECT * FROM Orders

The full documentation can be found in the Apache Flink® documentation.

Please consult the Apache Flink® documentation of a connector or format to learn about the metadata that it provides.

Time Attributes

Time attributes are essential when working with unbounded streaming tables. Therefore both processing time and event time attributes can be defined as part of a schema.

Event Time

An event time column is a column of type TIMESTAMP(3) for which a WATERMARK is defined. The WATERMARK definition consists of the eventtime_column_name and a watermark_strategy_expression.

The eventtime_column_name refers to the column that will be the event time column of the table. The column must be of type TIMESTAMP(3) and be a top-level column in the schema. Check the data type mappings of the table's configured format to see which types can be directly interpreted as a timestamp.

Additionally, the eventtime_column_name can refer to a computed column. This allows timestamps to be extracted from arbitrary formats or types. For example, timestamps that are serialized in a custom string format can be parsed using the built-in TO_TIMESTAMP method.

-- This table will parse the serialized timestamp
-- based on the specified format
CREATE TABLE Orders (
  serialized_timestamp STRING,
  parsed_timestamp AS TO_TIMESTAMP(serialized_timestamp, 'EEE MMM dd HH:mm:ss zzz yyyy')
) WITH (...)
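
The computed column can then also carry the table's watermark. A minimal sketch extending the example above (the 5 second delay is illustrative):

CREATE TABLE Orders (
  serialized_timestamp STRING,
  -- Parse the raw string into a TIMESTAMP(3) column
  parsed_timestamp AS TO_TIMESTAMP(serialized_timestamp, 'EEE MMM dd HH:mm:ss zzz yyyy'),
  -- Define the watermark on the computed column
  WATERMARK FOR parsed_timestamp AS parsed_timestamp - INTERVAL '5' SECOND
) WITH (...)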

The watermark_strategy_expression defines the watermark generation strategy. The maximum observed value of the expression is forwarded periodically as the current watermark. It allows an arbitrary non-query expression, including computed columns, to calculate the watermark. The expression return type must be TIMESTAMP(3).

For example, if an orders table is known to have records arrive with up to 5 seconds of out-of-orderness, that could be described using the watermark below.

CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT,
  cost AS price * quantity,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  'format' = 'csv'
)

The most commonly used watermark strategies can be easily defined with SQL expressions:

Bounded Out-of-Orderness

Emits watermarks that are the maximum observed timestamp minus the specified delay.

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'time' time_unit

In most scenarios, streaming data does not arrive in perfect time order. However, an upper bound can usually be placed on how out of order records are expected to be. With this strategy, the table keeps track of the largest timestamp it has seen so far and, when asked for a watermark, subtracts the delay from that maximum.

This means that once an event for time t has been seen, all events for times earlier than t - delay are expected to have arrived. If the orders table is expected to be no more than 5 seconds out of order, the watermark would be specified as:

WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

Strictly Ascending

Emits a watermark of the maximum observed timestamp so far.

WATERMARK FOR rowtime_column AS rowtime_column

This strategy is useful in scenarios where records are expected to arrive well ordered with respect to time.
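
For example, if order_time is guaranteed to be monotonically increasing, the table could declare (a sketch based on the Orders schema used above):

CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  -- Strictly ascending: the watermark is the timestamp itself
  WATERMARK FOR order_time AS order_time
) WITH (...)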

Processing Time

A processing time attribute can be declared in the schema by adding a computed column generated from the PROCTIME() built-in function. The function PROCTIME() is a special intrinsic method that recalculates the current system time every time the column is read.

CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT,
  cost AS price * quantity,
  proc_time AS PROCTIME(),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  'format' = 'csv'
)

LIKE Clause

The LIKE clause allows creating a table based on a definition of an existing table. Users can include and exclude certain parts of the original table, such as the connector properties, computed columns, constraints, and watermarks.

The schema of the original table will always be preserved. You can add new columns to the schema, but existing physical columns cannot be dropped or altered. In contrast to the SQL standard, the clause is defined at the top-level of a CREATE statement.

The default behavior for properties in the WITH clause is to include all existing properties and overwrite those for which new values are specified. You can, however, control the merging behavior of:

  • CONSTRAINTS - constraints such as primary and unique keys
  • GENERATED - computed columns
  • OPTIONS - connector options that describe connector and format properties
  • WATERMARKS - watermark declarations

with three different merging strategies:

  • INCLUDING - Includes the source table's feature, fails on duplicate entries, e.g., if an option with the same key exists in both tables.
  • EXCLUDING - Does not include the given feature of the source table.
  • OVERWRITING - Includes the source table's feature, overwrites duplicate entries of the source table with properties of the new table, e.g., if an option with the same key exists in both tables, the one from the current statement will be used.

CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT,
  cost AS price * quantity,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

CREATE TABLE Orders_with_watermark (
  -- Add watermark definition
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  -- Overwrite the startup-mode
  'scan.startup.mode' = 'latest-offset'
) LIKE Orders

Additionally, the INCLUDING/EXCLUDING ALL option specifies the strategy to use for all features for which no specific strategy is defined, as the following example shows.
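
For example, the following sketch (table and topic names are illustrative) copies only the physical schema and computed column of Orders, drops everything else with EXCLUDING ALL, and declares fresh options and a watermark:

CREATE TABLE Orders_for_testing (
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders-test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroupTest',
  'format' = 'csv'
) LIKE Orders (
  -- Drop all options, constraints, and watermarks,
  -- then re-include the computed column
  EXCLUDING ALL
  INCLUDING GENERATED
)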

Alter Table

ALTER TABLE changes the definition of an existing table. Only the connector and format properties of a table may be modified. Dropping properties or changing the schema is only supported by creating a new table.

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
note

The updated properties of a table, like any changes to the catalog, only take effect on running SQL Deployments when they are re-translated. This happens every time the Deployment transitions to RUNNING. In particular, this means that catalog changes take effect when Autopilot applies a change to the Deployment.

CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT,
  cost AS price * quantity,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

ALTER TABLE Orders SET (
  -- Change the default startup-mode
  'scan.startup.mode' = 'latest-offset'
)

Drop Table

DROP TABLE removes a table from the catalog.

DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
note

Dropping a table can break existing SQL Deployments if they are re-translated, which happens upon pause/resume or when Autopilot changes the Deployment configuration.

DROP TABLE Orders

Create Temporary Table

CREATE TEMPORARY TABLE creates a table that only exists in the scope of a SQL script and that is not persisted in a catalog. A temporary table will shadow a catalog table with the same fully-qualified name (catName.dbName.tableName). The optimizer will use the temporary table instead of the catalog table within the scope of the script.

The syntax for CREATE TEMPORARY TABLE is the same as for CREATE TABLE, except for the additional TEMPORARY keyword.

CREATE TEMPORARY TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]

<column_definition>:
  column_name column_type [ <column_constraint> ]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<computed_column_definition>:
  column_name AS computed_column_expression

<watermark_definition>:
  WATERMARK FOR eventtime_column_name AS watermark_strategy_expression

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]

Temporary tables are useful when you would like to override a table in the catalog or to define a table that should not be published in the catalog, such as an output table that will not be read by any other SQL query.
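
For example, a script can declare a temporary output table that is never published to the catalog. A minimal sketch, assuming the print connector is available as a stand-in sink (the table name is illustrative):

CREATE TEMPORARY TABLE OrdersOutput (
  order_id BIGINT,
  quantity INT
) WITH (
  -- Only visible within this script; not persisted in the catalog
  'connector' = 'print'
);

INSERT INTO OrdersOutput SELECT order_id, quantity FROM Orders;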

Create Temporary View

CREATE TEMPORARY VIEW creates a view that only exists in the scope of a SQL script and that is not persisted in a catalog. A temporary view will shadow a catalog table with the same fully-qualified name (catName.dbName.tableName). The optimizer will use the temporary view instead of the catalog table within the scope of the script.

The syntax of CREATE TEMPORARY VIEW is as follows:

CREATE TEMPORARY VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
  [ ( columnName1, columnName2, ... ) ]
  AS query_expression

The query_expression can be any valid SELECT query.

Temporary views are useful to split complex queries with many nested subqueries into separate parts that are easier to read and understand. Although this can also be achieved using the WITH clause, many users are more familiar with the concept of views. Moreover, temporary views can be used to override a catalog table in the scope of a script.
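
For example, a filtering subquery can be pulled out into a named view (a sketch based on the Orders table defined above; the view name and threshold are illustrative):

-- Name the filtered intermediate result
CREATE TEMPORARY VIEW HighValueOrders AS
SELECT order_id, order_time, cost
FROM Orders
WHERE cost > 100;

-- Subsequent queries in the script can reference the view
SELECT COUNT(*) AS order_count FROM HighValueOrders;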

caution

Circular references are not supported. The query of a temporary view cannot reference a catalog table that has the same name as the temporary view.

Dynamic Table Options

Dynamic table options allow you to specify or override table options inline in a statement. Dynamic table options are enabled by default in Ververica Platform.

Examples

Apache Kafka® Startup Mode

If you want to start from the latest available offset in Apache Kafka® instead of the earliest offset (as configured for the Orders table in the catalog), you can override the startup mode inline:

SELECT order_id
FROM Orders /*+ OPTIONS('scan.startup.mode'='latest-offset') */;

Streaming from & into Apache Hive®

Since the Hive Catalog is read-only, Flink-specific table options need to be passed via dynamic table options.

SELECT order_id
FROM `my-hive-catalog`.`default`.`Orders`
/*+ OPTIONS('streaming-source.enable'='true') */;

INSERT INTO `my-hive-catalog`.`default`.`Orders`
/*+ OPTIONS(
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'sink.partition-commit.trigger' = 'partition-time',
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.delay' = '5 min') */
SELECT ...
FROM ...