Queries

In SQL, the SELECT command is used to retrieve and process data from a table. The result of a query can be inserted into an output table using the INSERT INTO syntax.

The following statement reads the rows of the Orders table, counts the number of rows per order_id and hour, and writes the resulting rows into the OrderSummary table.

INSERT INTO OrderSummary
SELECT
    order_id,
    TUMBLE_START(order_time, INTERVAL '1' HOUR) AS order_time,
    COUNT(*) as number_sold
FROM Orders
GROUP BY
    TUMBLE(order_time, INTERVAL '1' HOUR),
    order_id

Basics

The general syntax of the SELECT command is:

[WITH <with_item_definition>[, ...n]]
SELECT select_list FROM table_expression

<with_item_definition>:
    with_item_name [(column_name[, ...n])] AS (<select_query>)

The table_expression refers to any source of data. It could be an existing table, view, or VALUES clause, the joined results of multiple existing tables, or a subquery. Assuming that the table is available in the catalog, the following would read all rows from Orders.

SELECT * FROM Orders

The select_list specification * means the query will resolve all columns. However, usage of * is discouraged in production because it makes queries less robust to catalog changes. Instead, a select_list can specify a subset of available columns or make calculations using said columns. For example, if Orders has columns named order_id, price, and tax you could write the following query:

SELECT order_id, price + tax FROM Orders

Queries can also consume from inline data using the VALUES clause. Each tuple corresponds to one row and an alias may be provided to assign names to each column.

SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)

Rows can be filtered based on a WHERE clause.

SELECT price + tax FROM Orders WHERE id = 10

Additionally, built-in and user-defined scalar functions can be invoked on the columns of a single row. User-defined functions must be registered in a catalog before use.

SELECT PRETTY_PRINT(order_id) FROM Orders

The results from a SQL query can be written out to an external system using INSERT INTO syntax.

INSERT INTO OrderResults
SELECT order_id
FROM Orders

WITH

WITH provides a way to write auxiliary statements for use in a larger query. These statements, which are often referred to as Common Table Expressions or CTEs, can be thought of as defining temporary views that exist just for one query.

WITH orders_with_total AS (
    SELECT order_id, price + tax AS total
    FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id

SELECT DISTINCT

If SELECT DISTINCT is specified, all duplicate rows are removed from the result set (one row is kept from each group of duplicates).

SELECT DISTINCT id FROM Orders

For streaming queries, the required state for computing the query result might grow infinitely. State size depends on the number of distinct rows. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.
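
The trade-off between state size and correctness can be sketched in a few lines of Python. This is a simplified model, not Flink's implementation: the function name distinct_with_ttl, the integer timestamps, and the rule "drop a value that has been idle for longer than the TTL" are all assumptions made for illustration.

```python
def distinct_with_ttl(events, ttl):
    """Emit each value the first time it is seen, forgetting idle values.

    events: (time, value) pairs in time order (hypothetical input format).
    ttl:    maximum idle time before a value is dropped from state.
    """
    last_seen = {}  # value -> time of last occurrence; this is the "state"
    emitted = []
    for time, value in events:
        # Expire state entries that have been idle for longer than the TTL.
        last_seen = {v: t for v, t in last_seen.items() if time - t <= ttl}
        if value not in last_seen:
            emitted.append(value)
        last_seen[value] = time
    return emitted


events = [(0, "a"), (1, "b"), (2, "a"), (20, "a")]
short_ttl = distinct_with_ttl(events, ttl=10)
long_ttl = distinct_with_ttl(events, ttl=100)
print(short_ttl)  # ['a', 'b', 'a']
print(long_ttl)   # ['a', 'b']
```

With the short TTL, the state for "a" has expired by time 20, so "a" is emitted a second time. The state stays small, but the result is no longer strictly distinct.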

Group Aggregations

Like most data systems, Apache Flink® supports aggregate functions, both built-in and user-defined. User-defined functions must be registered in a catalog before use.

An aggregate function computes a single result from multiple input rows. For example, there are aggregates to compute the COUNT, SUM, AVG (average), MAX (maximum) and MIN (minimum) over a set of rows.

SELECT COUNT(*) FROM Orders

It is important to understand that Flink runs continuous queries that never terminate. Instead, they update their result table according to the updates on their input tables. So for the above query, Flink outputs an updated count each time a new row is inserted into the Orders table.
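
As a rough illustration of this behavior, the Python generator below yields a new count for every incoming row instead of a single final value. The function name continuous_count and the sample rows are hypothetical. The sketch models only the update pattern, not how Flink executes the query.

```python
from typing import Iterable, Iterator


def continuous_count(rows: Iterable[dict]) -> Iterator[int]:
    """Yield an updated COUNT(*) after every inserted row."""
    count = 0
    for _ in rows:
        count += 1
        yield count


orders = [{"order_id": 1}, {"order_id": 2}, {"order_id": 3}]
updates = list(continuous_count(orders))
print(updates)  # [1, 2, 3]
```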

GROUP BY

Apache Flink® supports the standard GROUP BY clause for aggregating data.

SELECT COUNT(*)
FROM Orders
GROUP BY order_id

For streaming queries, the required state for computing the query result might grow infinitely. State size depends on the number of groups and on the number and type of aggregation functions. For example, MIN/MAX are heavy on state size while COUNT is cheap. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

GROUP BY Window Aggregation

Group windows are defined in the GROUP BY clause of a SQL query and provide a convenient way to group rows when the grouping clause contains a time attribute. Just like queries with regular GROUP BY clauses, queries with a group by window aggregation will compute a single result row per group.

Unlike other aggregations on continuous tables, group windows do not emit intermediate results but only a final result, the total aggregation at the end of the window. Moreover, group window aggregations purge all intermediate state when no longer needed.

Apache Flink® supports three types of group windows, which can be defined on either event or processing time attributes.

TUMBLE

Tumbling windows are of fixed length, and non-overlapping. The boundaries of tumbling windows are aligned to the Epoch. By aligned we mean that the window boundaries are the same for every key.

TUMBLE(time_attr, size_interval)

[Figure: tumbling windows]

For example, the following query returns the number of orders per hour for each active user.

SELECT user, TUMBLE_START(order_time, INTERVAL '1' HOUR), COUNT(*)
FROM Orders
GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR), user
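
The alignment to the Epoch can be made concrete with a small Python sketch. It assumes UTC timestamps and uses the hypothetical helper tumble_window. It illustrates only the window arithmetic, not Flink's operator.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumble_window(ts: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return (start, end) of the epoch-aligned tumbling window containing ts."""
    elapsed = (ts - EPOCH) % size  # time since the current window opened
    start = ts - elapsed           # inclusive lower bound
    return start, start + size     # end is the exclusive upper bound


order_time = datetime(2024, 3, 1, 10, 42, 7, tzinfo=timezone.utc)
start, end = tumble_window(order_time, timedelta(hours=1))
print(start.isoformat(), end.isoformat())
```

Because the window start depends only on the timestamp and the window size, every key is assigned to the same window boundaries.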

HOP

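Hop windows have a fixed length like tumbling windows, but a new window starts every slide interval, so consecutive windows overlap when the slide interval is smaller than the window size. The boundaries of hop windows are aligned to the Epoch. By aligned we mean that the window boundaries are the same for every key.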

HOP(time_attr, size_interval, slide_interval)

[Figure: hopping (sliding) windows]

The following query returns the number of orders per hour, reported every 20 minutes, for each active user.

SELECT user, HOP_START(order_time, INTERVAL '1' HOUR, INTERVAL '20' MINUTE), COUNT(*)
FROM Orders
GROUP BY HOP(order_time, INTERVAL '1' HOUR, INTERVAL '20' MINUTE), user
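
Because hopping windows overlap, a single row can contribute to several windows. The Python sketch below lists every window that contains a given timestamp. The helper hop_windows is hypothetical, and the code assumes UTC timestamps and a slide interval no larger than the window size.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def hop_windows(
    ts: datetime, size: timedelta, slide: timedelta
) -> List[Tuple[datetime, datetime]]:
    """Return every epoch-aligned hopping window [start, end) that contains ts."""
    # Start of the latest window that opens at or before ts.
    start = ts - (ts - EPOCH) % slide
    windows = []
    while start > ts - size:  # the window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


order_time = datetime(2024, 3, 1, 10, 42, 7, tzinfo=timezone.utc)
windows = hop_windows(order_time, timedelta(hours=1), timedelta(minutes=20))
for window_start, window_end in windows:
    print(window_start.time(), window_end.time())
```

With a one-hour size and a 20-minute slide, the order at 10:42:07 falls into three windows, starting at 10:00, 10:20, and 10:40.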

SESSION

Session windows do not have a fixed duration, but are characterized by a gap wherein events do not occur between sessions.

SESSION(time_attr, gap_interval)

[Figure: session windows]

The following query returns, for each user, the number of orders per session, where a session closes once no order has arrived for 30 minutes.

SELECT user, SESSION_START(order_time, INTERVAL '30' MINUTE), COUNT(*)
FROM Orders
GROUP BY SESSION(order_time, INTERVAL '30' MINUTE), user
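
The way a gap splits events into sessions can be sketched in Python as follows. The helper session_windows is hypothetical and handles a single user. How an event that arrives exactly one gap after the previous event is treated is an assumption of this sketch, and the sample data avoids that boundary.

```python
from datetime import datetime, timedelta
from typing import List


def session_windows(timestamps: List[datetime], gap: timedelta) -> List[List[datetime]]:
    """Group timestamps into sessions separated by at least `gap` of inactivity."""
    sessions: List[List[datetime]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)  # still within the current session
        else:
            sessions.append([ts])    # inactivity gap reached: open a new session
    return sessions


order_times = [
    datetime(2024, 3, 1, 10, 0),
    datetime(2024, 3, 1, 10, 10),
    datetime(2024, 3, 1, 10, 35),
    datetime(2024, 3, 1, 11, 30),
]
sessions = session_windows(order_times, timedelta(minutes=30))
counts = [len(s) for s in sessions]
print(counts)  # [3, 1]
```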

Selecting Group Window Start and End Timestamps

The start and end timestamps of group windows can be selected with the following built-in functions. These functions must be called in the SELECT clause with exactly the same arguments as the group window function in the GROUP BY clause.

Start

Start functions return the timestamp of the inclusive lower bound of the corresponding tumbling, hopping, or session window.

TUMBLE_START(time_attr, size_interval)
HOP_START(time_attr, size_interval, slide_interval)
SESSION_START(time_attr, gap_interval)

End

End functions return the timestamp of the exclusive upper bound of the corresponding tumbling, hopping, or session window.

TUMBLE_END(time_attr, size_interval)
HOP_END(time_attr, size_interval, slide_interval)
SESSION_END(time_attr, gap_interval)

The exclusive upper bound timestamp cannot be used as a time attribute in subsequent time-based operations, such as interval joins and group window or over window aggregations.

Rowtime

Rowtime functions return the timestamp of the inclusive upper bound of the corresponding tumbling, hopping, or session window.

TUMBLE_ROWTIME(time_attr, size_interval)
HOP_ROWTIME(time_attr, size_interval, slide_interval)
SESSION_ROWTIME(time_attr, gap_interval)

The resulting timestamp is an event time attribute that can be used in subsequent time-based operations such as interval joins and group window or over window aggregations. If it is not extracted, then the row does not have an event time attribute.
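
The relationship between the three timestamps can be sketched for a tumbling window in Python. The helper tumble_bounds is hypothetical, and the sketch assumes timestamps in epoch milliseconds, so the inclusive upper bound is taken to be one millisecond before the exclusive upper bound.

```python
HOUR_MS = 3_600_000


def tumble_bounds(ts_ms: int, size_ms: int) -> dict:
    """Start, end, and rowtime of the tumbling window containing ts_ms."""
    start = ts_ms - ts_ms % size_ms  # inclusive lower bound
    end = start + size_ms            # exclusive upper bound
    return {"start": start, "end": end, "rowtime": end - 1}  # inclusive upper bound


bounds = tumble_bounds(5_000_000, HOUR_MS)
print(bounds)  # {'start': 3600000, 'end': 7200000, 'rowtime': 7199999}
```

The rowtime value still lies inside the window, whereas the end value already belongs to the next window.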

Proctime

Proctime functions return a processing time attribute that can be used in subsequent time-based operations such as interval joins and group window or over window aggregations. If it is not extracted, then the row does not have a processing time attribute.

TUMBLE_PROCTIME(time_attr, size_interval)
HOP_PROCTIME(time_attr, size_interval, slide_interval)
SESSION_PROCTIME(time_attr, gap_interval)

DISTINCT Aggregation

Distinct aggregates remove duplicate values before applying an aggregation function. The following example counts the number of distinct order_ids instead of the total number of rows in the Orders table.

SELECT COUNT(DISTINCT order_id) FROM Orders

For streaming queries, the required state for computing the query result might grow infinitely. State size mostly depends on the number of distinct rows and the time that a group is maintained. Short-lived GROUP BY windows are not a problem. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

GROUPING SETS

Grouping sets allow for more complex grouping operations than those describable by a standard GROUP BY. Rows are grouped separately by each specified grouping set and aggregates are computed for each group just as for simple GROUP BY clauses.

SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())

Results

supplier_id  rating  total
'supplier1'  3       1
'supplier1'  4       1
'supplier1'  NULL    2
'supplier2'  3       1
'supplier2'  4       1
'supplier2'  NULL    2
NULL         NULL    4

Each sublist of GROUPING SETS may specify zero or more columns or expressions and is interpreted the same way as though it were used directly in the GROUP BY clause. An empty grouping set means that all rows are aggregated down to a single group, which is output even if no input rows were present.

References to the grouping columns or expressions are replaced by null values in result rows for grouping sets in which those columns do not appear.

For streaming queries, the required state for computing the query result might grow infinitely. State size depends on the number of grouping sets and the type of aggregation functions. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.
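
To make the grouping semantics concrete, the following Python sketch reproduces the GROUPING SETS example over the same four rows. The function count_by_grouping_sets is hypothetical and evaluates the data as a finite batch. Unlike SQL, it emits nothing for the empty grouping set when there are no input rows.

```python
from collections import Counter

COLUMNS = ("supplier_id", "product_id", "rating")
products = [
    ("supplier1", "product1", 4),
    ("supplier1", "product2", 3),
    ("supplier2", "product3", 3),
    ("supplier2", "product4", 4),
]


def count_by_grouping_sets(rows, grouping_sets, output_columns):
    """COUNT(*) per group for each grouping set; ungrouped columns become None."""
    totals = Counter()
    for grouping_set in grouping_sets:
        for row in rows:
            record = dict(zip(COLUMNS, row))
            key = tuple(
                record[col] if col in grouping_set else None
                for col in output_columns
            )
            # Key by grouping set as well, so groups of different sets never merge.
            totals[(grouping_set, key)] += 1
    return [key + (total,) for (_, key), total in totals.items()]


result = count_by_grouping_sets(
    products,
    grouping_sets=[("supplier_id", "rating"), ("supplier_id",), ()],
    output_columns=("supplier_id", "rating"),
)
for row in result:
    print(row)
```

Each supplier gets one row per rating, plus one subtotal row with a null rating, and the empty grouping set contributes the grand total.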

ROLLUP

ROLLUP is a shorthand notation for specifying a common type of grouping set. It represents the given list of expressions and all prefixes of the list, including the empty list.

For example, the following query is equivalent to the one above.

SELECT supplier_id, rating, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, rating)
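
The expansion of ROLLUP into grouping sets can be sketched in Python. The helper rollup is hypothetical and only computes the list of grouping sets, that is, every prefix of the column list.

```python
def rollup(columns):
    """Expand ROLLUP(columns) into grouping sets: every prefix, longest first."""
    return [tuple(columns[:n]) for n in range(len(columns), -1, -1)]


rollup_sets = rollup(["supplier_id", "rating"])
print(rollup_sets)  # [('supplier_id', 'rating'), ('supplier_id',), ()]
```

The output matches the grouping sets written out explicitly in the GROUPING SETS example.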

CUBE

CUBE is a shorthand notation for specifying a common type of grouping set. It represents the given list and all of its possible subsets, that is, the power set.

For example, the following two queries are equivalent.

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, rating, product_id)

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS (
    ( supplier_id, product_id, rating ),
    ( supplier_id, product_id         ),
    ( supplier_id,             rating ),
    ( supplier_id                     ),
    (              product_id, rating ),
    (              product_id         ),
    (                          rating ),
    (                                 )
)
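
The power set that CUBE stands for can be generated with a short Python sketch. The helper cube is hypothetical and only enumerates the grouping sets.

```python
from itertools import combinations


def cube(columns):
    """Expand CUBE(columns) into grouping sets: every subset, largest first."""
    return [
        subset
        for n in range(len(columns), -1, -1)
        for subset in combinations(columns, n)
    ]


cube_sets = cube(["supplier_id", "product_id", "rating"])
print(len(cube_sets))  # 8
for grouping_set in cube_sets:
    print(grouping_set)
```

Three columns yield 2^3 = 8 grouping sets, the same eight sets listed in the second query above.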

HAVING

HAVING eliminates group rows that do not satisfy the condition. HAVING is different from WHERE: WHERE filters individual rows before the GROUP BY while HAVING filters group rows created by GROUP BY. Each column referenced in the condition must unambiguously reference a grouping column unless it appears within an aggregate function.

SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50

The presence of HAVING turns a query into a grouped query even if there is no GROUP BY clause. It is the same as what happens when the query contains aggregate functions but no GROUP BY clause. The query considers all selected rows to form a single group, and the SELECT list and HAVING clause can only reference table columns from within aggregate functions. Such a query will emit a single row if the HAVING condition is true, zero rows if it is not true.
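
The difference between WHERE and HAVING can be tried out on a small data set. The sketch below uses Python's built-in sqlite3 module as a stand-in, so it runs as a one-off batch query rather than a continuous Flink query. The sample rows are made up, and the users and amount columns follow the example above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Orders (users TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO Orders VALUES (?, ?)",
    [("alice", 40), ("alice", 30), ("bob", 20), ("bob", 10), ("carol", 60)],
)

# WHERE filters individual rows before grouping:
# only carol has a single order above 50.
where_rows = conn.execute(
    "SELECT users, SUM(amount) FROM Orders "
    "WHERE amount > 50 GROUP BY users ORDER BY users"
).fetchall()

# HAVING filters whole groups after aggregation:
# alice (70 in total) and carol (60 in total) qualify.
having_rows = conn.execute(
    "SELECT users, SUM(amount) FROM Orders "
    "GROUP BY users HAVING SUM(amount) > 50 ORDER BY users"
).fetchall()

print(where_rows)   # [('carol', 60)]
print(having_rows)  # [('alice', 70), ('carol', 60)]
```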

OVER Window Aggregations

OVER window aggregates compute an aggregated value for every input row over a range of ordered rows. In contrast to GROUP BY aggregates, OVER aggregates do not reduce the number of result rows to a single row for every group. Instead OVER aggregates produce an aggregated value for every input row.

The following query computes for every order the sum of amounts of all orders for the same product that were received within one hour before the current order.

SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM Orders

The syntax for an OVER window is summarized below.

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

You can define multiple OVER window aggregates in a SELECT clause. However, the OVER windows for all aggregates must be identical due to limitations in Apache Flink®.

Note

The syntax limitations that are explained in this section do not apply to the special cases of Top-N and Deduplication queries.

ORDER BY

OVER windows are defined on an ordered sequence of rows. Since tables do not have an inherent order, the ORDER BY clause is mandatory. Flink currently only supports OVER windows that are ordered by a time attribute in ascending order. Additional orderings are not supported.

PARTITION BY

OVER windows can be defined on a partitioned table. In the presence of a PARTITION BY clause, the aggregate is computed for each input row only over the rows of its partition.

Range Definitions

The range definition specifies how many rows are included in the aggregate. The range is defined with a BETWEEN clause that defines a lower and an upper boundary. All rows between these boundaries are included in the aggregate. Flink only supports CURRENT ROW as the upper boundary.

There are two options to define the range, ROWS intervals and RANGE intervals.

RANGE intervals

A RANGE interval is defined on the values of the ORDER BY column, which in Flink is always a time attribute. The following RANGE interval defines that all rows with a time attribute of at most 30 minutes less than the current row are included in the aggregate.

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW

ROWS intervals

A ROWS interval is a count-based interval. It defines exactly how many rows are included in the aggregate. The following ROWS interval defines that the 10 rows preceding the current row and the current row (so 11 rows in total) are included in the aggregate.

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
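
The difference between the two interval types can be illustrated with a Python sketch that computes a SUM over both kinds of range for the same row. The helpers range_sum and rows_sum are hypothetical. The sketch assumes a single partition whose rows are already sorted by time and have distinct timestamps.

```python
from datetime import datetime, timedelta


def range_sum(rows, index, interval):
    """SUM over rows whose time lies within [current time - interval, current time]."""
    now = rows[index][0]
    return sum(amount for ts, amount in rows[: index + 1] if ts >= now - interval)


def rows_sum(rows, index, preceding):
    """SUM over the current row and up to `preceding` rows before it."""
    first = max(0, index - preceding)
    return sum(amount for _, amount in rows[first : index + 1])


t0 = datetime(2024, 1, 1, 12, 0)
rows = [
    (t0, 10),
    (t0 + timedelta(minutes=5), 20),
    (t0 + timedelta(minutes=40), 30),
    (t0 + timedelta(minutes=45), 40),
]

# RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW, for the last row
by_range = range_sum(rows, 3, timedelta(minutes=30))
# ROWS BETWEEN 2 PRECEDING AND CURRENT ROW, for the last row
by_rows = rows_sum(rows, 3, 2)
print(by_range, by_rows)  # 70 90
```

For the last row, the time-based range covers only the two rows from the last 30 minutes, while the count-based range always covers three rows regardless of how far apart they are in time.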

WINDOW

The WINDOW clause can be used to define an OVER window outside of the SELECT clause. It can make queries more readable and also allows us to reuse the window definition for multiple aggregates.

SELECT order_id, order_time, amount,
  SUM(amount) OVER w AS sum_amount,
  AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
  PARTITION BY product
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)

Joins

Apache Flink® supports a large variety of joins to combine and enrich tables.

By default, the order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. You can tune the performance of your join queries by listing the tables with the lowest update frequency first and the tables with the highest update frequency last. Make sure to specify tables in an order that does not yield a cross join (Cartesian product), which is not supported and would cause the query to fail.

INNER Equi-JOIN

Returns a simple Cartesian product restricted by the join condition. Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.

SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id

For streaming queries, the required state for computing the query result might grow infinitely depending on the number of distinct input rows of all input tables and intermediate join results. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

OUTER Equi-JOIN

Returns all rows in the qualified Cartesian product (i.e., all combined rows that pass its join condition), plus one copy of each row in an outer table for which the join condition did not match with any row of the other table. Flink supports LEFT, RIGHT, and FULL outer joins. Currently, only equi-joins are supported, i.e., joins with at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.

SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id

For streaming queries, the required state for computing the query result might grow infinitely depending on the number of distinct input rows of all input tables and intermediate join results. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

Interval Join

Returns a simple Cartesian product restricted by the join condition and a time constraint. An interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >), a BETWEEN predicate, or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

For example, this query joins all orders with their corresponding shipments if the order was shipped within four hours after it was received.

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

The following predicates are examples of valid interval join conditions:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

Array Expansion

Returns a new row for each element in the given array. Unnesting WITH ORDINALITY is not yet supported.

SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Table Function

Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. User-defined table functions must be registered before use.

INNER JOIN

The row of the left (outer) table is dropped if its table function call returns an empty result.

SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)

LEFT OUTER JOIN

If a table function call returns an empty result, the corresponding outer row is preserved, and the result padded with null values. Currently, a left outer join against a lateral table requires a TRUE literal in the ON clause.

SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
  ON TRUE
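
The difference between the two join types can be sketched in Python. Here table_func is a hypothetical stand-in for a registered table function that returns no rows for order 2, and the two helper functions mimic the inner and left outer variants.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical results of the table function per order_id.
RESULTS: Dict[int, List[str]] = {1: ["a", "b"], 2: []}


def table_func(order_id: int) -> List[str]:
    """Stand-in for a user-defined table function."""
    return RESULTS.get(order_id, [])


def inner_lateral(order_ids: List[int]) -> List[Tuple[int, str]]:
    """Inner join: orders with an empty function result are dropped."""
    return [(oid, res) for oid in order_ids for res in table_func(oid)]


def left_outer_lateral(order_ids: List[int]) -> List[Tuple[int, Optional[str]]]:
    """Left outer join: orders with an empty result are kept, padded with None."""
    joined: List[Tuple[int, Optional[str]]] = []
    for oid in order_ids:
        results = table_func(oid)
        if results:
            joined.extend((oid, res) for res in results)
        else:
            joined.append((oid, None))
    return joined


inner = inner_lateral([1, 2])
outer = left_outer_lateral([1, 2])
print(inner)  # [(1, 'a'), (1, 'b')]
print(outer)  # [(1, 'a'), (1, 'b'), (2, None)]
```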

Set Operations

UNION ALL

Returns the union of multiple tables without deduplication.

SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  UNION ALL
    (SELECT user FROM Orders WHERE b = 0)
)

IN

Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column, and the column must have the same data type as the expression.

SELECT user, amount
FROM Orders
WHERE product IN (
    SELECT product FROM NewProducts
)

The optimizer rewrites the IN condition into a join and group operation. The required state for computing the query result might grow infinitely depending on the number of distinct input rows. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

EXISTS

Returns true if the sub-query returns at least one row. This operation is only supported if it can be rewritten into a join and group operation.

SELECT order_id
FROM Orders o
WHERE EXISTS (
   SELECT * FROM NewProducts np WHERE np.product = o.product)

The optimizer rewrites the EXISTS operation into a join and group operation. The required state for computing the query result might grow infinitely depending on the number of distinct input rows. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

ORDER BY

The ORDER BY clause causes the result rows to be sorted according to the specified expression(s). If two rows are equal according to the leftmost expression, they are compared according to the next expression and so on. If they are equal according to all specified expressions, they are returned in an implementation-dependent order. The primary sort order of a table must be ascending on a time attribute. All subsequent orders can be freely chosen.

SELECT *
FROM Orders
ORDER BY order_time, order_id

Top-N

Top-N queries return the N smallest or largest values as determined by some set of columns. Top-N queries are useful in cases where the need is to find the N bottom-most or the N top-most records from a table based on some condition.

You can express a Top-N query using a combination of the ROW_NUMBER function defined on an OVER window and a WHERE condition. Group-wise Top-N queries can be defined by specifying an OVER window with a PARTITION BY clause.

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [ASC|DESC][, col2 [ASC|DESC]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

Parameter Specification

  • ROW_NUMBER(): Assigns a unique, sequential number to each row, starting from one, according to the ordering of rows within the partition. Currently, ROW_NUMBER is the only supported OVER window function.
  • PARTITION BY col1[, col2...]: Specifies the partition columns. Each partition will have a Top-N result.
  • ORDER BY col1 [ASC|DESC][, col2 [ASC|DESC]...]: Specifies the ordering columns. The ordering directions can be different on different columns and determines whether the query returns the N largest or smallest values.
  • WHERE rownum <= N: The rownum <= N condition defines how many rows (per partition) are returned.
  • [AND conditions]: Arbitrary other conditions can only be combined with rownum <= N using an AND conjunction.

For example, you may want to find the top five products per category with the maximum number of sales in real-time.

CREATE TABLE StoreSales (
    product_id BIGINT,
    category   STRING,
    sales      BIGINT
) WITH (...)

SELECT *
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
    FROM StoreSales
) WHERE row_num <= 5

Note

The above pattern must be followed exactly, otherwise the optimizer won’t be able to translate the query.
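
What the Top-N pattern computes can be sketched in Python over a finite batch of rows. The function top_n and the sample data are hypothetical. The sketch ranks rows per category by descending sales and keeps the first N, using N = 2 to keep the output short.

```python
from collections import defaultdict


def top_n(rows, n):
    """Keep the n rows with the highest sales per category, adding row_num."""
    by_category = defaultdict(list)
    for row in rows:
        by_category[row["category"]].append(row)  # PARTITION BY category

    result = []
    for category in sorted(by_category):
        ranked = sorted(by_category[category], key=lambda r: r["sales"], reverse=True)
        for row_num, row in enumerate(ranked[:n], start=1):  # row_num <= n
            result.append({**row, "row_num": row_num})
    return result


store_sales = [
    {"product_id": 1, "category": "books", "sales": 30},
    {"product_id": 2, "category": "books", "sales": 50},
    {"product_id": 3, "category": "books", "sales": 10},
    {"product_id": 4, "category": "toys", "sales": 5},
]
top_two = top_n(store_sales, 2)
for row in top_two:
    print(row)
```

In a streaming query the result is updated continuously as sales change, whereas this sketch evaluates a single snapshot.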

Deduplication

Deduplication removes rows that are duplicated over a set of columns, keeping only the one with the earliest or latest timestamp. A scenario that often requires deduplication is an upstream ETL job that does not provide end-to-end exactly-once guarantees, which may result in duplicate records being produced in the case of failure. Because duplicate records affect the correctness of downstream analytical jobs, for example with aggregation functions such as SUM and COUNT, deduplication is needed before further analysis.

You can write a query to remove duplicates using the ROW_NUMBER() function. Conceptually, deduplication is a special case of Top-N, in which the N is one, and records are ordered by the processing time or event time attribute.

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [ASC|DESC]) AS rownum
   FROM table_name)
WHERE rownum = 1

Parameter Specification

  • ROW_NUMBER(): Assigns a unique, sequential number to each row, starting with one.
  • PARTITION BY col1[, col2...]: Specifies the partition columns, i.e. the deduplicate key.
  • ORDER BY time_attr [ASC|DESC]: Specifies the ordering column, which must be a time attribute. Ordering by ASC means keeping the first row, ordering by DESC means keeping the last row.
  • WHERE rownum = 1: The rownum = 1 condition defines that only the earliest (or latest) row per set of partition values is kept.

Note

The above pattern must be followed exactly, otherwise the optimizer won’t be able to translate the query.

The following example shows how to deduplicate the rows of an Orders table and keep only the last received row per order_id.

SELECT order_id, user, product, number
FROM (
    SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) as row_num
    FROM Orders)
WHERE row_num = 1
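
The effect of this query can be sketched in Python. The function deduplicate_keep_last is hypothetical, and proctime is modeled as a plain integer that increases with arrival order, which is an assumption made for illustration.

```python
def deduplicate_keep_last(rows):
    """Keep the row with the latest proctime for every order_id."""
    latest = {}
    for row in rows:
        key = row["order_id"]  # the deduplicate key (PARTITION BY)
        if key not in latest or row["proctime"] >= latest[key]["proctime"]:
            latest[key] = row  # a later row replaces the earlier one
    return list(latest.values())


received = [
    {"order_id": 1, "number": 5, "proctime": 1},
    {"order_id": 2, "number": 1, "proctime": 2},
    {"order_id": 1, "number": 7, "proctime": 3},  # duplicate of order 1
]
deduplicated = deduplicate_keep_last(received)
print(deduplicated)
```

Order 1 appears twice in the input, and only the row received last (number 7) remains in the result.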