Skip to main content
Version: 2.13

Queries

In SQL, the SELECT command is used to retrieve and process data from a table. The result of a query can be inserted into an output table using the INSERT INTO syntax.

The following statement reads the rows of the Orders table, counts the number of rows per order_id and hour, and writes the resulting rows into the OrderSummary table.

INSERT INTO OrderSummary
SELECT
order_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) AS order_time,
COUNT(*) as number_sold
FROM Orders
GROUP BY
TUMBLE(order_time, INTERVAL '1' HOUR),
order_id

Basics

The general syntax of the SELECT command is:

[WITH <with_item_definition>][, ...n]
SELECT select_list FROM table_expression

<with_item_definition>:
with_item_name (column_name[, ...n]) AS <select_query>

The table_expression refers to any source of data. It could be an existing table, view, or VALUES clause, the joined results of multiple existing tables, or a subquery. Assuming that the table is available in the catalog, the following would read all rows from Orders.

SELECT * FROM Orders

The select_list specification * means the query will resolve all columns. However, usage of * is discouraged in production because it makes queries less robust to catalog changes. Instead, a select_list can specify a subset of available columns or make calculations using said columns. For example, if Orders has columns named order_id, price, and tax you could write the following query:

SELECT order_id, price + tax FROM Orders

Queries can also consume from inline data using the VALUES clause. Each tuple corresponds to one row and an alias may be provided to assign names to each column.

SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)

Rows can be filtered based on a WHERE clause.

SELECT price + tax FROM Orders WHERE id = 10

Additionally, built-in and user-defined scalar functions can be invoked on the columns of a single row. User-defined functions must be registered in a catalog before use.

SELECT PRETTY_PRINT(order_id) FROM Orders

The results from a SQL query can be written out to an external system using INSERT INTO syntax.

INSERT INTO OrderResults
SELECT order_id
FROM Orders

WITH

WITH provides a way to write auxiliary statements for use in a larger query. These statements, which are often referred to as Common Table Expressions or CTEs, can be thought of as defining temporary views that exist just for one query.

WITH orders_with_total AS (
SELECT order_id, price + tax AS total
FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id

SELECT DISTINCT

If SELECT DISTINCT is specified, all duplicate rows are removed from the result set (one row is kept from each group of duplicates).

SELECT DISTINCT id FROM Orders

For streaming queries, the required state for computing the query result might grow infinitely. State size depends on number of distinct rows. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details

Group Aggregations

Like most data systems, Apache Flink® supports aggregate functions; both built-in and user-defined. User-defined functions must be registered in a catalog before use.

An aggregate function computes a single result from multiple input rows. For example, there are aggregates to compute the COUNT, SUM, AVG (average), MAX (maximum) and MIN (minimum) over a set of rows.

SELECT COUNT(*) FROM Orders

It is important to understand that Flink runs continuous queries that never terminate. Instead they update their result table according to the updates on its input tables. So for the above query, Flink will output an updated count each time a new row is inserted into the Orders table.

GROUP BY

Apache Flink® supports the standard GROUP BY clause for aggregating data.

SELECT COUNT(*)
FROM Orders
GROUP BY order_id

For streaming queries, the required state for computing the query result might grow infinitely. State size depends on the number of groups and number and type of aggregation functions. For example MIN/MAX are heavy on state size while COUNT is cheap. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

GROUP BY Window Aggregation

Group windows are defined in the GROUP BY clause of a SQL query and provide a convenient way to group rows when the grouping clause contains a time attribute. Just like queries with regular GROUP BY clauses, queries with a group by window aggregation will compute a single result row per group.

Unlike other aggregations on continuous tables, group windows do not emit intermediate results but only a final result, the total aggregation at the end of the window. Moreover, group window aggregations purge all intermediate state when no longer needed.

Apache Flink® supports three types of group windows, which can be defined on either event or processing time attributes.

TUMBLE

Tumbling windows are of fixed length, and non-overlapping. The boundaries of tumbling windows are aligned to the Epoch. By aligned we mean that the window boundaries are the same for every key.

TUMBLE(time_attr, size_interval)

Image

For example, the number of orders per hour, for each active user.

SELECT user, TUMBLE_START(order_time, INTERVAL '1' HOUR), COUNT(*)
FROM Orders
GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR), user

