Workflow Scheduler
The Workflow Scheduler bridges the gap between streaming and batch processing. It allows you to orchestrate, schedule, and deploy batch workflows directly within Ververica Platform, providing an integrated experience for the Streaming Lakehouse.
This feature enables "near-real-time" data processing at intervals as short as 10 minutes, offering a cost-effective alternative to continuous streaming for use cases that need timely results without the full expense of an always-on real-time pipeline. The experience is designed to mirror declarative ETL processes, similar to Databricks Delta Live Tables (DLT), but with lightweight scheduling.
Feature Overview
- Supported Engine: VERA 4.1 and later.
- Available On: Managed Service and BYOC deployments.
- Key Capabilities:
- Visual Orchestration: Design Directed Acyclic Graphs (DAGs) by connecting independent batch deployments.
- Flexible Scheduling: Trigger workflows manually for backfilling or set periodic schedules for regular processing.
- Versioning: Track workflow history and roll back to previous configurations with a single click.
- Unified Experience: Manage stream and batch workloads in a single interface.
Integration and Use Cases
Workflow Scheduler is a key component of the Streaming Lakehouse, integrating seamlessly with Lakehouse formats like Apache Paimon and object storage (S3, OSS, Azure Blob Storage).
Common use cases include:
- Layered Data Warehousing: Periodically cleansing and transforming raw data (ODS) into detailed layers (DWD) and service layers (DWS).
- Cost Optimization: Downgrading from full real-time to near-real-time to balance cost and scale.
- Data Backfilling: Manually triggering runs to process historical data.
Quickstart: Building a Paimon Data Warehouse
This tutorial demonstrates how to build a layered data warehouse (ODS → DWD → DWS) using Apache Paimon and S3. You will create batch jobs to cleanse and aggregate e-commerce data and orchestrate them into a workflow.
Prerequisites
- A Ververica Platform workspace running VERA 4.1 or later.
- A private connection configured for your storage provider (see Amazon S3 setup).
- An S3 bucket for the Paimon warehouse (e.g., s3a://your-bucket-name/paimon).
1. Initialize the Catalog and ODS Layer
In the SQL Editor, run the following commands to create the Paimon catalog and the Operational Data Store (ODS) tables.
-- Create Paimon Catalog
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'metastore' = 'filesystem',
'warehouse' = 's3a://<YOUR_BUCKET_NAME>/paimon'
);
CREATE DATABASE IF NOT EXISTS `paimon`.`order_dw`;
USE `paimon`.`order_dw`;
-- Create ODS Tables (Raw Data)
CREATE TABLE orders (
order_id BIGINT,
user_id STRING,
shop_id BIGINT,
product_id BIGINT,
buy_fee BIGINT,
create_time TIMESTAMP,
update_time TIMESTAMP,
state INT
);
CREATE TABLE orders_pay (
pay_id BIGINT,
order_id BIGINT,
pay_platform INT,
create_time TIMESTAMP
);
CREATE TABLE product_catalog (
product_id BIGINT,
catalog_name STRING
);
-- Create DWD Table (Detail Layer)
CREATE TABLE dwd_orders (
order_id BIGINT,
order_user_id STRING,
order_shop_id BIGINT,
order_product_id BIGINT,
order_product_catalog_name STRING,
order_fee BIGINT,
order_create_time TIMESTAMP,
order_update_time TIMESTAMP,
order_state INT,
pay_id BIGINT,
pay_platform INT COMMENT 'platform 0: phone, 1: pc',
pay_create_time TIMESTAMP
) WITH (
'sink.parallelism' = '2'
);
-- Create DWS Tables (Service/Aggregation Layer)
CREATE TABLE dws_users (
user_id STRING, ds STRING,
total_fee BIGINT COMMENT 'Total payment amount completed on current day'
) WITH (
'sink.parallelism' = '2'
);
CREATE TABLE dws_shops (
shop_id BIGINT, ds STRING,
total_fee BIGINT COMMENT 'Total payment amount completed on current day'
) WITH (
'sink.parallelism' = '2'
);
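Optionally, you can confirm that the DDL succeeded by listing the tables in the new database. This is a quick check you can run in the SQL Editor; the output should include the three ODS tables plus dwd_orders, dws_users, and dws_shops.
-- Optional check: list the tables created in order_dw
USE `paimon`.`order_dw`;
SHOW TABLES;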
2. Seed Initial Data
To populate the ODS tables with sample data, create a new SQL draft and click Deploy.
Currently, INSERT INTO ... VALUES statements can only be executed by deploying the draft. Using Run or Debug in the SQL Editor might not process these statements correctly.
BEGIN STATEMENT SET;
INSERT INTO paimon.order_dw.orders VALUES
(100001, 'user_001', 12345, 1, 5000, TO_TIMESTAMP('2023-02-15 16:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
(100002, 'user_002', 12346, 2, 4000, TO_TIMESTAMP('2023-02-15 15:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
(100003, 'user_003', 12347, 3, 3000, TO_TIMESTAMP('2023-02-15 14:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
(100004, 'user_001', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 13:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
(100005, 'user_002', 12348, 5, 1000, TO_TIMESTAMP('2023-02-15 12:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
(100006, 'user_001', 12348, 1, 1000, TO_TIMESTAMP('2023-02-15 11:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
(100007, 'user_003', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 10:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1);
INSERT INTO paimon.order_dw.orders_pay VALUES
(2001, 100001, 1, TO_TIMESTAMP('2023-02-15 17:40:56')),
(2002, 100002, 1, TO_TIMESTAMP('2023-02-15 17:40:56')),
(2003, 100003, 0, TO_TIMESTAMP('2023-02-15 17:40:56')),
(2004, 100004, 0, TO_TIMESTAMP('2023-02-15 17:40:56')),
(2005, 100005, 0, TO_TIMESTAMP('2023-02-15 18:40:56')),
(2006, 100006, 0, TO_TIMESTAMP('2023-02-15 18:40:56')),
(2007, 100007, 0, TO_TIMESTAMP('2023-02-15 18:40:56'));
INSERT INTO paimon.order_dw.product_catalog VALUES
(1, 'phone_aaa'), (2, 'phone_bbb'), (3, 'phone_ccc'), (4, 'phone_ddd'), (5, 'phone_eee');
END;
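After the deployment finishes, you can sanity-check the seeded data with simple count queries in the SQL Editor (run them one at a time). Based on the sample data above, you should see 7 orders, 7 payments, and 5 catalog entries.
-- Sanity check: expected counts are 7, 7, and 5 for the sample data
SELECT COUNT(*) AS order_cnt FROM paimon.order_dw.orders;
SELECT COUNT(*) AS pay_cnt FROM paimon.order_dw.orders_pay;
SELECT COUNT(*) AS product_cnt FROM paimon.order_dw.product_catalog;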
3. Create Batch Deployments
Create three separate Batch drafts for the processing logic below, then deploy each one so that the resulting batch deployments are available to the workflow.
Job A: dwd_orders (ODS → DWD)
INSERT OVERWRITE paimon.order_dw.dwd_orders
SELECT
o.order_id, o.user_id, o.shop_id, o.product_id, c.catalog_name, o.buy_fee,
o.create_time, o.update_time, o.state, p.pay_id, p.pay_platform, p.create_time
FROM
paimon.order_dw.orders as o,
paimon.order_dw.product_catalog as c,
paimon.order_dw.orders_pay as p
WHERE o.product_id = c.product_id AND o.order_id = p.order_id;
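The comma-separated FROM clause combined with the WHERE filter is an implicit inner join. If you prefer explicit join syntax, the same logic can be written as follows; both forms are equivalent.
-- Equivalent formulation of Job A using explicit INNER JOINs
INSERT OVERWRITE paimon.order_dw.dwd_orders
SELECT
  o.order_id, o.user_id, o.shop_id, o.product_id, c.catalog_name, o.buy_fee,
  o.create_time, o.update_time, o.state, p.pay_id, p.pay_platform, p.create_time
FROM paimon.order_dw.orders AS o
JOIN paimon.order_dw.product_catalog AS c ON o.product_id = c.product_id
JOIN paimon.order_dw.orders_pay AS p ON o.order_id = p.order_id;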
Job B: dws_shops (DWD → DWS)
INSERT OVERWRITE paimon.order_dw.dws_shops
SELECT
order_shop_id,
DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds,
SUM(order_fee) as total_fee
FROM paimon.order_dw.dwd_orders
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
GROUP BY order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
Job C: dws_users (DWD → DWS)
INSERT OVERWRITE paimon.order_dw.dws_users
SELECT
order_user_id,
DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds,
SUM(order_fee) as total_fee
FROM paimon.order_dw.dwd_orders
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
GROUP BY order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
4. Orchestrate the Workflow
- Go to Workflows and click Create Workflow.
- Complete the Create Workflow dialog:
- Name: Enter a unique name for the workflow.
- Description: (Optional) Provide a description of the workflow.
- Scheduling type: Select one of the following:
- Manual Scheduling: Manually run the workflow by clicking Run. Use this for temporary tests or immediate processing.
- Periodic Scheduling: The workflow is triggered based on scheduler rules. You can run it at regular intervals, such as by the minute, hour, or day.
Important: You must select Periodic Scheduling for workflows that create materialized table nodes.
- Failure Retry Times: Specify the number of times to retry a failed node. By default, failed nodes are not retried.
- Failure Notification Email: Enter the default email address for notifications if a workflow node fails.
- Resource Queue: Select the deployment target for the workflow. This setting applies to all nodes by default.
Note: This configuration does not change the deployment target of the corresponding deployed batch jobs.
- Tags: (Optional) Set a Label key and a Label value for the workflow.
- Click Create.
- Drag your three deployments onto the canvas.
- Connect the nodes to define dependencies:
- Connect dwd_orders to dws_shops.
- Connect dwd_orders to dws_users.
- Click Run to execute the workflow manually.
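After the run completes, you can verify the results by querying the DWS tables in the SQL Editor (run the statements one at a time). With the sample data above, each table should contain one row per shop or user for ds = '20230215'; for example, user_001 should show a total_fee of 8000.
-- Check the aggregated results produced by the workflow
SELECT * FROM paimon.order_dw.dws_shops ORDER BY shop_id;
SELECT * FROM paimon.order_dw.dws_users ORDER BY user_id;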
Managing Workflows
Scheduling
In the workflow Configuration tab, you can enable Periodic Scheduling. You can define standard Cron expressions or select simple intervals (e.g., "Every 10 minutes").
Rollback
If you modify a workflow (e.g., add a node or change a schedule), the platform creates a new version. To revert to a previous state:
- Go to the Versions tab.
- Select a previous version.
- Click Rollback.
Monitoring
The Instance History tab provides a log of all executions. You can click a specific run to see the real-time status of individual nodes.
Limitations & Known Issues
- Multi-Statement Execution: The scheduler does not currently support scripts with multiple independent SQL statements. You must use BEGIN STATEMENT SET for multiple INSERT commands or split the logic into separate jobs.
- Debug Mode:
- No Data Writes: Running a job in Debug mode does not write results to the sink. You must Deploy the job to persist data.
- Debug Selection: Selecting specific statements for debugging might not work in all cases.
- UI and Error Reporting:
- Error Details: Non-validation errors might not appear in the UI panel. If a workflow fails without a clear error message, check the browser Network Panel for details.
- Diagram Visibility: The Diagrams tab might remain empty until the workflow has been executed at least once.
- Lifecycle Management:
- Queue Selection: If only the default queue is available, you might not be prompted to select a queue during execution.
- Deletion: Deleting a workflow with PENDING or WAITING runs might succeed in the UI but fail to cancel the runs in the background. You should cancel individual runs manually before deletion.