Dimension Table Joins
Dimension table joins allow you to enrich a data stream with information from an external lookup table (a dimension table). Think of it like looking up product details from a catalog for each incoming order in a real-time stream.
In Flink SQL, you join a streaming table (like Orders) with a dimension table (like Products) using the FOR SYSTEM_TIME AS OF syntax. This special syntax tells Flink that Products is not a regular streaming table but an external table whose contents should be looked up based on the processing time of the stream event.
The dimension table itself is usually defined using a connector that supports lookup capabilities, such as the JDBC connector (for databases like PostgreSQL or MySQL) or the HBase connector.
Performance and Caching
To improve performance and reduce load on the external system, you can enable caching for the dimension table in the WITH options.
- NONE: (Default) No data is cached. Every lookup query goes directly to the external database, which can cause high load.
- LRU (Least Recently Used): Caches a subset of data. If a record is not in the cache, Flink queries the external database and stores the result in the cache. This is a good balance for large tables.
  - lookup.cache.max-rows: The maximum number of rows to keep in the cache.
  - lookup.cache.ttl: The time-to-live for a cached record (e.g., 1h, 60s).
- ALL: Caches the entire dimension table in memory when the job starts. This provides the best performance for lookups but is only suitable for small tables where you can fit all data in memory.
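For connectors that support it (for example, the open-source HBase 2.2 connector), you can also enable asynchronous lookups ('lookup.async' = 'true') to further increase throughput. Check your connector's documentation first: not every lookup connector exposes this option, and the open-source JDBC connector does not.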
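As a rough illustration of the caching options above, the following sketch declares a JDBC dimension table with a bounded cache. The table name UserProfiles, its columns, the connection URL placeholder, and the specific values are assumptions chosen for the example. In the open-source JDBC connector, the cache only becomes active when both lookup.cache.max-rows and lookup.cache.ttl are set, and newer Flink releases rename these options (lookup.partial-cache.*), so check the option names against your version.

```sql
-- Hypothetical dimension table; names, URL, and values are illustrative only.
CREATE TABLE UserProfiles (
  user_id   BIGINT,
  user_name STRING,
  segment   STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://host:port/database',
  'table-name' = 'user_profiles',
  -- Keep at most 10,000 rows in the cache; older entries are evicted first.
  'lookup.cache.max-rows' = '10000',
  -- Expire a cached row after 10 minutes, so updates in the database
  -- become visible to the join within roughly that window.
  'lookup.cache.ttl' = '10min',
  -- Retry a failed lookup query up to 3 times before failing the job.
  'lookup.max-retries' = '3'
);
```

A shorter TTL makes dimension updates visible sooner at the cost of more queries against the database; a larger max-rows raises the hit rate at the cost of memory in each parallel task.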
Limitations and Considerations
To use dimension table joins effectively, be aware of the following constraints:
- Join Types: Dimension table joins only support INNER JOIN and LEFT JOIN. RIGHT JOIN and FULL JOIN are not supported.
- Processing Time: The join is based on the snapshot of the dimension table at the moment of processing. If data in the dimension table changes, those changes will only be reflected in stream events that arrive after the change (and, when caching is enabled, after the cached entry expires). Results that were already emitted are not updated.
- Primary Key: The join condition must typically involve an equi-join on a unique key (usually the PRIMARY KEY) of the dimension table.
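The difference between the two supported join types can be sketched with the Orders and Products tables from the Syntax and Example section. The 'UNKNOWN' placeholder label is an assumption made for this illustration.

```sql
-- INNER JOIN: an order whose product_id has no match in Products is dropped.
SELECT o.order_id, o.product_id, p.product_name
FROM Orders AS o
JOIN Products FOR SYSTEM_TIME AS OF o.proctime AS p
  ON o.product_id = p.product_id;

-- LEFT JOIN: every order is kept. Unmatched orders get NULL dimension
-- columns, which are replaced here with a placeholder value.
SELECT
  o.order_id,
  o.product_id,
  COALESCE(p.product_name, 'UNKNOWN') AS product_name
FROM Orders AS o
LEFT JOIN Products FOR SYSTEM_TIME AS OF o.proctime AS p
  ON o.product_id = p.product_id;
```

LEFT JOIN is usually the safer choice when losing stream events is unacceptable, since a missing or late dimension row then shows up as a NULL rather than as a silently dropped record.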
Common Use Cases
To add reference data or contextual details to streaming events, use dimension table joins for:
- Enriching Orders: Adding product names, prices, or categories to an order stream based on product_id.
- Augmenting User Activity: Joining user IDs from a clickstream with user profile information (like name, location, or segment) from a database.
- Adding Context to Sensor Data: Combining sensor readings (sensor_id) with sensor metadata (like location or calibration details) stored in an external table.
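The sensor use case might look like the following sketch. All table and column names (SensorReadings, SensorMetadata, calibration_offset), the datagen source, and the connection URL placeholder are assumptions made for the example, not part of any real schema.

```sql
-- Hypothetical stream of readings; datagen produces random test rows.
CREATE TABLE SensorReadings (
  sensor_id INT,
  reading   DOUBLE,
  proctime AS PROCTIME()  -- processing-time attribute used by the join
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.sensor_id.min' = '1',
  'fields.sensor_id.max' = '100'
);

-- Hypothetical dimension table holding per-sensor metadata.
CREATE TABLE SensorMetadata (
  sensor_id          INT,
  location           STRING,
  calibration_offset DOUBLE,
  PRIMARY KEY (sensor_id) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:postgresql://host:port/database',
  'table-name' = 'sensor_metadata'
);

-- Enrich each reading with its location and apply the calibration offset.
-- For sensors without metadata, location and calibrated_reading are NULL.
SELECT
  r.sensor_id,
  m.location,
  r.reading + m.calibration_offset AS calibrated_reading
FROM SensorReadings AS r
LEFT JOIN SensorMetadata FOR SYSTEM_TIME AS OF r.proctime AS m
  ON r.sensor_id = m.sensor_id;
```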
Syntax and Example
To join a streaming table with a dimension table, use a LEFT JOIN or INNER JOIN with FOR SYSTEM_TIME AS OF.
-- Define the streaming source (e.g., Kafka)
CREATE TABLE Orders (
  order_id   BIGINT,
  product_id INT,
  order_time TIMESTAMP(3),
  -- Define the processing time attribute
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  ... -- Other Kafka options
);
-- Define the dimension table (e.g., PostgreSQL)
CREATE TABLE Products (
  product_id   INT,
  product_name STRING,
  category     STRING,
  PRIMARY KEY (product_id) NOT ENFORCED -- Lookup key, declared as the primary key
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://host:port/database',
  'table-name' = 'product_details',
  'lookup.cache.max-rows' = '5000', -- Enable caching: keep at most 5000 rows
  'lookup.cache.ttl' = '1h'         -- Expire cached rows after one hour
);
-- Define a sink for the enriched results
CREATE TABLE EnrichedOrders (
  order_id     BIGINT,
  order_time   TIMESTAMP(3),
  product_id   INT,
  product_name STRING,
  category     STRING
) WITH (
  'connector' = 'print' -- Or another sink like Kafka
);
-- The dimension table join query
INSERT INTO EnrichedOrders
SELECT
  o.order_id,
  o.order_time,
  o.product_id,
  p.product_name, -- Field from the dimension table
  p.category      -- Field from the dimension table
FROM Orders AS o
-- Join based on processing time
LEFT JOIN Products FOR SYSTEM_TIME AS OF o.proctime AS p
  -- Join condition uses the dimension table's primary key
  ON o.product_id = p.product_id;