HOP

Hop windows are just like tumbling windows, except they overlap. The boundaries of hop windows are aligned to the Epoch. By aligned we mean that the window boundaries are the same for every key.

HOP(time_attr, slide_interval, size_interval)

Image

The below query returns the number of orders per hour, reported every 20 minutes, for each active user.

SELECT user, HOP_START(order_time, INTERVAL '20' MINUTE, INTERVAL '1' HOUR), COUNT(*)
FROM Orders
GROUP BY HOP(order_time, INTERVAL '20' MINUTE, INTERVAL '1' HOUR), user
note

It is also valid to have a size interval that is smaller than the slide interval.

The following query will, for each active user, report every hour the number or orders during the last 20 minutes of that hour:

SELECT user, HOP_START(order_time, INTERVAL '1' HOUR, INTERVAL '20' MINUTE), COUNT(*)
FROM Orders
GROUP BY HOP(order_time, INTERVAL '1' HOUR, INTERVAL '20' MINUTE), user

SESSION

Session windows do not have a fixed duration, but are characterized by a gap wherein events do not occur between sessions.

SESSION(time_attr, gap_interval)

Image

The below query returns the number of orders, where each order occurred within 30 minutes of the last, for each user.

SELECT user, SESSION_START(order_time, INTERVAL '30' MINUTE), COUNT(*)
FROM Orders
GROUP BY SESSION(order_time, INTERVAL '30' MINUTE), user

Selecting Group Window Start and End Timestamps

The start and end timestamps of group windows can be selected with the following built-in functions. These functions must be called in the SELECT clause with exactly the same arguments as the group window function in the GROUP BY clause.

Start

Start functions return the timestamp of the inclusive lower bound of the corresponding tumbling, hopping, or session window.

TUMBLE_START(time_attr, size_interval)
HOP_START(time_attr, size_interval, slide_interval)
SESSION_START(time_attr, gap_interval)

End

End functions return the timestamp of the exclusive upper bound of the corresponding tumbling, hopping, or session window.

TUMBLE_END(time_attr, size_interval)
HOP_END(time_attr, size_interval, slide_interval)
SESSION_END(time_attr, gap_interval)

The exclusive upper bound timestamp cannot be used as a time attribute in subsequent time-based operations, such as interval joins and group window or over window aggregations.

Rowtime

Rowtime functions return the timestamp of the inclusive upper bound of the corresponding tumbling, hopping, or session window.

TUMBLE_ROWTIME(time_attr, size_interval)
HOP_ROWTIME(time_attr, size_interval, slide_interval)
SESSION_ROWTIME(time_attr, gap_interval)

The resulting timestamp is an event time attribute that can be used in subsequent time-based operations such as interval joins and group window or over window aggregations. If it is not extracted, then the row does not have an event time attribute.

Proctime

Proctime functions return a processing time attribute that can be used in subsequent time-based operations such as interval joins and group window or over window aggregations. If it is not extracted, then the row does not have a processing time attribute.

TUMBLE_PROCTIME(time_attr, size_interval)
HOP_PROCTIME(time_attr, size_interval, slide_interval)
SESSION_PROCTIME(time_attr, gap_interval)

DISTINCT Aggregation

Distinct aggregates remove duplicate values before applying an aggregation function. The following example counts the number of distinct order_ids instead of the total number of rows in the Orders table.

SELECT COUNT(DISTINCT order_id) FROM Orders

For streaming queries, the required state for computing the query result might grow infinitely. State size is mostly depends on the number of distinct rows and the time that a group is maintained, short lived group by windows are not a problem. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

GROUPING SETS

Grouping sets allow for more complex grouping operations than those describable by a standard GROUP BY. Rows are grouped separately by each specified grouping set and aggregates are computed for each group just as for simple GROUP BY clauses.

SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())

Results

supplier_idratingtotal
'supplier1'31
'supplier1'41
'supplier1'NULL2
'supplier2'31
'supplier2'41
'supplier1'NULL2
NULLNULL4

Each sublist of GROUPING SETS may specify zero or more columns or expressions and is interpreted the same way as though it was used directly in the GROUP BY clause. An empty grouping set means that all rows are aggregated down to a single group, which is output even if no input rows were present.

References to the grouping columns or expressions are replaced by null values in result rows for grouping sets in which those columns do not appear.

For streaming queries, the required state for computing the query result might grow infinitely. State size depends on number of group sets and type of aggregation functions. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

ROLLUP

ROLLUP is a shorthand notation for specifying a common type of grouping set. It represents the given list of expressions and all prefixes of the list, including the empty list.

For example, the following query is equivalent to the one above.

SELECT supplier_id, rating, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, rating)

CUBE

CUBE is a shorthand notation for specifying a common type of grouping set. It represents the given list and all of its possible subsets - the power set.

For example, the following two queries are equivalent.

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, rating, product_id)

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)

HAVING

HAVING eliminates group rows that do not satisfy the condition. HAVING is different from WHERE: WHERE filters individual rows before the GROUP BY while HAVING filters group rows created by GROUP BY. Each column referenced in condition must unambiguously reference a grouping column unless it appears within an aggregate function.

SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50

The presence of HAVING turns a query into a grouped query even if there is no GROUP BY clause. It is the same as what happens when the query contains aggregate functions but no GROUP BY clause. The query considers all selected rows to form a single group, and the SELECT list and HAVING clause can only reference table columns from within aggregate functions. Such a query will emit a single row if the HAVING condition is true, zero rows if it is not true.

OVER Window Aggregations

OVER window aggregates compute an aggregated value for every input row over a range of ordered rows. In contrast to GROUP BY aggregates, OVER aggregates do not reduce the number of result rows to a single row for every group. Instead OVER aggregates produce an aggregated value for every input row.

The following query computes for every order the sum of amounts of all orders for the same product that were received within one hour before the current order.

SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders

The syntax for an OVER window is summarized below.

SELECT 
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...

You can define multiple OVER window aggregates in a SELECT clause. However, the OVER windows for all aggregates must be identical due to limitations in Apache Flink®.

note

The syntax limitations that are explained in this section do not apply to the special cases of Top-N and Deduplication queries.

ORDER BY

OVER windows are defined on an ordered sequence of rows. Since tables do not have an inherent order, the ORDER BY clause is mandatory. Flink currently only supports OVER windows that are defined with an ascending time attributes order. Additional orderings are not supported.

PARTITION BY

OVER windows can be defined on a partitioned table. In presence of a PARTITION BY clause, the aggregate is computed for each input row only over the rows of its partition.

Range Definitions

The range definition specifies how many rows are included in the aggregate. The range is defined with a BETWEEN clause that defines a lower and an upper boundary. All rows between these boundaries are included in the aggregate. Flink only supports CURRENT ROW as the upper boundary.

There are two options to define the range, ROWS intervals and RANGE intervals.

RANGE intervals

A RANGE interval is defined on the values of the ORDER BY column, which is in case of Flink always a time attribute. The following RANGE interval defines that all rows with a time attribute of at most 30 minutes less than the current row are included in the aggregate.

RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW

ROW intervals

A ROWS interval is a count-based interval. It defines exactly how many rows are included in the aggregate. The following ROWS interval defines that the 10 rows preceding the current row and the current row (so 11 rows in total) are included in the aggregate.

ROWS BETWEEN 10 PRECEDING AND CURRENT ROW

WINDOW

The WINDOW clause can be used to define an OVER window outside of the SELECT clause. It can make queries more readable and also allows us to reuse the window definition for multiple aggregates.

SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)

Joins

Apache Flink® supports a large variety of joins to combine and enrich tables.

