Workflow Scheduler
On this page
The Workflow Scheduler bridges the gap between streaming and batch processing. It allows you to orchestrate, schedule, and deploy batch workflows directly within Ververica Platform, providing an integrated experience for the Streaming Lakehouse.
This feature enables you to implement "near-real-time" data processing with frequencies as high as every 10 minutes. This offers a cost-effective alternative to continuous real-time streaming for use cases that require timeliness without the full expense of real-time systems. The experience is designed to mirror declarative ETL processes, similar to Databricks Delta Live Tables (DLT), but with lightweight scheduling.
Feature Overview
- Supported Engine: VERA 4.1 and later.
- Available On: Managed Service and BYOC deployments.
- Key Capabilities:
- Visual Orchestration: Design Directed Acyclic Graphs (DAGs) by connecting independent batch deployments.
- Flexible Scheduling: Trigger workflows manually for backfilling or set periodic schedules for regular processing.
- Versioning: Track workflow history and roll back to previous configurations with a single click.
- Unified Experience: Manage stream and batch workloads in a single interface.
Integration and Use Cases
Workflow Scheduler is a key component of the Streaming Lakehouse, integrating seamlessly with Lakehouse formats like Apache Paimon and object storage (S3, OSS, Azure Blob Storage).
Common use cases include:
- Layered Data Warehousing: Periodically cleansing and transforming raw data (ODS) into detailed layers (DWD) and service layers (DWS).
- Cost Optimization: Downgrading from full real-time to near-real-time to balance cost and scale.
- Data Backfilling: Manually triggering runs to process historical data.
Quickstart: Building a Paimon Data Warehouse
This tutorial demonstrates how to build a layered data warehouse (ODS → DWD → DWS) using Apache Paimon and S3. You will create batch jobs to cleanse and aggregate e-commerce data and orchestrate them into a workflow.
Prerequisites
- A Ververica Platform workspace running VERA 4.1 or later.
- An S3 bucket for the Paimon warehouse (e.g.,
s3a://your-bucket-name/paimon).
1. Initialize the Catalog and ODS Layer
In the SQL Editor, run the following commands to create the Paimon catalog and the Operational Data Store (ODS) tables.
1-- Create Paimon Catalog
2CREATE CATALOG paimon WITH (
3 'type' = 'paimon',
4 'metastore' = 'filesystem',
5 'warehouse' = 's3a://<YOUR_BUCKET_NAME>/paimon'
6);
7
8CREATE DATABASE IF NOT EXISTS `paimon`.`order_dw`;
9USE `paimon`.`order_dw`;
10
11-- Create ODS Tables (Raw Data)
12CREATE TABLE orders (
13 order_id BIGINT,
14 user_id STRING,
15 shop_id BIGINT,
16 product_id BIGINT,
17 buy_fee BIGINT,
18 create_time TIMESTAMP,
19 update_time TIMESTAMP,
20 state INT
21);
22
23CREATE TABLE orders_pay (
24 pay_id BIGINT,
25 order_id BIGINT,
26 pay_platform INT,
27 create_time TIMESTAMP
28);
29
30CREATE TABLE product_catalog (
31 product_id BIGINT,
32 catalog_name STRING
33);
34
35-- Create DWD Table (Detail Layer)
36CREATE TABLE dwd_orders (
37 order_id BIGINT,
38 order_user_id STRING,
39 order_shop_id BIGINT,
40 order_product_id BIGINT,
41 order_product_catalog_name STRING,
42 order_fee BIGINT,
43 order_create_time TIMESTAMP,
44 order_update_time TIMESTAMP,
45 order_state INT,
46 pay_id BIGINT,
47 pay_platform INT COMMENT 'platform 0: phone, 1: pc',
48 pay_create_time TIMESTAMP
49) WITH (
50 'sink.parallelism' = '2'
51);
52
53-- Create DWS Tables (Service/Aggregation Layer)
54CREATE TABLE dws_users (
55 user_id STRING, ds STRING,
56 total_fee BIGINT COMMENT 'Total payment amount completed on current day'
57) WITH (
58 'sink.parallelism' = '2'
59);
60
61CREATE TABLE dws_shops (
62 shop_id BIGINT, ds STRING,
63 total_fee BIGINT COMMENT 'Total payment amount completed on current day'
64) WITH (
65 'sink.parallelism' = '2'
66);2. Seed Initial Data
To populate the ODS tables with sample data, create a new SQL draft and click Deploy.
The only way to perform INSERT INTO ... VALUES statements currently is by using Deploy. Using Run or Debug in the SQL Editor might not process these statements correctly.
```sql BEGIN STATEMENT SET; INSERT INTO paimon.order_dw.orders VALUES (100001, 'user_001', 12345, 1, 5000, TO_TIMESTAMP('2023-02-15 16:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100002, 'user_002', 12346, 2, 4000, TO_TIMESTAMP('2023-02-15 15:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100003, 'user_003', 12347, 3, 3000, TO_TIMESTAMP('2023-02-15 14:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100004, 'user_001', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 13:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100005, 'user_002', 12348, 5, 1000, TO_TIMESTAMP('2023-02-15 12:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100006, 'user_001', 12348, 1, 1000, TO_TIMESTAMP('2023-02-15 11:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100007, 'user_003', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 10:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1);
INSERT INTO paimon.order_dw.orders_pay VALUES (2001, 100001, 1, TO_TIMESTAMP('2023-02-15 17:40:56')), (2002, 100002, 1, TO_TIMESTAMP('2023-02-15 17:40:56')), (2003, 100003, 0, TO_TIMESTAMP('2023-02-15 17:40:56')), (2004, 100004, 0, TO_TIMESTAMP('2023-02-15 17:40:56')), (2005, 100005, 0, TO_TIMESTAMP('2023-02-15 18:40:56')), (2006, 100006, 0, TO_TIMESTAMP('2023-02-15 18:40:56')), (2007, 100007, 0, TO_TIMESTAMP('2023-02-15 18:40:56'));
INSERT INTO paimon.order_dw.product_catalog VALUES (1, 'phone_aaa'), (2, 'phone_bbb'), (3, 'phone_ccc'), (4, 'phone_ddd'), (5, 'phone_eee'); END;
1### 3. Create Batch Deployments
2Create three separate **Batch** drafts for the processing logic. Save/deploy them as drafts so they are available for the workflow.
3
4**Job A: `dwd_orders` (ODS → DWD)**
5
6INSERT OVERWRITE paimon.order_dw.dwd_orders
7SELECT
8 o.order_id, o.user_id, o.shop_id, o.product_id, c.catalog_name, o.buy_fee,
9 o.create_time, o.update_time, o.state, p.pay_id, p.pay_platform, p.create_time
10FROM
11 paimon.order_dw.orders as o,
12 paimon.order_dw.product_catalog as c,
13 paimon.order_dw.orders_pay as p
14WHERE o.product_id = c.product_id AND o.order_id = p.order_id;Job B: dws_shops (DWD → DWS)
1INSERT OVERWRITE paimon.order_dw.dws_shops
2SELECT
3 order_shop_id,
4 DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds,
5 SUM(order_fee) as total_fee
6FROM paimon.order_dw.dwd_orders
7WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
8GROUP BY order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');Job C: dws_users (DWD → DWS)
1INSERT OVERWRITE paimon.order_dw.dws_users
2SELECT
3 order_user_id,
4 DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds,
5 SUM(order_fee) as total_fee
6FROM paimon.order_dw.dwd_orders
7WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
8GROUP BY order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');4. Orchestrate the Workflow
- Go to Workflows and click Create Workflow.
- Complete the Create Workflow dialog:
- Name: Enter a unique name for the workflow.
- Description: (Optional) Provide a description of the workflow.
- Scheduling type: Select one of the following:
- Manual Scheduling: Manually run the workflow by clicking Run. Use this for temporary tests or immediate processing.
- Periodic Scheduling: The workflow is triggered based on scheduler rules. You can run it at regular intervals, such as by the minute, hour, or day.
You must select Periodic Scheduling for workflows that create materialized table nodes.
* Failure Retry Times: Specify the number of times to retry a failed node. By default, failed nodes are not retried. * Failure Notification Email: Enter the default email address for notifications if a workflow node fails. * Resource Queue: Select the deployment target for the workflow. This setting applies to all nodes by default.
This configuration does not change the deployment target of corresponding deployed batch jobs.
* Tags: (Optional) Set a Label key and a Label value for the workflow. 3. Click Create. 4. Drag your three deployments onto the canvas. 5. Connect the nodes to define dependencies: * Connect dwd_orders to dws_shops. * Connect dwd_orders to dws_users. 6. Click Run to execute the workflow manually.
Managing Workflows
Scheduling
In the workflow Configuration tab, you can enable Periodic Scheduling. You can define standard Cron expressions or select simple intervals (e.g., "Every 10 minutes").
Rollback
If you modify a workflow (e.g., add a node or change a schedule), the platform creates a new version. To revert to a previous state:
- Go to the Versions tab.
- Select a previous version.
- Click Rollback.
Monitoring
The Instance History tab provides a log of all executions. You can click a specific run to see the real-time status of individual nodes.
Limitations & Known Issues
- Multi-Statement Execution: The scheduler does not currently support scripts with multiple independent SQL statements. You must use
BEGIN STATEMENT SETfor multipleINSERTcommands or split logic into separate jobs. - Debug Mode:
- No Data Writes: Running a job in Debug mode does not write results to the sink. You must Deploy the job to persist data.
- Debug Selection: Selecting specific statements for debugging might not work in all cases.
- UI and Error Reporting:
- Error Details: Non-validation errors might not appear in the UI panel. If a workflow fails without a clear error message, check the browser Network Panel for details.
- Diagram Visibility: The Diagrams tab might remain empty until the workflow has been executed at least once.
- Lifecycle Management:
- Queue Selection: If only the default queue is available, you might not be prompted to select a queue during execution.
- Deletion: Deleting a workflow with
PENDINGorWAITINGruns might succeed in the UI but fail to cancel the runs in the background. You should cancel individual runs manually before deletion.