Queries
In SQL, the SELECT
command is used to retrieve and process data from a table.
The result of a query can be inserted into an output table using the INSERT INTO
syntax.
The following statement reads the rows of the Orders
table, counts the number of rows per order_id
and hour, and writes the resulting rows into the OrderSummary
table.
INSERT INTO OrderSummary
SELECT
order_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) AS order_time,
COUNT(*) as number_sold
FROM Orders
GROUP BY
TUMBLE(order_time, INTERVAL '1' HOUR),
order_id
Basics
The general syntax of the SELECT
command is:
[WITH <with_item_definition>][, ...n]
SELECT select_list FROM table_expression
<with_item_definition>:
with_item_name (column_name[, ...n]) AS <select_query>
The table_expression
refers to any source of data.
It could be an existing table, view, or VALUES
clause, the joined results of multiple existing tables, or a subquery.
Assuming that the table is available in the catalog, the following would read all rows from Orders
.
SELECT * FROM Orders
The select_list
specification *
means the query will resolve all columns.
However, usage of *
is discouraged in production because it makes queries less robust to catalog changes.
Instead, a select_list
can specify a subset of available columns or make calculations using said columns.
For example, if Orders
has columns named order_id
, price
, and tax
you could write the following query:
SELECT order_id, price + tax FROM Orders
Queries can also consume from inline data using the VALUES
clause.
Each tuple corresponds to one row and an alias may be provided to assign names to each column.
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)
Rows can be filtered based on a WHERE
clause.
SELECT price + tax FROM Orders WHERE id = 10
Additionally, built-in and user-defined scalar functions can be invoked on the columns of a single row. User-defined functions must be registered in a catalog before use.
SELECT PRETTY_PRINT(order_id) FROM Orders
The results from a SQL query can be written out to an external system using INSERT INTO
syntax.
INSERT INTO OrderResults
SELECT order_id
FROM Orders
WITH
WITH
provides a way to write auxiliary statements for use in a larger query.
These statements, which are often referred to as Common Table Expressions or CTEs, can be thought of as defining temporary views that exist just for one query.
WITH orders_with_total AS (
SELECT order_id, price + tax AS total
FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id
SELECT DISTINCT
If SELECT DISTINCT
is specified, all duplicate rows are removed from the result set (one row is kept from each group of duplicates).
SELECT DISTINCT id FROM Orders
For streaming queries, the required state for computing the query result might grow infinitely. State size depends on the number of distinct rows. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.
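As a rough illustration of such a configuration: in recent versions of open-source Apache Flink, state TTL for SQL queries is controlled by the table.exec.state.ttl option, which can be set from the SQL client as sketched below. The one-hour value is an arbitrary assumption, and your deployment may expose this setting through a different mechanism, so treat this as a sketch rather than the definitive way to configure it.

```sql
-- Assumption: open-source Flink SQL client syntax; the platform you use may
-- expose the same option through its own query configuration instead.
-- State that has not been updated for one hour becomes eligible for cleanup.
SET 'table.exec.state.ttl' = '1 h';

-- Queries submitted afterwards in the same session run with that TTL.
SELECT DISTINCT id FROM Orders;
```

With a TTL in place, a value that reappears after its state has expired is treated as new and emitted again, which is the correctness trade-off mentioned above.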
Group Aggregations
Like most data systems, Apache Flink® supports aggregate functions, both built-in and user-defined. User-defined functions must be registered in a catalog before use.
An aggregate function computes a single result from multiple input rows.
For example, there are aggregates to compute the COUNT
, SUM
, AVG
(average), MAX
(maximum) and MIN
(minimum) over a set of rows.
SELECT COUNT(*) FROM Orders
It is important to understand that Flink runs continuous queries that never terminate.
Instead, they update their result table according to the updates on their input tables.
So for the above query, Flink will output an updated count each time a new row is inserted into the Orders
table.
GROUP BY
Apache Flink® supports the standard GROUP BY
clause for aggregating data.
SELECT COUNT(*)
FROM Orders
GROUP BY order_id
For streaming queries, the required state for computing the query result might grow infinitely.
State size depends on the number of groups and on the number and type of aggregation functions.
For example, MIN
/MAX
are heavy on state size while COUNT
is cheap.
You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size.
Note that this might affect the correctness of the query result.
See query configuration for details.
GROUP BY Window Aggregation
Group windows are defined in the GROUP BY
clause of a SQL query and provide a convenient way to group rows when the grouping clause contains a time attribute.
Just like queries with regular GROUP BY
clauses, queries with a group by window aggregation will compute a single result row per group.
Unlike other aggregations on continuous tables, group windows do not emit intermediate results but only a final result, the total aggregation at the end of the window. Moreover, group window aggregations purge all intermediate state when no longer needed.
Apache Flink® supports three types of group windows, which can be defined on either event or processing time attributes.
TUMBLE
Tumbling windows are of fixed length, and non-overlapping. The boundaries of tumbling windows are aligned to the Epoch. By aligned we mean that the window boundaries are the same for every key.
TUMBLE(time_attr, size_interval)
For example, the number of orders per hour, for each active user.
SELECT user, TUMBLE_START(order_time, INTERVAL '1' HOUR), COUNT(*)
FROM Orders
GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR), user
HOP
Hop windows are just like tumbling windows, except they overlap. The boundaries of hop windows are aligned to the Epoch. By aligned we mean that the window boundaries are the same for every key.
HOP(time_attr, slide_interval, size_interval)
The below query returns the number of orders per hour, reported every 20 minutes, for each active user.
SELECT user, HOP_START(order_time, INTERVAL '20' MINUTE, INTERVAL '1' HOUR), COUNT(*)
FROM Orders
GROUP BY HOP(order_time, INTERVAL '20' MINUTE, INTERVAL '1' HOUR), user
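To make the overlap concrete, the following sketch extends the query above with HOP_END so that both window bounds are visible. The column aliases are illustrative names, not part of the original example, and the 12:50 timestamp in the comments is a made-up value.

```sql
-- With a 20-minute slide and a 1-hour size, every row belongs to
-- size / slide = 3 windows. An order with order_time = 12:50 is counted in:
--   [12:00, 13:00), [12:20, 13:20), [12:40, 13:40)
SELECT
  user,
  HOP_START(order_time, INTERVAL '20' MINUTE, INTERVAL '1' HOUR) AS window_start,
  HOP_END(order_time, INTERVAL '20' MINUTE, INTERVAL '1' HOUR) AS window_end,
  COUNT(*) AS order_count
FROM Orders
GROUP BY HOP(order_time, INTERVAL '20' MINUTE, INTERVAL '1' HOUR), user
```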
It is also valid to have a size interval that is smaller than the slide interval.
The following query will, for each active user, report every hour the number of orders during the last 20 minutes of that hour:
SELECT user, HOP_START(order_time, INTERVAL '1' HOUR, INTERVAL '20' MINUTE), COUNT(*)
FROM Orders
GROUP BY HOP(order_time, INTERVAL '1' HOUR, INTERVAL '20' MINUTE), user
SESSION
Session windows do not have a fixed duration. Their bounds are defined by a gap of inactivity: a session window closes when no event occurs within the gap interval.
SESSION(time_attr, gap_interval)
The below query returns the number of orders, where each order occurred within 30 minutes of the last, for each user.
SELECT user, SESSION_START(order_time, INTERVAL '30' MINUTE), COUNT(*)
FROM Orders
GROUP BY SESSION(order_time, INTERVAL '30' MINUTE), user
Selecting Group Window Start and End Timestamps
The start and end timestamps of group windows can be selected with the following built-in functions.
These functions must be called in the SELECT
clause with exactly the same arguments as the group window function in the GROUP BY
clause.
Start
Start functions return the timestamp of the inclusive lower bound of the corresponding tumbling, hopping, or session window.
TUMBLE_START(time_attr, size_interval)
HOP_START(time_attr, slide_interval, size_interval)
SESSION_START(time_attr, gap_interval)
End
End functions return the timestamp of the exclusive upper bound of the corresponding tumbling, hopping, or session window.
TUMBLE_END(time_attr, size_interval)
HOP_END(time_attr, slide_interval, size_interval)
SESSION_END(time_attr, gap_interval)
The exclusive upper bound timestamp cannot be used as a time attribute in subsequent time-based operations, such as interval joins and group window or over window aggregations.
Rowtime
Rowtime functions return the timestamp of the inclusive upper bound of the corresponding tumbling, hopping, or session window.
TUMBLE_ROWTIME(time_attr, size_interval)
HOP_ROWTIME(time_attr, slide_interval, size_interval)
SESSION_ROWTIME(time_attr, gap_interval)
The resulting timestamp is an event time attribute that can be used in subsequent time-based operations such as interval joins and group window or over window aggregations. If it is not extracted, then the row does not have an event time attribute.
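One common use of the rowtime function is to cascade windows. The sketch below assumes that order_time is an event time attribute of Orders; the aliases hour_rowtime, orders_per_hour, day_start, and orders_per_day are illustrative names. The inner query produces hourly counts and keeps an event time attribute, which the outer query then uses to build daily windows.

```sql
SELECT
  user,
  TUMBLE_START(hour_rowtime, INTERVAL '1' DAY) AS day_start,
  SUM(orders_per_hour) AS orders_per_day
FROM (
  -- Hourly counts per user. TUMBLE_ROWTIME keeps an event time attribute
  -- on the result rows so that they can be windowed again.
  SELECT
    user,
    TUMBLE_ROWTIME(order_time, INTERVAL '1' HOUR) AS hour_rowtime,
    COUNT(*) AS orders_per_hour
  FROM Orders
  GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR), user
)
-- Daily windows defined on the rowtime attribute of the hourly result.
GROUP BY TUMBLE(hour_rowtime, INTERVAL '1' DAY), user
```

Selecting TUMBLE_END in the inner query instead would not work for this purpose, because the exclusive upper bound is not a time attribute.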
Proctime
Proctime functions return a processing time attribute that can be used in subsequent time-based operations such as interval joins and group window or over window aggregations. If it is not extracted, then the row does not have a processing time attribute.
TUMBLE_PROCTIME(time_attr, size_interval)
HOP_PROCTIME(time_attr, slide_interval, size_interval)
SESSION_PROCTIME(time_attr, gap_interval)
DISTINCT Aggregation
Distinct aggregates remove duplicate values before applying an aggregation function. The following example counts the number of distinct order_ids instead of the total number of rows in the Orders table.
SELECT COUNT(DISTINCT order_id) FROM Orders
For streaming queries, the required state for computing the query result might grow infinitely. State size mostly depends on the number of distinct rows and the time that a group is maintained; short-lived group by windows are not a problem. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.
GROUPING SETS
Grouping sets allow for more complex grouping operations than those describable by a standard GROUP BY
.
Rows are grouped separately by each specified grouping set and aggregates are computed for each group just as for simple GROUP BY
clauses.
SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())
Results
supplier_id | rating | total |
---|---|---|
'supplier1' | 3 | 1 |
'supplier1' | 4 | 1 |
'supplier1' | NULL | 2 |
'supplier2' | 3 | 1 |
'supplier2' | 4 | 1 |
'supplier2' | NULL | 2 |
NULL | NULL | 4 |
Each sublist of GROUPING SETS
may specify zero or more columns or expressions and is interpreted the same way as though it was used directly in the GROUP BY
clause.
An empty grouping set means that all rows are aggregated down to a single group, which is output even if no input rows were present.
References to the grouping columns or expressions are replaced by null values in result rows for grouping sets in which those columns do not appear.
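One way to read these rules is to spell out the three grouping sets of the example as separate aggregations combined with UNION ALL. This is only a conceptual sketch, not how the query is executed, and it assumes that Products is available as a table or view with the columns supplier_id (STRING), product_id, and rating (INT).

```sql
-- Grouping set (supplier_id, rating)
SELECT supplier_id, rating, COUNT(*) AS total
FROM Products
GROUP BY supplier_id, rating
UNION ALL
-- Grouping set (supplier_id): rating does not appear, so it is NULL
SELECT supplier_id, CAST(NULL AS INT) AS rating, COUNT(*) AS total
FROM Products
GROUP BY supplier_id
UNION ALL
-- Empty grouping set (): all rows form a single group
SELECT CAST(NULL AS STRING) AS supplier_id, CAST(NULL AS INT) AS rating, COUNT(*) AS total
FROM Products
```

The GROUPING SETS form expresses the same result in a single GROUP BY clause.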
For streaming queries, the required state for computing the query result might grow infinitely. State size depends on the number of grouping sets and the type of aggregation functions. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.
ROLLUP
ROLLUP
is a shorthand notation for specifying a common type of grouping set.
It represents the given list of expressions and all prefixes of the list, including the empty list.
For example, the following query is equivalent to the one above.
SELECT supplier_id, rating, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, rating)
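The prefix rule is easier to see with a longer list. The two GROUP BY clauses below are fragments rather than complete queries; following the definition above, they should describe the same grouping sets for the Products columns.

```sql
-- ROLLUP over three expressions ...
GROUP BY ROLLUP (supplier_id, rating, product_id)

-- ... groups by every prefix of the list, including the empty one:
GROUP BY GROUPING SETS (
  (supplier_id, rating, product_id),
  (supplier_id, rating),
  (supplier_id),
  ()
)
```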
CUBE
CUBE
is a shorthand notation for specifying a common type of grouping set.
It represents the given list and all of its possible subsets (the power set).
For example, the following two queries are equivalent.
SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, rating, product_id)
SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)
HAVING
HAVING
eliminates group rows that do not satisfy the condition.
HAVING
is different from WHERE
: WHERE
filters individual rows before the GROUP BY
while HAVING
filters group rows created by GROUP BY
.
Each column referenced in the condition must unambiguously reference a grouping column unless it appears within an aggregate function.
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
The presence of HAVING
turns a query into a grouped query even if there is no GROUP BY
clause.
It is the same as what happens when the query contains aggregate functions but no GROUP BY
clause.
The query considers all selected rows to form a single group, and the SELECT
list and HAVING
clause can only reference table columns from within aggregate functions.
Such a query will emit a single row if the HAVING
condition is true, zero rows if it is not true.
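A minimal sketch of such a query, reusing the amount column from the example above, could look as follows. The alias total_amount is an illustrative name.

```sql
-- No GROUP BY: all rows of Orders form a single group.
-- The query yields one row while the total exceeds 50, and no row otherwise.
SELECT SUM(amount) AS total_amount
FROM Orders
HAVING SUM(amount) > 50
```

Note that both the SELECT list and the HAVING clause reference amount only inside an aggregate function, as required.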
OVER Window Aggregations
OVER
window aggregates compute an aggregated value for every input row over a range of ordered rows. In contrast to GROUP BY
aggregates, OVER
aggregates do not reduce the number of result rows to a single row for every group. Instead OVER
aggregates produce an aggregated value for every input row.
The following query computes for every order the sum of amounts of all orders for the same product that were received within one hour before the current order.
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders
The syntax for an OVER
window is summarized below.
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
You can define multiple OVER
window aggregates in a SELECT
clause. However, the OVER
windows for all aggregates must be identical due to limitations in Apache Flink®.
The syntax limitations that are explained in this section do not apply to the special cases of Top-N and Deduplication queries.
ORDER BY
OVER
windows are defined on an ordered sequence of rows. Since tables do not have an inherent order, the ORDER BY
clause is mandatory. Flink currently only supports OVER
windows that are ordered by a time attribute in ascending order. Additional orderings are not supported.
PARTITION BY
OVER
windows can be defined on a partitioned table. In presence of a PARTITION BY
clause, the aggregate is computed for each input row only over the rows of its partition.
Range Definitions
The range definition specifies how many rows are included in the aggregate. The range is defined with a BETWEEN
clause that defines a lower and an upper boundary. All rows between these boundaries are included in the aggregate. Flink only supports CURRENT ROW
as the upper boundary.
There are two options to define the range, ROWS
intervals and RANGE
intervals.
RANGE intervals
A RANGE
interval is defined on the values of the ORDER BY
column, which in Flink is always a time attribute. The following RANGE
interval defines that all rows with a time attribute of at most 30 minutes less than the current row are included in the aggregate.
RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
ROWS intervals
A ROWS
interval is a count-based interval. It defines exactly how many rows are included in the aggregate. The following ROWS
interval defines that the 10 rows preceding the current row and the current row (so 11 rows in total) are included in the aggregate.
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
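Placed in a complete query, a count-based window might look like the sketch below. It reuses the Orders columns from the earlier OVER example; the alias avg_amount_last_11 is an illustrative name.

```sql
-- For every order, average the amount over the current order and up to
-- 10 preceding orders of the same product, in order_time order.
SELECT order_id, order_time, amount,
  AVG(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
  ) AS avg_amount_last_11
FROM Orders
```

For the first rows of a partition fewer than 10 preceding rows exist, so the aggregate covers only the rows seen so far.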
WINDOW
The WINDOW
clause can be used to define an OVER
window outside of the SELECT
clause. It can make queries more readable and also allows us to reuse the window definition for multiple aggregates.
SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
Joins
Apache Flink® supports a large variety of joins to combine and enrich tables.
By default, the order of joins is not optimized.
Tables are joined in the order in which they are specified in the FROM
clause.
You can tweak the performance of your join queries by listing the tables with the lowest update frequency first and the tables with the highest update frequency last.
Make sure to specify tables in an order that does not yield a cross join (Cartesian product), which is not supported and would cause a query to fail.
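As a sketch of this advice, consider three tables with different update frequencies. The Category table and the columns category_id and name are hypothetical and only serve the illustration; the update frequencies are assumptions as well.

```sql
SELECT o.order_id, p.id AS product_id, c.name AS category_name
FROM Category AS c           -- assumed lowest update frequency, listed first
INNER JOIN Product AS p      -- shares a join key with Category
  ON p.category_id = c.id
INNER JOIN Orders AS o       -- assumed highest update frequency, listed last
  ON o.product_id = p.id
```

Each table in this order has an equality condition with a table listed before it. Listing Category directly next to Orders would pair two tables that share no join condition, which is the cross join situation described above.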
INNER Equi-JOIN
Returns a simple Cartesian product restricted by the join condition. Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.
SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id
For streaming queries, the required state for computing the query result might grow infinitely depending on the number of distinct input rows of all input tables and intermediate join results. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.
OUTER Equi-JOIN
Returns all rows in the qualified Cartesian product (i.e., all combined rows that pass its join condition), plus one copy of each row in an outer table for which the join condition did not match with any row of the other table. Flink supports LEFT, RIGHT, and FULL outer joins. Currently, only equi-joins are supported, i.e., joins with at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.
SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id
SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id
SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id
For streaming queries, the required state for computing the query result might grow infinitely depending on the number of distinct input rows of all input tables and intermediate join results. You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See query configuration for details.
Interval Join
Returns a simple Cartesian product restricted by the join condition and a time constraint.
An interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides.
Such a condition can be defined by two appropriate range predicates (<, <=, >=, >), a BETWEEN predicate, or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.
For example, this query joins all orders with their corresponding shipments if the order was shipped within four hours after the order was received.
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
The following predicates are examples of valid interval join conditions:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
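The range predicate form can be used in the same way as BETWEEN. The following sketch is a variant of the Orders and Shipments query above that bounds the time on both sides with two comparisons; unlike BETWEEN, it excludes shipments that happen exactly four hours after the order.

```sql
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
  -- lower bound: the shipment does not precede the order
  AND s.ship_time >= o.order_time
  -- upper bound: the shipment happens less than four hours later
  AND s.ship_time < o.order_time + INTERVAL '4' HOUR
```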
Lookup Join
A lookup join is typically used to enrich a table with data that is queried from an external system. The join requires one table to have a processing time attribute and the other table to be backed by a lookup source connector.
The following example shows the syntax to specify a lookup join.
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
'table-name' = 'customers'
);
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
In the example above, the Orders
table is enriched with data from the Customers
table which resides in a MySQL database. The FOR SYSTEM_TIME AS OF
clause with the subsequent processing time attribute ensures that each row of the Orders
table is joined with those Customers
rows that match the join predicate at the point in time when the Orders
row is processed by the join operator. It also prevents the join result from being updated when a joined Customers
row is updated in the future. The lookup join also requires an equality join predicate, in the example above o.customer_id = c.id
.
Temporal Table Join
A temporal table join enriches a table with data from a versioned table at a specific point in time, either based on processing or event time semantics. Versioned tables are implicitly created for connectors or formats that ingest changelogs. They can also be explicitly created from a table with changelog semantics using a deduplication query.
The following example demonstrates how a versioned table can be used in a temporal table join. You can also find examples using deduplication queries in the Apache Flink® documentation.
CREATE TEMPORARY TABLE Orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (/* … */);
CREATE TEMPORARY TABLE Rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time,
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
/* … */
);
SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM Orders AS o
JOIN Rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
Array Expansion
Returns a new row for each element in the given array.
Unnesting WITH ORDINALITY
is not yet supported.
SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
Table Function
Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. User-defined table functions must be registered before use.
INNER JOIN
The row of the left (outer) table is dropped, if its table function call returns an empty result.
SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)
LEFT OUTER JOIN
If a table function call returns an empty result, the corresponding outer row is preserved, and the result padded with null values.
Currently, a left outer join against a lateral table requires a TRUE
literal in the ON
clause.
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
ON TRUE
Set Operations
UNION ALL
Returns the union of multiple tables without deduplication.
SELECT *
FROM (
(SELECT user FROM Orders WHERE a % 2 = 0)
UNION ALL
(SELECT user FROM Orders WHERE b = 0)
)
IN
Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column, and the column must have the same data type as the expression.
SELECT user, amount
FROM Orders
WHERE product IN (
SELECT product FROM NewProducts
)
The optimizer rewrites the IN
condition into a join and group operation.
The required state for computing the query result might grow infinitely depending on the number of distinct input rows.
You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size.
Note that this might affect the correctness of the query result.
See query configuration for details.
EXISTS
Returns true if the sub-query returns at least one row. This operation is only supported if the operation can be rewritten in a join and group operation.
SELECT order_id
FROM Orders o
WHERE EXISTS (
SELECT * FROM NewProducts np WHERE np.product = o.product)
The optimizer rewrites the EXISTS
operation into a join and group operation.
The required state for computing the query result might grow infinitely depending on the number of distinct input rows.
You can provide a query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size.
Note that this might affect the correctness of the query result.
See query configuration for details.
ORDER BY
The ORDER BY
clause causes the result rows to be sorted according to the specified expression(s).
If two rows are equal according to the leftmost expression, they are compared according to the next expression and so on.
If they are equal according to all specified expressions, they are returned in an implementation-dependent order.
The primary sort order of a table must be ascending on a time attribute.
All subsequent orders can be freely chosen.
SELECT *
FROM Orders
ORDER BY order_time, order_id
Top-N
Top-N queries return the N smallest or largest values as determined by some set of columns. Top-N queries are useful in cases where the need is to find the N bottom-most or the N top-most records from a table based on some condition.
You can express a Top-N query using a combination of the ROW_NUMBER
function defined on an OVER window and a WHERE
condition.
Group-wise Top-N queries can be defined by specifying an OVER
window with a PARTITION BY
clause.
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [ASC|DESC][, col2 [ASC|DESC]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
Parameter Specification
ROW_NUMBER()
: Assigns a unique, sequential number to each row, starting from one, according to the ordering of rows within the partition. Currently, ROW_NUMBER is the only supported OVER window function.
PARTITION BY col1[, col2...]
: Specifies the partition columns. Each partition will have a Top-N result.
ORDER BY col1 [ASC|DESC][, col2 [ASC|DESC]...]
: Specifies the ordering columns. The ordering directions can be different on different columns and determine whether the query returns the N largest or smallest values.
WHERE rownum <= N
: The rownum <= N condition defines how many rows (per partition) are returned.
[AND conditions]
: Arbitrary other conditions can only be combined with rownum <= N using an AND conjunction.
For example, you may want to find the top five products per category with the maximum number of sales in real-time.
CREATE TABLE StoreSales (
product_id BIGINT,
category STRING,
sales BIGINT
) WITH (...)
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
FROM StoreSales
) WHERE row_num <= 5
The above pattern must be followed exactly, otherwise the optimizer won't be able to translate the query.
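The ordering direction is the only thing that changes when the smallest values are wanted instead. As a sketch, the following query uses the same StoreSales table and the same pattern to return the three products with the fewest sales per category; the choice of three is arbitrary.

```sql
SELECT *
FROM (
  SELECT *,
    -- ASC ranks the lowest sales first, so row_num 1 is the weakest product.
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales ASC) AS row_num
  FROM StoreSales
) WHERE row_num <= 3
```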
Deduplication
Deduplication removes rows that duplicate over a set of columns, keeping only the one with the earliest or latest timestamp.
A scenario that often requires deduplication is an upstream ETL job, which does not feature end-to-end exactly-once guarantees; this may result in duplicate records being produced in the case of failure.
Because duplicate records will affect the correctness of downstream analytical jobs, for example with aggregation functions such as SUM
, COUNT
, deduplication is needed before further analysis.
You can write a query to remove duplicates using the ROW_NUMBER()
function.
Conceptually, deduplication is a special case of Top-N, in which the N is one, and records are ordered by the processing time or event time attribute.
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [ASC|DESC]) AS rownum
FROM table_name)
WHERE rownum = 1
Parameter Specification
ROW_NUMBER()
: Assigns a unique, sequential number to each row, starting with one.
PARTITION BY col1[, col2...]
: Specifies the partition columns, i.e., the deduplication key.
ORDER BY time_attr [ASC|DESC]
: Specifies the ordering column, which must be a time attribute. Ordering by ASC means keeping the first row; ordering by DESC means keeping the last row.
WHERE rownum = 1
: The rownum = 1 condition defines that only the earliest (or latest) row per set of partition values is kept.
The above pattern must be followed exactly, otherwise the optimizer won't be able to translate the query.
The following example shows how to deduplicate the rows of an Orders table and keep only the last received row per order_id.
SELECT order_id, user, product, number
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) as row_num
FROM Orders)
WHERE row_num = 1
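Keeping the first received row instead only requires switching the ordering direction. The sketch below assumes the same Orders table with its proctime attribute.

```sql
SELECT order_id, user, product, number
FROM (
  SELECT *,
    -- ASC keeps the earliest row per order_id; later duplicates are dropped.
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num
  FROM Orders)
WHERE row_num = 1
```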
Pattern Detection
The MATCH_RECOGNIZE
clause is used to search for a pattern of events over a sorted table.
This clause enables you to define event patterns using regular expressions and aggregate methods to verify and extract values from the match.
The following example finds the highest price of a stock before it starts to go down.
It shows the types of complex patterns that can be described declaratively using the MATCH_RECOGNIZE
clause.
CREATE TABLE Ticker (
ticker_time TIMESTAMP(3),
stock BIGINT,
price DECIMAL,
WATERMARK FOR ticker_time AS ticker_time
) WITH (...)
SELECT
s,
p
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY stock
ORDER BY ticker_time
MEASURES
A.stock AS s,
A.price AS p
ONE ROW PER MATCH
PATTERN(A B* C)
DEFINE
B AS (price < PREV(price)),
C AS (price >= PREV(price))
)
MATCH_RECOGNIZE
produces ONE ROW PER MATCH
by default, which is also the only output mode available.
This means the match produces a single row result per match and does not return the rows that are matched.
PARTITION BY
The PARTITION BY
clause allows the match to be keyed and partitioned over a column name.
A match will happen over every unique key specified by the partition statement.
This enables a single query to be matched over all the keys and generate separate matches, one to every key.
ORDER BY
The ORDER BY
clause defines the order of the rows against which the patterns are matched.
The first column of the clause must be an ascending time attribute.
PATTERN
The PATTERN
clause defines the regular expression of events to be searched over the sorted table.
Every pattern is constructed from basic building blocks, called pattern variables, to which operators (quantifiers and other modifiers) can be applied.
The whole pattern must be enclosed in parentheses.
Modifiers like +
and *
can be used to modify the frequency of a variable when matching events.
PATTERN (A B+ C* D)
The pattern in this example defines four pattern variables A
, B
, C
, and D
.
Variable A
should appear once, followed by B
one or more times, C
zero or more times, and finally D
exactly once.
Pattern Quantifiers
Pattern quantifiers are used to specify how a pattern is mapped in the sorted table, defining how many times a pattern variable needs to match to be valid. The following quantifiers are available:
Quantifier | Description |
---|---|
* | Zero or more times |
+ | One or more times |
? | Zero or one time |
{ n } | Exactly n times (n > 0) |
{ n, } | n or more times (n >= 0) |
{ n, m } | Between n and m times inclusive (0 <= n <= m, 0 < m) |
{ , m } | Between 0 and m times inclusive (m > 0) |
Patterns that can produce an empty match are not supported.
Examples of such patterns are PATTERN (A*)
, PATTERN (A? B*)
, PATTERN (A{0,} B{0,} C*)
, etc.
Greedy & Reluctant Quantifiers
Each quantifier can be either greedy (default behavior) or reluctant.
Greedy quantifiers try to match as many rows as possible while reluctant quantifiers try to match as few as possible.
A quantifier is made reluctant by appending a question mark (?) to it.
Since the *
quantifier matches zero or more rows, A*? B
defines a pattern that matches
zero or more A
rows followed by B
, but with as few A rows as needed. There might be none at all if a row could match
both A
and B
.
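To illustrate the difference, the sketch below uses a reluctant quantifier on B and assumes the Ticker table from the example in this section. The thresholds are arbitrary. For a hypothetical sequence of prices 11, 12, 13, 14, 16 within one symbol, the reluctant form should end the first match as soon as C can match (at price 13), whereas the greedy form B* should keep consuming rows for B and end the match at price 16.

```sql
-- Sketch only: thresholds are hypothetical.
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
  PARTITION BY symbol
  ORDER BY ticker_time
  MEASURES
    C.price AS last_price
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  -- B*? is reluctant: map as few rows to B as possible.
  -- Replace it with B* to get the greedy behavior.
  PATTERN (A B*? C)
  DEFINE
    A AS A.price > 10,
    B AS B.price < 15,
    C AS C.price > 12
) AS MR;
```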
MEASURES and DEFINE
The MEASURES
clause is similar to the SELECT
clause in a simple SQL query.
It defines the values projected from each match, typically by applying aggregate functions.
For example, MAX(A.id) AS max_id
outputs the maximum id value found among the rows mapped to A in a match.
The DEFINE
clause is similar to the WHERE
clause in a simple SQL query.
It specifies conditions that rows have to fulfill to be classified as a corresponding pattern variable.
If there is no condition defined for a pattern variable, it will match any row.
Pattern Navigation
The MEASURES
and DEFINE
clauses allow for navigating within the list of rows that (potentially) match a pattern.
A pattern variable reference allows a set of rows mapped to a particular pattern variable in the DEFINE
or MEASURES
clauses to be referenced.
For example, the expression A.price
describes the set of rows mapped so far to A
, plus the current row if the current row is being matched against A
.
If an expression in the clause requires a single row (e.g., A.price
or A.price > 10
), it selects the last value belonging to the corresponding set.
If no pattern variable is specified (e.g., SUM(price)
), an expression references the default pattern variable *
which references all variables in the pattern.
It creates a list of all the rows mapped so far to any variable plus the current row.
Logical Offsets
Logical offsets enable navigation within the events that were mapped to a particular pattern variable. This can be expressed with two corresponding functions:
Offset functions | Description |
---|---|
LAST(variable.field, n) | Returns the value of the field from the event that was mapped to the n-th last element of the variable. The counting starts at the last element mapped. |
FIRST(variable.field, n) | Returns the value of the field from the event that was mapped to the n-th element of the variable. The counting starts at the first element mapped. |
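The sketch below shows both offset functions in the MEASURES and DEFINE clauses, assuming the Ticker table from the example in this section. It treats the offset n as zero-based with a default of 0, which is consistent with how LAST(PRICE_DOWN.price, 1) is used in that example to reach the previous row. Treat this as an assumption to verify against your Flink version.

```sql
-- Sketch only: finds runs of at least two rising prices followed by a drop.
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
  PARTITION BY symbol
  ORDER BY ticker_time
  MEASURES
    FIRST(A.price)    AS first_price,        -- first row mapped to A
    FIRST(A.price, 1) AS second_price,       -- one row after the first
    LAST(A.price, 1)  AS second_last_price,  -- one row before the last
    LAST(A.price)     AS last_price          -- last row mapped to A
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A{2,} B)
  DEFINE
    -- While A is being evaluated, the current row counts as the last A row,
    -- so LAST(A.price, 1) is the previous A row (NULL for the first one).
    A AS LAST(A.price, 1) IS NULL OR A.price > LAST(A.price, 1),
    B AS B.price < LAST(A.price)
) AS MR;
```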
Aggregations
Both built-in and user-defined aggregation functions can be used in the MEASURES
and DEFINE
clauses.
Aggregate functions are applied to each subset of rows mapped to a match.
Aggregations can be applied to expressions, but only if they reference a single pattern variable.
Thus, SUM(A.price * A.tax)
is valid, but AVG(A.price * B.tax)
is not.
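As a sketch of aggregations in both clauses, the query below keeps mapping rows to A while their running average price stays below a threshold. It assumes the Ticker table from the example in this section. The threshold of 15 and the aliases are hypothetical.

```sql
-- Sketch only: threshold and aliases are illustrative.
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
  PARTITION BY symbol
  ORDER BY ticker_time
  MEASURES
    FIRST(A.ticker_time)   AS start_time,
    LAST(A.ticker_time)    AS end_time,
    AVG(A.price)           AS avg_price,
    SUM(A.price * A.tax)   AS weighted_sum  -- references only variable A
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A+ B)
  DEFINE
    -- The aggregate covers the rows mapped to A so far plus the current row.
    A AS AVG(A.price) < 15
    -- B has no condition, so it matches any row.
) AS MR;
```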
Time Constraint
The WITHIN
clause enforces that a pattern matches within a given time interval.
The clause is optional, is defined after the PATTERN
clause, and takes an INTERVAL DAY TO SECOND value (for example, INTERVAL '1' HOUR).
It is generally encouraged to use the WITHIN
clause as it helps Apache Flink® with efficient memory management.
The state of the underlying pattern matching state machine is pruned once the threshold is reached.
PATTERN (A B) WITHIN INTERVAL '1' HOUR
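In the context of a full query, a time constraint might look like the following sketch, which looks for a price drop of more than 10 within one hour. It assumes the Ticker table from the example in this section. The drop size and aliases are hypothetical.

```sql
-- Sketch only: the drop size of 10 is an arbitrary illustration.
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
  PARTITION BY symbol
  ORDER BY ticker_time
  MEASURES
    C.ticker_time     AS drop_time,
    A.price - C.price AS drop_diff
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  -- Partial matches older than one hour are discarded.
  PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
  DEFINE
    B AS B.price > A.price - 10,
    C AS C.price < A.price - 10
) AS MR;
```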
After Match Strategy
The AFTER MATCH SKIP
clause specifies where to start a new matching procedure after a complete match was found.
There are four different strategies:
SKIP PAST LAST ROW - resumes the pattern matching at the next row after the last row of the current match.
SKIP TO NEXT ROW - continues searching for a new match starting at the next row after the starting row of the match.
SKIP TO LAST variable - resumes the pattern matching at the last row that is mapped to the specified pattern variable.
SKIP TO FIRST variable - resumes the pattern matching at the first row that is mapped to the specified pattern variable.
This is also a way to specify how many matches a single event can belong to.
For example, with the SKIP PAST LAST ROW
strategy, every row can belong to at most one match.
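The sketch below shows where the clause sits in a query, assuming the Ticker table from the example in this section. It uses SKIP TO NEXT ROW, so consecutive matches may overlap. Swapping in SKIP PAST LAST ROW would make them disjoint. The threshold of 30 is hypothetical.

```sql
-- Sketch only: threshold and aliases are illustrative.
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
  PARTITION BY symbol
  ORDER BY ticker_time
  MEASURES
    SUM(A.price)         AS sum_price,
    FIRST(A.ticker_time) AS start_time,
    C.ticker_time        AS end_time
  ONE ROW PER MATCH
  -- Start the next match attempt one row after the start of this match,
  -- so a row can take part in more than one match.
  AFTER MATCH SKIP TO NEXT ROW
  PATTERN (A+ C)
  DEFINE
    A AS SUM(A.price) < 30
) AS MR;
```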
Time Attributes
To apply subsequent time-based operations on top of the MATCH_RECOGNIZE
results, time attributes might be required.
Two functions are available to select them:
MATCH_ROWTIME()
MATCH_ROWTIME()
returns the event-time attribute of the last row that was mapped to the given pattern.
MATCH_PROCTIME()
MATCH_PROCTIME()
returns a processing time attribute that can be used in subsequent operations.
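As a sketch of such a subsequent operation, the query below exposes MATCH_ROWTIME() as a measure and then counts matches per symbol in one-hour tumbling windows, using the same TUMBLE syntax as the query at the top of this page. It assumes the Ticker table from the example in this section. The pattern and aliases are hypothetical.

```sql
-- Sketch only: counts price drops per symbol and hour.
SELECT
  symbol,
  TUMBLE_START(match_time, INTERVAL '1' HOUR) AS window_start,
  COUNT(*) AS drops
FROM Ticker
MATCH_RECOGNIZE (
  PARTITION BY symbol
  ORDER BY ticker_time
  MEASURES
    -- Event-time attribute of the match, usable for windowing downstream.
    -- MATCH_PROCTIME() could be used instead for processing time.
    MATCH_ROWTIME() AS match_time
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A B)
  DEFINE
    B AS B.price < A.price
) AS MR
GROUP BY
  symbol,
  TUMBLE(match_time, INTERVAL '1' HOUR);
```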
Example
Let's consider the example shown at the top of this section.
The query is reading from a table called Ticker
that contains prices of stocks at a particular point in time and has the following schema:
CREATE TABLE Ticker (
symbol STRING,
price DECIMAL(32, 2),
tax BIGINT,
ticker_time TIMESTAMP(3),
WATERMARK FOR ticker_time AS ticker_time - INTERVAL '5' SECOND
) WITH (...)
The task is to find periods during which the price of a single ticker is constantly decreasing. For this, one could write a query like:
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY ticker_time
MEASURES
START_ROW.ticker_time AS start_tstamp,
LAST(PRICE_DOWN.ticker_time) AS bottom_tstamp,
LAST(PRICE_UP.ticker_time) AS end_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST PRICE_UP
PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
DEFINE
PRICE_DOWN AS
(LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
PRICE_UP AS
PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
) MR;
The query partitions the Ticker
table by the symbol column and orders it by the ticker_time time attribute.
The PATTERN
clause specifies that we are interested in a pattern with a starting event START_ROW
that is followed by one or more PRICE_DOWN
events and concluded with a PRICE_UP
event.
If such a pattern can be found, the next pattern match will be sought at the last PRICE_UP
event as indicated by the AFTER MATCH SKIP TO LAST
clause.
The DEFINE
clause specifies the conditions that need to be met for a PRICE_DOWN
and PRICE_UP
event.
Although the START_ROW
pattern variable is not present in the DEFINE clause, it has an implicit condition that always evaluates to TRUE
.
A pattern variable PRICE_DOWN
is defined as a row with a price that is smaller than the price of the last row that met the PRICE_DOWN
condition.
For the initial case or when there is no last row that met the PRICE_DOWN
condition, the price of the row should be smaller than the price of the preceding row in the pattern (referenced by START_ROW
).
A pattern variable PRICE_UP
is defined as a row with a price that is larger than the price of the last row that met the PRICE_DOWN
condition.
This query produces a summary row for each period in which the price of a stock was continuously decreasing.
The exact representation of the output rows is defined in the MEASURES
part of the query.
In addition to the partition column symbol, it returns three columns: the time the price started to decrease, the time of the lowest price, and the time the price rose again.