By default, the order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. You can tweak the performance of your join queries, by listing the tables with the lowest update frequency first and the tables with the highest update frequency last. Make sure to specify tables in an order that does not yield a cross join (Cartesian product), which are not supported and would cause a query to fail.

INNER Equi-JOIN

Returns a simple Cartesian product restricted by the join condition. Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.

SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id

For streaming queries, the required state for computing the query result might grow infinitely depending on the number of distinct input rows of all input tables and intermediate join results. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

OUTER Equi-JOIN

Returns all rows in the qualified Cartesian product (i.e., all combined rows that pass its join condition), plus one copy of each row in an outer table for which the join condition did not match with any row of the other table. Flink supports LEFT, RIGHT, and FULL outer joins. Currently, only equi-joins are supported, i.e., joins with at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.

SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id

For streaming queries, the required state for computing the query result might grow infinitely depending on the number of distinct input rows of all input tables and intermediate join results. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

Interval Join

Returns a simple Cartesian product restricted by the join condition and a time constraint. An interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Two appropriate range predicates can define such a condition (<, <=, >=, >), a BETWEEN predicate, or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

For example, this query will join all orders with their corresponding shipments if the order was shipped four hours after the order was received.

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

The following predicates are examples of valid interval join conditions:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

Lookup Join

A lookup join is typically used to enrich a table with data that is queried from an external system. The join requires one table to have a processing time attribute and the other table to be backed by a lookup source connector.

The following example shows the syntax to specify a lookup join.

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
'table-name' = 'customers'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

In the example above, the Orders table is enriched with data from the Customers table which resides in a MySQL database. The FOR SYSTEM_TIME AS OF clause with the subsequent processing time attribute ensures that each row of the Orders table is joined with those Customers rows that match the join predicate at the point in time when the Orders row is processed by the join operator. It also prevents that the join result is updated when a joined Customer row is updated in the future. The lookup join also requires a mandatory equality join predicate, in the example above o.customer_id = c.id.

Temporal Table Join

A temporal table join enriches a table with data from a versioned table at a specific point in time, either based on processing or event time semantics. Versioned tables are implicitly created for connectors or formats that ingest changelogs. They can also be explicitly created from a table with changelog semantics using a deduplication query.

The following example demonstrates how a versioned table can be used in a temporal table join. You can also find examples using deduplication queries in the Apache Flink® documentation.

CREATE TEMPORARY TABLE Orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (/* … */);

CREATE TEMPORARY TABLE Rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time,
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
/* … */
);

SELECT
order.order_id,
order.order_time,
order.amount * rate.rate AS amount,
rate.currency
FROM Orders AS order, Rates FOR SYSTEM_TIME AS OF order.order_time rate
on order.currency = rate.currency;

Array Expansion

Returns a new row for each element in the given array. Unnesting WITH ORDINALITY is not yet supported.

SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Table Function

Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. User-defined table functions must be registered before use.

INNER JOIN

The row of the left (outer) table is dropped, if its table function call returns an empty result.

SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)

LEFT OUTER JOIN

If a table function call returns an empty result, the corresponding outer row is preserved, and the result padded with null values. Currently, a left outer join against a lateral table requires a TRUE literal in the ON clause.

SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
ON TRUE

Set Operations

UNION ALL

Returns the union of multiple tables without deduplication.

SELECT *
FROM (
(SELECT user FROM Orders WHERE a % 2 = 0)
UNION ALL
(SELECT user FROM Orders WHERE b = 0)
)

IN

Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column, and the column must have the same data type as the expression.

SELECT user, amount
FROM Orders
WHERE product IN (
SELECT product FROM NewProducts
)

The optimizer rewrites the IN condition into a join and group operation. The required state for computing the query result might grow infinitely depending on the number of distinct input rows. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

EXISTS

Returns true if the sub-query returns at least one row. This operation is only supported if the operation can be rewritten in a join and group operation.

SELECT order_id
FROM Orders o
WHERE EXISTS (
SELECT * FROM NewProducts np WHERE np.product = o.product)

