Materialized Tables
Supported Version: VERA Engine 4.1 or later. Note that incremental updates require VERA Engine 4.3 or later.
Overview
Materialized tables in Ververica Cloud are stateful representations of data that the system persists and maintains automatically during query execution. They provide a unified way to define data pipelines, which abstracts the complexity of managing separate batch and streaming jobs.
When you define a materialized table, the system automatically manages the underlying Flink jobs required to keep the data fresh based on your defined interval.
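A minimal sketch of the definition syntax follows; the table name mt_orders_summary and the orders source with its columns are illustrative placeholders, and the complete syntax is shown in the examples later in this topic.
CREATE MATERIALIZED TABLE mt_orders_summary (
    PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS SELECT
    order_id,
    order_amount
FROM orders;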
Key Characteristics
- Automated management: The system handles the refresh logic (streaming or batch) based on the FRESHNESS definition.
- State persistence: Data is persisted in the storage backend (for example, Paimon on S3), which allows for historical queries and efficient lookups.
- Unified architecture: Simplifies the "Lambda Architecture" by allowing a single definition to handle both historical backfill and real-time updates.
Prerequisites
To use materialized tables, ensure your environment meets the following requirements:
- Engine version: Your workspace must use VERA Engine 4.1 or later. Note that incremental updates require VERA Engine 4.3 or later.
- Catalog: You must configure an Apache Paimon catalog.
- Storage access: The platform must have read and write access to the object storage (such as S3 or Azure Blob Storage) used by the catalog.
Use Cases
| Use Case | Benefits |
|---|---|
| Stream-to-table (temporal) joins | Join a stream of events with a slowly changing dimension table materialized in state for efficient lookups. |
| Windowed aggregations | Calculate and persist rolling metrics, such as counts, sums, or averages, over time windows. |
| Deduplication | Maintain a stateful index of unique keys to ensure only the first event per key is processed. |
| CDC handling | Consume database changelogs and materialize the current state of a record. |
| Real-time dashboards | Persist aggregated KPIs in a queryable format for low-latency access. |
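As an illustration only, the deduplication use case might be expressed with the standard Flink ROW_NUMBER pattern inside a materialized table definition. The events source, its columns, and the 5-minute freshness (chosen so the table runs in stream mode) are assumptions, not part of the examples later in this topic.
-- Hypothetical sketch: keep only the first event per event_id.
CREATE MATERIALIZED TABLE mt_first_events (
    PRIMARY KEY (event_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '5' MINUTE
AS SELECT event_id, payload, event_time
FROM (
    SELECT
        event_id,
        payload,
        event_time,
        -- Number events per key so that only the earliest one is kept.
        ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_time ASC) AS rn
    FROM events
)
WHERE rn = 1;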
Update Modes
A materialized table supports three update modes:
- Stream update: The table updates continuously as new data arrives.
- Full batch update: The engine calculates data for the entire table or partition and overwrites the existing data.
- Incremental batch update: The engine calculates only the data that has changed since the last update and merges it into the materialized table.
Freshness Behavior
The FRESHNESS value determines whether the table updates in stream or batch mode:
- Stream mode: Used when freshness is less than 30 minutes.
- Batch mode: Used when freshness is 30 minutes or more.
In batch mode, the engine automatically determines whether to perform a full or an incremental update. The engine prioritizes incremental updates and performs a full update only when the query or table does not support incremental processing.
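For illustration, only the FRESHNESS value differs between the following two sketches; the table names and the orders source are placeholders.
-- Freshness under 30 minutes: the table is maintained by a continuous streaming job.
CREATE MATERIALIZED TABLE mt_orders_stream
FRESHNESS = INTERVAL '5' MINUTE
AS SELECT order_id, order_amount FROM orders;

-- Freshness of 30 minutes or more: the table is refreshed by scheduled batch jobs.
CREATE MATERIALIZED TABLE mt_orders_batch
FRESHNESS = INTERVAL '1' HOUR
AS SELECT order_id, order_amount FROM orders;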
Incremental Updates
Incremental updates improve efficiency by processing only changed data.
Conditions for Incremental Updates
The engine uses incremental updates only if the materialized table meets the following conditions:
- The partition.fields.#.date-formatter parameter is not configured in the materialized table definition.
- The source table does not have a primary key.
- The query statement supports incremental updates.
Supported SQL Patterns
The following table describes the SQL features supported for incremental updates:
| Clause | Support and Limitations |
|---|---|
| SELECT | Supports selecting columns and scalar function expressions, including user-defined functions (UDFs). Aggregate functions are not supported. |
| FROM | Supports table names or subqueries. |
| WITH | Supports common table expressions (CTEs). |
| WHERE | Supports filter conditions with scalar function expressions. Subqueries, such as WHERE [NOT] EXISTS or WHERE [NOT] IN, are not supported. |
| UNION | Only UNION ALL is supported. |
| JOIN | INNER JOIN is supported, but it still reads the full data from both source tables. LEFT/RIGHT/FULL OUTER JOIN is not supported, except for the lateral and lookup join cases below. |
| LATERAL JOIN | [LEFT OUTER] JOIN LATERAL with table function expressions is supported. |
| Lookup Join | Only A [LEFT OUTER] JOIN B FOR SYSTEM_TIME AS OF PROCTIME() is supported. |
| GROUP BY | Not supported. |
JOINs without the JOIN keyword, such as SELECT * FROM a, b WHERE a.id = b.id, are also supported.
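To illustrate these rules with hypothetical tables and columns, the first defining query below follows the supported patterns and is a candidate for incremental updates, while the second uses GROUP BY aggregation and therefore falls back to a full update:
-- Candidate for incremental updates: projection with scalar functions and a filter.
SELECT order_id, UPPER(status) AS order_status
FROM orders
WHERE order_amount > 0;

-- Not a candidate: GROUP BY aggregation forces a full batch update.
SELECT customer_id, COUNT(*) AS order_cnt
FROM orders
GROUP BY customer_id;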
Examples
Example 1: Process Data Using a Scalar Function
This example uses a scalar function to transform order data.
CREATE MATERIALIZED TABLE mt_shipped_orders (
PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT
order_id,
COALESCE(customer_id, 'Unknown') AS customer_id,
CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
CASE
WHEN status = 'shipped' THEN 'Completed'
WHEN status = 'pending' THEN 'In Progress'
ELSE 'Unknown'
END AS order_status,
DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
process_notes(notes) AS notes
FROM
orders
WHERE
status = 'shipped';
Example 2: Enrich Data Using Lateral and Lookup Joins
This example uses a lateral join to split tags and a lookup join to enrich order data with product information.
CREATE MATERIALIZED TABLE mt_enriched_orders (
PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
SELECT
order_id,
product_id,
quantity,
proc_time,
e.tag AS order_tag
FROM
orders,
LATERAL TABLE(split_tags(tags, ',')) AS e(tag))
SELECT
o.order_id,
o.product_id,
p.product_name,
p.category,
o.quantity,
p.price,
o.quantity * p.price AS total_amount,
order_tag
FROM o
LEFT JOIN
product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON
o.product_id = p.product_id;
Create and Use Materialized Tables
This workflow demonstrates how to build a data pipeline using Paimon and materialized tables. In this scenario, you create ODS (Operational Data Store) tables for user logs and product dimensions, and then build DWD (Data Warehouse Detail) and DWS (Data Warehouse Service) materialized tables to analyze the data.
1. Create a Paimon Catalog
Create a Paimon catalog backed by object storage (S3).
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'metastore' = 'filesystem',
'warehouse' = 's3a://your-bucket-name/paimon' -- Replace with your S3 bucket
);
USE CATALOG paimon;
2. Create ODS Tables
Create the database and the ODS tables for user logs and products.
CREATE DATABASE IF NOT EXISTS mt;
USE mt;
-- Create User Log table
CREATE TABLE ods_user_log (
item_id INT NOT NULL,
user_id INT NOT NULL,
vtime TIMESTAMP(6),
ds VARCHAR(10)
)
PARTITIONED BY(ds)
WITH (
'bucket' = '4', -- Set the number of buckets to 4.
'bucket-key' = 'item_id' -- Specify the bucket key column.
);
-- Create Product Dimension table
CREATE TABLE ods_dim_product (
item_id INT NOT NULL,
title VARCHAR(255),
pict_url VARCHAR(255),
brand_id INT,
seller_id INT,
PRIMARY KEY(item_id) NOT ENFORCED
) WITH (
'bucket' = '4',
'bucket-key' = 'item_id'
);
3. Generate and Load Data
Use the faker connector to generate sample data and load it into the ODS tables.
-- Generate user log data
CREATE TEMPORARY TABLE `user_log` (
item_id INT,
user_id INT,
vtime TIMESTAMP,
ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd')
) WITH (
'connector' = 'faker',
'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}',
'fields.vtime.expression'='#{date.past ''5'',''HOURS''}',
'rows-per-second' = '3'
);
-- Generate product data
CREATE TEMPORARY TABLE `dim_product` (
item_id INT NOT NULL,
title VARCHAR(255),
pict_url VARCHAR(255),
brand_id INT,
seller_id INT,
PRIMARY KEY(item_id) NOT ENFORCED
) WITH (
'connector' = 'faker',
'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
'fields.title.expression'='#{book.title}',
'fields.pict_url.expression'='#{internet.domainName}',
'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}',
'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}',
'rows-per-second' = '3'
);
-- Insert data into ODS tables
BEGIN STATEMENT SET;
INSERT INTO ods_user_log
SELECT
item_id,
user_id,
vtime,
CAST(ds AS VARCHAR(10)) AS ds
FROM `user_log`;
INSERT INTO ods_dim_product
SELECT
item_id,
title,
pict_url,
brand_id,
seller_id
FROM `dim_product`;
END;
4. Verify ODS Data
Check that the data has been successfully loaded into the Paimon tables.
SELECT * FROM ods_dim_product LIMIT 10;
SELECT * FROM ods_user_log LIMIT 10;
5. Create Materialized Tables
Create materialized tables to process and aggregate the data.
DWD Table: dwd_user_log_product
This table joins the user log stream with the product dimension table.
CREATE MATERIALIZED TABLE dwd_user_log_product(
PRIMARY KEY (item_id) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' HOUR -- Refresh data every hour.
AS SELECT
l.ds,
l.item_id,
l.user_id,
l.vtime,
r.brand_id,
r.seller_id
FROM ods_user_log l INNER JOIN ods_dim_product r
ON l.item_id = r.item_id;
DWS Table: dws_overall
This table aggregates daily pageviews (PV) and unique visitors (UV).
CREATE MATERIALIZED TABLE dws_overall(
PRIMARY KEY(ds, hh) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' HOUR -- Refresh data every hour.
AS SELECT
ds,
COALESCE(hh, 'day') AS hh,
count(*) AS pv,
count(distinct user_id) AS uv
FROM (SELECT ds, date_format(vtime, 'HH') AS hh, user_id
FROM dwd_user_log_product) tmp
GROUP BY GROUPING SETS(ds, (ds, hh));
6. Start the Materialized Table Jobs
- Navigate to the Data Lineage view in the console.
- Locate your new materialized tables (dwd_user_log_product and dws_overall).
- Click Start on the materialized table nodes.
You can click Details on the node to verify the status and view the latest job information. If the freshness is 30 minutes or more, a corresponding workflow is also created.
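If your SQL session also exposes the open-source Flink lifecycle statements for materialized tables, which is an assumption and not the documented console workflow above, suspending and resuming the refresh job might look like the following sketch:
-- Assumed to be available only if the deployment supports these statements.
ALTER MATERIALIZED TABLE dwd_user_log_product SUSPEND;
ALTER MATERIALIZED TABLE dwd_user_log_product RESUME;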
7. Backfill or Trigger Updates
To manually refresh data or backfill historical partitions:
- In the Data Lineage pane, select the materialized table node (for example, dwd_user_log_product).
- In the lower-right corner, click Trigger Update.
- In the dialog box, enter the partition value in the ds field (for example, today's date, such as 20251126).
- Select the Cascade checkbox to automatically update downstream materialized tables (such as dws_overall).
- Click Confirm.
A new workflow instance is created to handle the update.
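If the deployment also accepts the open-source Flink refresh statement, which is an assumption rather than the documented console path, the same partition backfill might be expressed as:
-- Assumed SQL equivalent of Trigger Update for a single partition.
ALTER MATERIALIZED TABLE dwd_user_log_product
    REFRESH PARTITION (ds = '20251126');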
8. Change Freshness
You can adjust the freshness interval to switch between batch and streaming modes. For example, lowering the freshness to less than 30 minutes switches the job to streaming mode.
- In the Data Lineage view, select the materialized table.
- Click Stop to stop the current job.
- Modify the table definition to set a lower freshness, such as FRESHNESS = INTERVAL '2' MINUTE (see the sketch after this procedure).
- Click Start to restart the table.
The table now runs as a streaming job.
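For reference, the modified definition of the DWD table could look like the following sketch; only the FRESHNESS value changes compared with the definition in step 5.
CREATE MATERIALIZED TABLE dwd_user_log_product(
PRIMARY KEY (item_id) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '2' MINUTE -- Below 30 minutes, so the table runs in stream mode.
AS SELECT
l.ds,
l.item_id,
l.user_id,
l.vtime,
r.brand_id,
r.seller_id
FROM ods_user_log l INNER JOIN ods_dim_product r
ON l.item_id = r.item_id;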
9. Query Materialized Tables
You can query the materialized tables directly to verify the results.
SELECT * FROM dws_overall ORDER BY hh LIMIT 10;
Create a Periodic Backfill Workflow
You can create a workflow to schedule periodic backfills for your materialized tables. This allows you to automate data refreshes at specific intervals.
- Log on to the Ververica Cloud console.
- Navigate to Operation Center > Workflows.
- Click Create Workflow.
- In the configuration panel, set the following parameters:
  - Workflow Name: Enter a unique name for the workflow.
  - Description: Enter a description for the workflow.
  - Scheduling Type: Select Periodic Scheduling.
  - Scheduling Rule: Set the schedule using a Cron expression (for example, 0 */5 * * * ? to run every 5 minutes).
  - Scheduling Start Time: Set a valid future start time.
  - Resource Queue: Select the resource queue for the workflow.
- Click Create.
- In the workflow editor canvas, click the initial node (or add a new Materialized Table node) to edit it:
  - Job: Select the materialized table job you want to schedule (for example, dwd_user_log_product).
  - Node Name: Enter a descriptive name.
- Click Save in the node editor panel.
- Click Save in the upper-right corner of the workflow page.
- In the Workflows list, toggle the Status to Enabled (or click Enable) to activate the schedule.
The workflow will now execute automatically according to the defined schedule.
Limitations and Known Issues
Be aware of the following behaviors in the current release:
- Navigation to job details: In the UI, clicking Materialized Table > Details > Job might lead to a "Not Found" page because the filter might fail to locate the resource context.
- Missing SQL in job panel: When you view the Latest Job panel in Ververica Platform, the underlying SQL that generated the materialized table does not currently display.
- Workflow list navigation: The workflow list view does not currently support navigating to a complete list of all scheduled jobs associated with a specific materialized table.
- Workflow detail view: Navigating to materialized table details from the Workflow view might result in a "Not Found" error.