Tables & Views

Tables define data sources and sinks in SQL, where each table may be backed by a different system and format. Once created, the table's metadata is stored in a catalog and can be referenced in queries.

Create Table

CREATE TABLE creates a new table in the default database of the built-in catalog. If a table with the same name already exists in the catalog, the statement will fail. The statement contains both the logical schema and physical properties that define the underlying system and format.

CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]

<column_definition>:
  column_name column_type [ <column_constraint> ]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<computed_column_definition>:
  column_name AS computed_column_expression

<watermark_definition>:
  WATERMARK FOR eventtime_column_name AS watermark_strategy_expression

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]

Schema Mapping

The body clause of a SQL CREATE TABLE statement defines the names and types of columns, constraints, and watermarks.

 CREATE TABLE Orders (
     order_id   BIGINT,
     order_time TIMESTAMP(3),
     price      DECIMAL(32, 2),
     quantity   INT
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'orders',
     'properties.bootstrap.servers' = 'localhost:9092',
     'properties.group.id' = 'orderGroup',
     'format' = 'csv'
 )

Connector and Format Properties

The WITH clause configures the backing data storage - such as Apache Kafka® - and serialization format. See below for a full list of supported connectors and formats.

 CREATE TABLE Orders (
     order_id   BIGINT,
     order_time TIMESTAMP(3),
     price      DECIMAL(32, 2),
     quantity   INT
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'orders',
     'properties.bootstrap.servers' = 'localhost:9092',
     'properties.group.id' = 'orderGroup',
     'format' = 'csv'
 )

Primary Key

A primary key constraint declares that a column or a set of columns of a table is unique and does not contain nulls. The primary key columns uniquely identify a row in the table.

The primary key of a source table is used as metadata information for applying optimizations. For tables used as sinks, the underlying connector usually uses the primary key for upserting.

The SQL standard specifies that a primary key constraint can either be ENFORCED or NOT ENFORCED. This controls if the constraint checks are performed on incoming and outgoing data. Because Apache Flink® does not control the external systems that actually store the data, the only supported mode is NOT ENFORCED and it is up to the external system to ensure key integrity.

 CREATE TABLE Orders (
     order_id   BIGINT,
     order_time TIMESTAMP(3),
     price      DECIMAL(32, 2),
     quantity   INT,
     PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'orders',
     'properties.bootstrap.servers' = 'localhost:9092',
     'properties.group.id' = 'orderGroup',
     'format' = 'csv'
 )

Computed Column

A computed column is a virtual column generated from a non-query expression and not physically stored within the table. For example, a computed column could be defined as cost AS price * quantity. The expression may contain any combination of physical columns, constants, function calls, or variables but cannot include a subquery.

 CREATE TABLE Orders (
     order_id   BIGINT,
     order_time TIMESTAMP(3),
     price      DECIMAL(32, 2),
     quantity   INT,
     cost       AS price * quantity,
     PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'orders',
     'properties.bootstrap.servers' = 'localhost:9092',
     'properties.group.id' = 'orderGroup',
     'format' = 'csv'
 )

Computed columns are only relevant for tables used as sources; you cannot insert into a computed column. If a table with computed columns is used as a sink table, the computed columns are removed from its schema.

CREATE TABLE MyTable (
   a BIGINT,
   b AS a + 10
) WITH (...)

-- This statement will fail because you cannot
-- insert into a computed column
INSERT INTO MyTable SELECT * FROM MyTable

-- This statement will succeed because it is
-- only inserting into physical columns
INSERT INTO MyTable SELECT a FROM MyTable

Time Attributes

Time attributes are essential when working with unbounded streaming tables. Therefore, both processing time and event time attributes can be defined as part of a table's schema.

Event Time

An event time column is a column of type TIMESTAMP(3) for which a WATERMARK is defined. The WATERMARK definition consists of the eventtime_column_name and a watermark_strategy_expression.

The eventtime_column_name refers to the column that will be the event time column of the table. The column must be of type TIMESTAMP(3) and be a top-level column in the schema. Check the data type mappings of the table's configured format to see which types can be directly interpreted as a timestamp.

Additionally, the eventtime_column_name can refer to a computed column. This allows timestamps to be extracted from arbitrary formats or types. For example, timestamps that are serialized in a custom string format can be parsed using the built-in TO_TIMESTAMP method.

-- This table will parse the serialized timestamp
-- based on the specified format

CREATE TABLE Orders (
   serialized_timestamp STRING,
   parsed_timestamp AS TO_TIMESTAMP(serialized_timestamp, 'EEE MMM dd HH:mm:ss zzz yyyy')
) WITH (...)
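
The parsed column can then be declared as the table's event time column. A minimal sketch extending the example above, with the connector properties elided:

CREATE TABLE Orders (
   serialized_timestamp STRING,
   parsed_timestamp AS TO_TIMESTAMP(serialized_timestamp, 'EEE MMM dd HH:mm:ss zzz yyyy'),
   WATERMARK FOR parsed_timestamp AS parsed_timestamp - INTERVAL '5' SECOND
) WITH (...)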

The watermark_strategy_expression defines the watermark generation strategy. The maximum observed value of the expression is forwarded periodically as the current watermark. It allows an arbitrary non-query expression, including computed columns, to calculate the watermark. The expression return type must be TIMESTAMP(3).

For example, if an orders table is known to have records appear with up to 5 seconds of out-of-orderness, that could be described using the below watermark.

 CREATE TABLE Orders (
     order_id   BIGINT,
     order_time TIMESTAMP(3),
     price      DECIMAL(32, 2),
     quantity   INT,
     cost       AS price * quantity,
     WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
     PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'orders',
     'properties.bootstrap.servers' = 'localhost:9092',
     'properties.group.id' = 'orderGroup',
     'format' = 'csv'
 )

The most commonly used watermark strategies can be easily defined with SQL expressions:

Bounded Out-of-Orderness

Emits watermarks, which are the maximum observed timestamp minus the specified delay.

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'time' time_unit

In most scenarios, streaming data does not arrive in perfect time order. However, an upper bound can usually be placed on how out of order records are expected to be. With this strategy, the table keeps track of the largest timestamp it has seen so far and, when asked for a watermark, subtracts the delay from that maximum.

This means that once an event for time t has been seen, all events for times earlier than t - delay are expected to have arrived. If the orders table is expected to be no more than 5 seconds out of order, the watermark would be specified as:

WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

Strictly Ascending

Emits a watermark of the maximum observed timestamp so far.

WATERMARK FOR rowtime_column AS rowtime_column

This strategy is useful in scenarios where records are expected to arrive well ordered with respect to time.
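
For example, if order events are known to arrive in ascending time order, the Orders table could declare this strategy directly. A minimal sketch reusing the schema from above, with the connector properties elided:

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    WATERMARK FOR order_time AS order_time
) WITH (...)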

Processing Time

A processing time attribute can be declared in the schema by adding a computed column generated from the PROCTIME() built-in function. The function PROCTIME() is a special intrinsic method that recalculates the current system time every time the column is read.

 CREATE TABLE Orders (
     order_id   BIGINT,
     order_time TIMESTAMP(3),
     price      DECIMAL(32, 2),
     quantity   INT,
     cost       AS price * quantity,
     proc_time  AS PROCTIME(),
     PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'orders',
     'properties.bootstrap.servers' = 'localhost:9092',
     'properties.group.id' = 'orderGroup',
     'format' = 'csv'
 )

LIKE Clause

The LIKE clause allows creating a table based on the definition of an existing table. Users can include or exclude certain parts of the original table, such as the connector properties, computed columns, constraints, and watermarks.

The schema of the original table will always be preserved. You can add new columns to the schema, but existing physical columns cannot be dropped or altered. In contrast to the SQL standard, the clause is defined at the top-level of a CREATE statement.

The default behavior for properties in the WITH clause is to include all existing properties and overwrite those for which new values have been specified. You can, however, control the merging behavior of:

  • CONSTRAINTS - constraints such as primary and unique keys
  • GENERATED - computed columns
  • OPTIONS - connector options that describe connector and format properties
  • WATERMARKS - watermark declarations

with three different merging strategies:

  • INCLUDING - Includes the source table’s feature, fails on duplicate entries, e.g., if an option with the same key exists in both tables.
  • EXCLUDING - Does not include the given feature of the source table.
  • OVERWRITING - Includes the source table’s feature, overwrites duplicate entries of the source table with properties of the new table, e.g., if an option with the same key exists in both tables, the one from the current statement will be used.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
)

CREATE TABLE Orders_with_watermark (
    -- Add watermark definition
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    -- Overwrite the startup-mode
    'scan.startup.mode' = 'latest-offset'
) LIKE Orders

Additionally, the INCLUDING/EXCLUDING ALL option allows specifying what strategy should be used if there was no specific strategy defined.
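
For example, the following sketch (using the Orders table above and the blackhole connector described later on this page) copies only the schema and computed columns while excluding all other features:

CREATE TABLE OrdersSchemaOnly
WITH (
    'connector' = 'blackhole'
) LIKE Orders (
    EXCLUDING ALL
    INCLUDING GENERATED
)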

Alter Table

ALTER TABLE changes the definition of an existing table. Only the connector and format properties of a table may be modified. Dropping properties and schema changes are only supported via the creation of a new table.

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

Note

The updated properties of a table - like any changes to the catalog - will only take effect on running SQL Deployments when they are re-translated. This happens every time the Deployment transitions to RUNNING. This means, in particular, that catalog changes take effect when Autopilot applies a change to the Deployment.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
)

ALTER TABLE Orders SET (
    -- Change the default startup-mode
    'scan.startup.mode' = 'latest-offset'
)

Drop Table

DROP TABLE removes a table from the catalog.

DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

Note

Dropping a table can break existing SQL Deployments if they are re-translated, which happens upon pause/resume or when Autopilot changes the deployment configuration.

DROP TABLE Orders
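
If it is not known whether the table exists, the optional IF EXISTS clause prevents the statement from failing:

DROP TABLE IF EXISTS Orders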

Supported Connectors

Name            Usage          Supported Formats
Apache Kafka®   Source / Sink  CSV, Avro, JSON, Debezium-JSON, Canal-JSON
Print           Sink           Not Applicable
Blackhole       Sink           Not Applicable
Data Generator  Source         Not Applicable

Apache Kafka®

The Kafka connector allows for reading and writing data to and from Apache Kafka® topics.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

See the official Apache Flink® documentation for a full list of supported configurations.

BlackHole

The blackhole connector allows developers to create a table that swallows all input records. Consider it the /dev/null of Flink SQL.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2)
) WITH (
    'connector' = 'blackhole'
)

Often, blackhole tables are defined based on physical tables using a LIKE clause.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

CREATE TABLE BlackHoleOrders
WITH (
    'connector' = 'blackhole'
) LIKE Orders (EXCLUDING ALL)

Data Generator

The data generator connector allows developers to create a table backed by an in-memory data generator. It is useful for mocking out data sources during the early development of a query. When used in conjunction with computed columns, this source allows for flexible generation of complex types.

CREATE TABLE Orders (
    order_id BIGINT,
    order_time AS LOCALTIMESTAMP,
    quantity BIGINT
) WITH (
    'connector' = 'datagen'
)
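
The generation rate and per-field value ranges can also be configured through connector options. A sketch using a handful of common datagen options (hypothetical table name; see the Flink documentation linked below for the full list of supported options):

CREATE TABLE MockOrdersWithOptions (
    order_id BIGINT,
    order_time AS LOCALTIMESTAMP,
    quantity BIGINT
) WITH (
    'connector' = 'datagen',
    -- emit at most 10 rows per second
    'rows-per-second' = '10',
    -- generate order_id as an increasing sequence
    'fields.order_id.kind' = 'sequence',
    'fields.order_id.start' = '1',
    'fields.order_id.end' = '1000000',
    -- bound the randomly generated quantity values
    'fields.quantity.min' = '1',
    'fields.quantity.max' = '100'
)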

Often, data generators are defined based on physical tables using a LIKE clause.

CREATE TABLE Orders (
    order_id   BIGINT,
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

CREATE TABLE MockOrders WITH (
    'connector' = 'datagen'
) LIKE Orders (EXCLUDING ALL)

See the official Apache Flink® documentation for a full list of supported configurations.

Supported Formats

CSV

The CSV format allows reading and writing CSV data based on a CSV schema.

10,"2020-12-30 12:13:14.123",2.10,10

The schema is derived from the table schema, so the above example record would correspond to the below table definition.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

JSON

The JSON format allows reading and writing JSON data based on a JSON schema.

{
   "order_id": 10,
   "order_time": "2020-12-30 12:13:14.123",
   "price": 2.10,
   "buyer": {
      "first_name": "Elias",
      "last_name": "Williamson",
      "title": "Ms."
   }
}

The schema is derived from the table schema, so the above example record would correspond to the below table definition.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    buyer      ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'json'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Apache Avro

The Avro format allows reading and writing Apache Avro data based on an Avro schema.

{
   "type": "record",
   "name": "Orders",
   "fields" : [
      {"name": "order_id", "type": "long"},
      {"name": "order_time", "type": {"type": "long", "logicalType": "timestamp-millis"}},
      {"name": "price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 32, "scale": 2}},
      {"name": "buyer", "type": {
         "type": "record",
         "name": "Buyer",
         "fields": [
            {"name": "first_name", "type": "string"},
            {"name": "last_name", "type": "string"},
            {"name": "title", "type": "string"}
         ]
      }}
   ]
}

The schema is derived from the table schema, so the above example schema would correspond to the below table definition.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    buyer      ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'avro'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Debezium-JSON

Debezium is a Change Data Capture (CDC) tool that streams changes in real time from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Apache Kafka®. It provides a unified format schema for changelogs and supports serializing messages using JSON.

Apache Flink® supports interpreting Debezium INSERT/UPDATE/DELETE messages. Leveraging this feature is useful in many cases, such as:

  • Synchronizing incremental data from databases to other systems
  • Auditing logs
  • Real-time materialized views on databases
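
For reference, a Debezium JSON change event wraps the before and after row images in an envelope together with an operation type. An update to an order record might look roughly like this (illustrative values, metadata fields omitted):

{
   "before": {
      "order_id": 10,
      "order_time": "2020-12-30 12:13:14.123",
      "price": 2.10,
      "quantity": 10
   },
   "after": {
      "order_id": 10,
      "order_time": "2020-12-30 12:13:14.123",
      "price": 2.10,
      "quantity": 12
   },
   "op": "u"
}

A table reading such a topic declares only the row schema; the changelog envelope is interpreted by the format:
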
CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'debezium-json'
)

See the official Apache Flink® documentation for a full list of properties. The data type mappings are the same as those for JSON.

Canal-JSON

Canal is a Change Data Capture (CDC) tool that can stream changes from MySQL into other systems. It provides a unified format schema for changelogs and supports serializing messages using JSON.

Apache Flink® supports interpreting Canal INSERT/UPDATE/DELETE messages. Leveraging this feature is useful in many cases, such as:

  • Synchronizing incremental data from databases to other systems
  • Auditing logs
  • Real-time materialized views on databases
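
For reference, a Canal JSON message groups the changed rows under a data field together with a change type. An insert of an order record might look roughly like this (illustrative values, metadata fields omitted):

{
   "data": [
      {
         "order_id": "10",
         "order_time": "2020-12-30 12:13:14.123",
         "price": "2.10",
         "quantity": "10"
      }
   ],
   "type": "INSERT"
}

As with Debezium, the table declares only the row schema; the changelog envelope is interpreted by the format:
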
CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'canal-json'
)

See the official Apache Flink® documentation for a full list of properties. The data type mappings are the same as those for JSON.