The optimizer rewrites the EXISTS operation into a join and group operation. The required state for computing the query result might grow infinitely depending on the number of distinct input rows. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.

ORDER BY

The ORDER BY clause causes the result rows to be sorted according to the specified expression(s). If two rows are equal according to the leftmost expression, they are compared according to the next expression and so on. If they are equal according to all specified expressions, they are returned in an implementation-dependent order. The primary sort order of a table must be ascending on a time attribute. All subsequent orders can be freely chosen.

SELECT *
FROM Orders
ORDER BY order_time, order_id

Top-N

Top-N queries return the N smallest or largest values as determined by some set of columns. Top-N queries are useful in cases where the need is to find the N bottom-most or the N top-most records from a table based on some condition.

You can express a Top-N query using a combination of the ROW_NUMBER function defined on an OVER window and a WHERE condition. Group-wise Top-N queries can be defined by specifying an OVER window with a PARTITION BY clause.

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [ASC|DESC][, col2 [ASC|DESC]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]

Parameter Specification

  • ROW_NUMBER(): Assigns a unique, sequential number to each row, starting from one, according to the ordering of rows within the partition. Currently, ROW_NUMBER is the only supported OVER window function.
  • PARTITION BY col1[, col2...]: Specifies the partition columns. Each partition will have a Top-N result.
  • ORDER BY col1 [ASC|DESC][, col2 [ASC|DESC]...]: Specifies the ordering columns. The ordering directions can be different on different columns and determines whether the query returns the N largest or smallest values.
  • WHERE rownum <= N: The rownum <= N condition defines how many rows (per partition) are returned.
  • [AND conditions]: Arbitrary other conditions can only be combined with rownum <= N using AND conjunction.

For example, you may want to find the top five products per category with the maximum number of sales in real-time.

CREATE TABLE StoreSales (
product_id BIGINT,
category STRING,
sales BIGINT
) WITH (...)

SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
FROM StoreSales
) WHERE row_num <= 5
note

The above pattern must be followed exactly, otherwise the optimizer won't be able to translate the query.

Deduplication

Deduplication removes rows that duplicate over a set of columns, keeping only the one with the earliest or latest timestamp. A scenario that often requires deduplication is an upstream ETL job, which does not feature end-to-end exactly-once guarantees; this may result in duplicate records being produced in the case of failure. Because duplicate records will affect the correctness of downstream analytical jobs, for example with aggregation functions such as SUM, COUNT, deduplication is needed before further analysis.

You can write a query to remove duplicates using the ROW_NUMBER() function. Conceptually, deduplication is a special case of Top-N, in which the N is one, and records are ordered by the processing time or event time attribute.

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [ASC|DESC]) AS rownum
FROM table_name)
WHERE rownum = 1

Parameter Specification

  • ROW_NUMBER(): Assigns an unique, sequential number to each row, starting with one.
  • PARTITION BY col1[, col2...]: Specifies the partition columns, i.e. the deduplicate key.
  • ORDER BY time_attr [ASC|DESC]: Specifies the ordering column, it must be a time attribute. Ordering by ASC means keeping the first row, ordering by DESC means keeping the last row.
  • WHERE rownum = 1: The rownum = 1 condition defines that only the earliest (or latest) row per set of partition values is kept.
note

The above pattern must be followed exactly, otherwise the optimizer won't be able to translate the query.

The following examples show how to deduplicate the rows of an orders table and only keep last received row per order_id.

SELECT order_id, user, product, number
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) as row_num
FROM Orders)
WHERE row_num = 1

Pattern Detection

The MATCH_RECOGNIZE clause is used to search for a pattern of events over a sorted table. This clause enables you to define event patterns using regular expressions and aggregate methods to verify and extract values from the match.

The following example finds the highest price of a stock before it starts to go down. It shows the types of complex patterns that can be described decoratively using the MATCH_RECOGNIZE clause.

CREATE TABLE Ticker (
ticker_time TIMESTAMP(3),
stock BIGINT,
price DECIMAL,
WATERMARK FOR ticker_time AS ticker_time
) WITH (...)

SELECT
s,
p
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY stock
ORDER BY ticker_time
MEASURES
A.stock AS s,
A.price AS p
ONE ROW PER MATCH
PATTERN(A B* C)
DEFINE
B AS (price < PREV(price)),
C AS (price >= PREV(price))
)

MATCH_RECOGNIZE has a matching output of ONE ROW PER MATCH as default, which is the only matching available. This means the match produces a single row result per match and does not return the rows that are matched.

PARTITION BY

The PARTITION BY clause allows the match to be keyed and partitioned over a column name. A match will happen over every unique key specified by the partition statement. This enables a single query to be matched over all the keys and generate separate matches, one to every key.

ORDER BY

The ORDER BY clause defines the order of rows on which the patterns are matched against. The first column of the clause must be an ascending time attribute.

PATTERN

The PATTERN clause defines the regular expression of events to be searched over the sorted table. Every pattern is constructed from basic building blocks, called pattern variables, to which operators (quantifiers and other modifiers) can be applied. The whole pattern must be enclosed in brackets. Modifiers like + and * can be used to modify the frequency of a variable when matching events.

PATTERN (A B+ C* D)

The pattern on this example defines four pattern variables A, B, C, and D. Variable A should appear once, followed by B one or more times, C zero or more times, and finally D exactly once.

Pattern Quantifiers

Pattern quantifiers are used to specify how a pattern is mapped in the sorted table, defining how many times a pattern variable needs to match to be valid. The following quantifiers are available:

QuantifierDescription
*Zero or more times
+One or more times
?Zero or one time
{ n }Exactly n times (n > 0)
{ n, }n or more times (n >= 0)
{ n, m }Between n and m times inclusive (0 <= n <= m, 0 < m)
{ , m }Between 0 and m times inclusive (m > 0)
caution

Patterns that can produce an empty match are not supported. Examples of such patterns are PATTERN (A*), PATTERN (A? B*), PATTERN (A{0,} B{0,} C*), etc.

Greedy & Reluctant Quantifiers

Each quantifier can be either greedy (default behavior) or reluctant. Greedy quantifiers try to match as many rows as possible while reluctant quantifiers try to match as few as possible. A pattern is made reluctant by appending a ? question mark to it.

Since the * qualifier tells a pattern to match zero or more rows, A*? B defines a pattern that matches zero or more A's followed by B, but as few as needed. It might be none at all if a rows could match both A and B.

MEASURES and DEFINE

The MEASURES clause is similar to the SELECT clause in a simple SQL query. It is used to define the projected values from the match using aggregate methods. For example, MAX(A.id) AS max_id will output the max id value that was found over all events of a matched pattern.

The DEFINE clause is similar to the WHERE clause in a simple SQL query. It specifies conditions that rows have to fulfill to be classified as a corresponding pattern variable. If there is no condition defined for a pattern variable, it will match any row.

Pattern Navigation

The MEASURES and DEFINE clauses allow for navigating within the list of rows that (potentially) match a pattern.

A pattern variable reference allows a set of rows mapped to a particular pattern variable in the DEFINE or MEASURES clauses to be referenced. For example, the expression A.price describes a set of rows mapped so far to A plus the current row if we try to match the current row to A. If an expression in the clause requires a single row (e.g., A.price or A.price > 10), it selects the last value belonging to the corresponding set.

If no pattern variable is specified (e.g., SUM(price)), an expression references the default pattern variable * which references all variables in the pattern. It creates a list of all the rows mapped so far to any variable plus the current row.

Logical Offsets

Logical offsets enable navigation within the events that were mapped to a particular pattern variable. This can be expressed with two corresponding functions:

Offset functionsDescription
LAST(variable.field, n)Returns the value of the field from the event that was mapped to the n-th last element of the variable. The counting starts at the last element mapped.
FIRST(variable.field, n)Returns the value of the field from the event that was mapped to the n-th element of the variable. The counting starts at the first element mapped.

Aggregations

Both build-in and user defined aggregation functions can be used in the MEASURES and DEFINE clauses. Aggregate functions are applied to each subset of rows mapped to a match.

Aggregations can be applied to expressions, but only if they reference a single pattern variable. Thus, SUM(A.price * A.tax) is a valid one, but AVG(A.price * B.tax) is not.

Time Constraint

The WITHIN clause enforces that a pattern matches within a given time interval. The clause is optionally defined after the PATTERN clause and takes an INTERVAL DAY TO MONTH value. It is generally encouraged to use the WITHIN clause as it helps Apache Flink® with efficient memory management. The state of the underlying pattern matching state machine is pruned once the threshold is reached.

PATTERN (A B) WITHIN INTERVAL '1' HOUR

After Match Strategy

The AFTER MATCH SKIP clause specifies where to start a new matching procedure after a complete match was found. There are four different strategies:

  • SKIP PAST LAST ROW - resumes the pattern matching at the next row after the last row of the current match.
  • SKIP TO NEXT ROW - continues searching for a new match starting at the next row after the starting row of the match.
  • SKIP TO LAST variable - resumes the pattern matching at the last row that is mapped to the specified pattern variable.
  • SKIP TO FIRST variable - resumes the pattern matching at the first row that is mapped to the specified pattern variable.

This is also a way to specify how many matches a single event can belong to. For example, with the SKIP PAST LAST ROW strategy, every row can belong to at most one match.

Time Attributes

In order to apply some subsequent operations on top of the MATCH_RECOGNIZE results it might be required to use time attributes. To select those there are available two functions:

MATCH_ROWTIME()

MATCH_ROWTIME() returns the event time attribute the last row that was mapped to the given pattern.

MATCH_PROCTIME()

MATCH_PROCTIME() returns a processing time attribute that can be used in subsequent operations.

Example

Lets consider the example shown at the top of this section. The query is reading from a table called Ticker that contains prices of stocks at a particular point in time and has the following schema:

CREATE TABLE Ticker (
symbol STRING,
price DECIMAL(32, 2),
tax BIGINT,
ticker_time TIMESTAMP(3),
WATERMARK FOR ticker_time AS ticker_time - INTERVAL '5' SECOND
) WITH (...)

The task is now to find periods of a constantly decreasing price of a single ticker. For this, one could write a query like:

SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY ticker_time
MEASURES
START_ROW.ticker_time AS start_tstamp,
LAST(PRICE_DOWN.ticker_time) AS bottom_tstamp,
LAST(PRICE_UP.ticker_time) AS end_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST PRICE_UP
PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
DEFINE
PRICE_DOWN AS
(LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
PRICE_UP AS
PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
) MR;

The query partitions the Ticker table by the symbol column and orders it by the ticker_time time attribute.

The PATTERN clause specifies that we are interested in a pattern with a starting event START_ROW that is followed by one or more PRICE_DOWN events and concluded with a PRICE_UP event. If such a pattern can be found, the next pattern match will be sought at the last PRICE_UP event as indicated by the AFTER MATCH SKIP TO LAST clause.

The DEFINE clause specifies the conditions that need to be met for a PRICE_DOWN and PRICE_UP event. Although the START_ROW pattern variable is not present it has an implicit condition that is evaluated always as TRUE.

A pattern variable PRICE_DOWN is defined as a row with a price that is smaller than the price of the last row that met the PRICE_DOWN condition. For the initial case or when there is no last row that met the PRICE_DOWN condition, the price of the row should be smaller than the price of the preceding row in the pattern (referenced by START_ROW).

A pattern variable PRICE_UP is defined as a row with a price that is larger than the price of the last row that met the PRICE_DOWN condition.

This query produces a summary row for each period in which the price of a stock was continuously decreasing. The exact representation of the output rows is defined in the MEASURES part of the query. It will return three columns, the time prices started going down, the time of the lowest price, and the time prices started going back up again.