Data Ingestion
Data Ingestion is a low-code framework for syncing data from heterogeneous sources to destinations using declarative YAML jobs. It supports full and incremental data ingestion with millisecond latency, making it well-suited for real-time data warehousing and streaming lakehouse architectures.
Data Ingestion requires VERA 4.1 (Flink 1.20) or later and is compatible with Flink CDC 3.5.
Why YAML for CDC?
Defining data ingestion pipelines in YAML offers several advantages over writing Flink jobs in code:
- Simplified job management — Declarative, human-readable configuration with no Flink internals required.
- Reusability and consistency — Use template values and share configurations across environments.
- CI/CD-friendly — Store in version control, review in pull requests, promote across environments, and roll back when needed.
- Environment separation — Swap credentials, topics, and URIs per environment without changing logic.
- Faster onboarding — Teams can define pipelines without deep knowledge of Flink APIs.
- Tooling compatibility — Standard YAML tooling can validate, lint, and test job configurations.
- Separation of concerns — Data flow logic is defined separately from runtime and platform configuration.
Quick Start
To create a Data Ingestion job:
- Go to Data Ingestion and select New Draft.
- Name the draft and select an Engine Version.
- Paste your YAML configuration and click OK.
Example: MySQL Source
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql_host}
  port: 3306
  username: ${secret_values.mysql_user}
  password: ${secret_values.mysql_password}
  tables: mydb\.\.*
  server-id: 5401-5404
sink:
  type: paimon
  name: Paimon Sink
  warehouse: s3://my-bucket/paimon
pipeline:
  name: MySQL to Paimon
  parallelism: 2
Example: PostgreSQL Source
source:
  type: postgres
  hostname: ${secret_values.pg_host}
  port: 5432
  username: ${secret_values.pg_user}
  password: ${secret_values.pg_password}
  tables: appdb.public.products
  slot.name: flink_cdc_slot
  scan.startup.mode: initial
  decoding.plugin.name: pgoutput
  server-time-zone: UTC
  debezium.publication.name: flink_pub
  debezium.publication.autocreate.mode: disabled
sink:
  type: paimon
  name: Paimon Sink
  warehouse: s3://my-bucket/paimon
pipeline:
  name: PostgreSQL to Paimon
  parallelism: 2
Example: MongoDB Source
source:
  type: mongodb
  hosts: ${secret_values.mongo_host}:27017
  database: demo
  collection: .*
  scan.startup.mode: initial
  scan.full-changelog: true
  scan.ignore-delete.enabled: false
  heartbeat.interval.ms: 1000
sink:
  type: paimon
  name: Paimon Sink
  warehouse: s3://my-bucket/paimon
pipeline:
  name: MongoDB to Paimon
  parallelism: 2
Use ${secret_values.<name>} expressions to reference secrets stored in your workspace. Do not hardcode credentials in YAML files.
Job Structure
A Data Ingestion YAML job supports the following top-level sections:
| Section | Required | Description |
|---|---|---|
source | Yes | Source connector configuration |
sink | Yes | Sink connector configuration |
transform | No | Column projection, filtering, and computed column rules |
route | No | Table routing rules mapping source tables to sink tables |
pipeline | No | Global job settings (name, parallelism, schema change behavior) |
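As an illustrative sketch of how the five sections fit together in one job file (connector parameters abbreviated, table names reused from the examples above):
source:
  type: mysql
  tables: mydb\.\.*
  # connector-specific parameters (hostname, credentials, ...)
sink:
  type: paimon
  warehouse: s3://my-bucket/paimon
transform:
  - source-table: mydb.orders
    projection: \*
    filter: amount > 100
route:
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders
pipeline:
  name: MySQL to Paimon
  parallelism: 2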
Source and Sink Modules
Configure source and sink connectors using the following structure:
source:
  type: <connector>
  name: <display name>
  # connector-specific parameters
sink:
  type: <connector>
  name: <display name>
  # connector-specific parameters
Supported Connectors
| Connector | Source | Sink |
|---|---|---|
| MySQL | ✓ | |
| PostgreSQL CDC | ✓ | |
| MongoDB | ✓ | |
| Kafka | ✓ | ✓ |
| Upsert Kafka | | ✓ |
| Paimon | | ✓ |
| Fluss | | ✓ |
| Iceberg | | ✓ |
| StarRocks | | ✓ |
Reusing Catalog Connection Information
If you have already registered a catalog in the Data Management section of your workspace, you can reference it directly in your Data Ingestion job using the using.built-in-catalog parameter. This avoids repeating connection details such as hostname, username, and password.
source:
  type: mysql
  using.built-in-catalog: my_mysql_catalog
  tables: mydb\.\.*
sink:
  type: paimon
  using.built-in-catalog: my_paimon_catalog
Catalog connection reuse is supported for the following connectors: MySQL (source), Kafka (source), Upsert Kafka (sink), and Paimon (sink).
Catalog parameters that are incompatible with Data Ingestion YAML jobs are ignored. Refer to the individual connector documentation for details.
Transform
The transform module lets you manipulate columns before data reaches the sink. You can select or drop columns, add computed columns, filter rows, and use metadata columns.
After modifying transform rules, the job must be restarted without state.
Parameters
| Parameter | Required | Description |
|---|---|---|
source-table | Yes | Source table to apply the transform to. Supports regular expressions. |
projection | No | Column selection expression, similar to SQL SELECT. If omitted, all columns pass through unchanged. |
filter | No | Row filter expression, similar to SQL WHERE. If omitted, all rows pass through. |
primary-keys | No | Override the primary key definition (comma-separated column names). |
partition-keys | No | Override the partition key definition (comma-separated column names). |
table-options | No | Extra options passed to the sink, in key=value,key2=value2 format. |
description | No | Description of the transform rule. |
converter-after-transform | No | Post-transform converters applied after all transform rules. |
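For example, a single rule can override keys and pass extra sink options. The sketch below is illustrative; the order_date column and the option keys in table-options are assumptions, not required values:
transform:
  - source-table: mydb.orders
    projection: id, amount, status, order_date
    primary-keys: id
    partition-keys: order_date
    table-options: bucket=4,changelog-producer=input
    description: override keys and pass sink table options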
Column Projection
Use projection to select, rename, or compute columns.
Select specific columns (unspecified columns are dropped):
transform:
  - source-table: mydb.orders
    projection: id, amount, status
Include all columns with a wildcard:
transform:
  - source-table: mydb.orders
    projection: \*
Add a computed column:
transform:
  - source-table: mydb.orders
    projection: id, amount, amount * 1.2 AS amount_with_tax
Append metadata columns to all columns:
transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS full_table_name
If a projection expression does not include the wildcard (\*), the resulting schema is fixed. Schema changes from the source will not propagate to the sink for that transform rule.
Metadata Columns
The following metadata columns are available in projection and filter expressions for all connectors:
| Column | Type | Description |
|---|---|---|
__namespace_name__ | String | Namespace of the source table |
__schema_name__ | String | Schema of the source table |
__table_name__ | String | Table name of the source table |
__data_event_type__ | String | Change operation type: +I (insert), -U (before update), +U (after update), -D (delete) |
The following table shows the mapping of namespace, schema, and table for each connector:
| Connector | Namespace | Schema | Table |
|---|---|---|---|
| MySQL | — | Database | Table |
| PostgreSQL | Database | Schema | Table |
| MongoDB | — | Database | Collection |
| Kafka | — | — | Topic |
| Paimon | — | Database | Table |
Row Filtering
Use filter to drop rows that do not match a condition. The expression follows SQL WHERE syntax and evaluates to a boolean.
transform:
  - source-table: mydb.orders
    projection: id, amount, status
    filter: amount > 100 AND status <> 'CANCELLED'
You can reference computed columns defined in the same projection:
transform:
  - source-table: mydb.orders
    projection: id, amount, amount * 1.2 AS amount_with_tax
    filter: amount_with_tax > 150
Post-Transform Converters
The converter-after-transform parameter applies additional processing after all transform rules. Multiple converters can be chained with a comma.
| Converter | Description |
|---|---|
SOFT_DELETE | Converts delete events to inserts. Use with __data_event_type__ to track deletions without physically removing rows. |
FIELD_NAME_LOWER_CASE | Converts all field names to lowercase before writing to the sink. |
Example — soft delete:
transform:
  - source-table: mydb.orders
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
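Converters can also be chained. The following illustrative variant applies the soft-delete conversion and then lowercases all field names:
transform:
  - source-table: mydb.orders
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE,FIELD_NAME_LOWER_CASE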
Route
The route module defines how source tables map to sink tables. Each rule specifies a source table pattern and a destination table. Multiple rules can be defined in a single job.
After modifying route rules, the job must be restarted without state.
Parameters
| Parameter | Required | Description |
|---|---|---|
source-table | Yes | Source table to match. Supports regular expressions. |
sink-table | Yes | Destination table for matched data. |
replace-symbol | No | A placeholder string in sink-table that is replaced with the matched source table name. |
description | No | Description of the route rule. |
Routing Patterns
Single table to single table:
route:
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders
    description: route orders to ODS
Single table to multiple tables (fan-out):
route:
  - source-table: mydb.orders
    sink-table: ods_db.orders
  - source-table: mydb.orders
    sink-table: backup_db.orders
Multiple tables to multiple tables:
route:
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders
  - source-table: mydb.shipments
    sink-table: ods_db.ods_shipments
  - source-table: mydb.products
    sink-table: ods_db.ods_products
Merge sharded tables into one:
route:
  - source-table: mydb\.\.*
    sink-table: ods_db.merged
    description: merge all sharded tables
Full database sync (preserve table names):
route:
  - source-table: mydb\.\.*
    sink-table: ods_db.<>
    replace-symbol: <>
    description: sync all tables preserving names
Regex Capture Groups
For advanced routing, use capture groups in the source-table regex and reference them with $1, $2, and so on in sink-table.
Add a prefix to all databases, preserve table names:
route:
  - source-table: (\.*).(\.*)
    sink-table: ods_$1.$2
Merge sharded tables using capture groups:
route:
  - source-table: db_(\.*).orders_(\d+)
    sink-table: ods_db.orders_$1_$2
Merge sharded tables and route per-shard customers with capture groups:
route:
  - source-table: public.orders_\.*
    sink-table: ods.orders_all
    description: merge sharded orders tables
  - source-table: public.customers_(\d+)
    sink-table: ods.customer_$1
    description: per-shard customer routing
Pipeline
The pipeline section defines global job configuration that applies to the entire Data Ingestion job.
Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
name | No | Flink CDC Pipeline Job | Display name for the job. |
parallelism | No | — | Job-level parallelism. |
schema.change.behavior | No | LENIENT | How schema changes from the source are applied to the sink. See Schema Change Synchronization. |
schema-operator.rpc-timeout | No | 3min | Timeout for schema change coordination. |
local-time-zone | No | System timezone | Timezone used in Transform module expressions. |
user-defined-function | No | — | Register custom UDFs. See Custom Functions. |
dirty-data.collector | No | — | Configure dirty data logging. See Dirty Data Collection. |
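A pipeline section that combines several of these settings might look like the following sketch (the values shown are illustrative, not recommendations):
pipeline:
  name: MySQL to Paimon
  parallelism: 4
  schema.change.behavior: LENIENT
  schema-operator.rpc-timeout: 5min
  local-time-zone: UTC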
Schema Change Synchronization
Data Ingestion jobs can propagate schema changes from the source to the sink, including table creation, column additions, column type changes, column drops, column renames, table truncations, and table drops.
Configure how schema changes are handled using schema.change.behavior in the pipeline section:
pipeline:
  schema.change.behavior: EVOLVE
Behavior Modes
| Mode | Description |
|---|---|
LENIENT (default) | Applies schema changes with tolerant conversions. Drop table and truncate table events are suppressed. Column renames become a type change plus a new column addition. Column drops become a type change to nullable. New columns are added as nullable. Recommended for most use cases. |
EVOLVE | Applies all schema changes strictly. If any change fails in the sink, the job throws an exception and triggers a restart. Use when strict schema parity is required. |
TRY_EVOLVE | Attempts to apply schema changes. If a change fails in the sink, the job continues without failing but may produce rows with missing or truncated columns. Use when schema parity is preferred but the job must keep running even when a change cannot be applied. |
IGNORE | Ignores all schema changes. The sink schema remains fixed regardless of source schema changes. Use when the sink does not support schema changes. |
EXCEPTION | Throws an exception when any schema change event is received. Use when schema changes must never occur. |
Controlling Which Changes Reach the Sink
You can fine-tune which schema change events are applied at the sink level using include.schema.changes and exclude.schema.changes:
| Parameter | Default | Description |
|---|---|---|
include.schema.changes | All | List of schema change types to allow. |
exclude.schema.changes | None | List of schema change types to block. Takes precedence over include.schema.changes. |
Supported event types:
| Event type | Description |
|---|---|
create.table | Table creation |
add.column | Column addition |
alter.column.type | Column type change |
rename.column | Column rename |
drop.column | Column deletion |
truncate.table | Table truncation |
drop.table | Table deletion |
Partial matching is supported: drop matches both drop.column and drop.table; column matches all column-level events.
Example — allow table creation and column events, exclude drop column:
sink:
  type: paimon
  name: Paimon Sink
  warehouse: s3://my-bucket/paimon
  include.schema.changes: [create.table, column]
  exclude.schema.changes: [drop.column]
pipeline:
  schema.change.behavior: EVOLVE
Built-in CDC Functions
The following functions are available in projection and filter expressions in the transform module.
Arithmetic Functions
| Function | Description |
|---|---|
numeric1 + numeric2 | Addition |
numeric1 - numeric2 | Subtraction |
numeric1 * numeric2 | Multiplication |
numeric1 / numeric2 | Division |
numeric1 % numeric2 | Modulo |
ABS(numeric) | Absolute value |
CEIL(numeric) | Round up to nearest integer |
FLOOR(numeric) | Round down to nearest integer |
ROUND(numeric, int) | Round to int decimal places |
UUID() | Generate a random UUID string (RFC 4122 type 4) |
Time Functions
| Function | Description |
|---|---|
LOCALTIME | Current local time (TIME(0)) |
LOCALTIMESTAMP | Current local timestamp (TIMESTAMP(3)) |
CURRENT_TIME | Alias for LOCALTIME |
CURRENT_DATE | Current local date |
CURRENT_TIMESTAMP | Current local timestamp (TIMESTAMP_LTZ(3)) |
NOW() | Alias for CURRENT_TIMESTAMP |
DATE_FORMAT(timestamp, format) | Format a timestamp as a string using Java SimpleDateFormat patterns |
DATE_FORMAT_TZ(timestamp, format) | Format a timestamp to a timezone-aware string (yyyy-MM-dd HH:mm:ss) |
DATE_FORMAT_TZ(timestamp, format, timezone) | Format a timestamp using the specified format and timezone |
TIMESTAMPADD(unit, interval, timepoint) | Add an interval to a timepoint. Units: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR |
DATE_ADD(timestamp, days, timezone) | Add days to a timestamp and return a yyyy-MM-dd string in the given timezone |
TIMESTAMPDIFF(unit, timepoint1, timepoint2) | Difference between two timepoints in the given unit |
TO_DATE(string[, format]) | Parse a date string to DATE (default format: yyyy-MM-dd) |
TO_TIMESTAMP(string[, format]) | Parse a string to TIMESTAMP (default: yyyy-MM-dd HH:mm:ss) |
TO_TIMESTAMP_LTZ(string[, format][, timezone]) | Parse a string to TIMESTAMP_LTZ (default format: yyyy-MM-dd HH:mm:ss.SSS, default timezone: UTC) |
TO_TIMESTAMP_LTZ(bigint[, precision]) | Convert a Unix epoch value to TIMESTAMP_LTZ. Precision: 0 (seconds) or 3 (milliseconds) |
FROM_UNIXTIME(numeric[, format]) | Convert seconds since Unix epoch to a formatted timestamp string |
UNIX_TIMESTAMP() | Current Unix timestamp in seconds |
UNIX_TIMESTAMP(string[, format]) | Parse a datetime string and return its Unix timestamp |
Within a single projection or filter evaluation, all calls to time functions such as NOW() return the same timestamp, regardless of evaluation order.
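As a sketch of how these functions combine in a projection (the created_at column is an assumed source column), the rule below stamps each row with its processing time and computes an age in days:
transform:
  - source-table: mydb.orders
    projection: \*, DATE_FORMAT(NOW(), 'yyyy-MM-dd HH:mm:ss') AS ingested_at, TIMESTAMPDIFF(DAY, created_at, NOW()) AS age_days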
String Functions
| Function | Description |
|---|---|
string1 || string2 | Concatenate two strings |
CHAR_LENGTH(string) | Number of characters in a string |
UPPER(string) | Convert to uppercase |
LOWER(string) | Convert to lowercase |
TRIM(string) | Remove leading and trailing whitespace |
REGEXP_REPLACE(string, pattern, replacement) | Replace all matches of pattern in string with replacement |
SUBSTRING(string FROM start [FOR length]) | Extract a substring starting at start, optionally limited to length characters |
CONCAT(string1, string2, ...) | Concatenate multiple strings |
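For illustration (the mydb.customers table and its email column are assumptions), string functions can be combined to normalize and partially mask a value:
transform:
  - source-table: mydb.customers
    projection: id, LOWER(TRIM(email)) AS email_normalized, SUBSTRING(email FROM 1 FOR 3) || '***' AS email_prefix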
Type Cast
Use CAST(<expression> AS <type>) for explicit type conversion.
| Source type | Target type |
|---|---|
| Any | VARCHAR |
NUMERIC, STRING | BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL |
TIMESTAMP_TZ, TIMESTAMP_LTZ, STRING | TIMESTAMP |
Use VARCHAR as the target type for string conversions. STRING is not supported as a cast target.
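A minimal sketch using CAST in a projection (column names reuse the earlier orders example):
transform:
  - source-table: mydb.orders
    projection: id, CAST(id AS VARCHAR) AS id_str, CAST(amount AS DOUBLE) AS amount_double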
Comparison Functions
| Function | Description |
|---|---|
value1 = value2 | Equal to |
value1 <> value2 | Not equal to |
value1 > value2 | Greater than |
value1 >= value2 | Greater than or equal to |
value1 < value2 | Less than |
value1 <= value2 | Less than or equal to |
value IS NULL | True if value is null |
value IS NOT NULL | True if value is not null |
value1 BETWEEN value2 AND value3 | True if value1 is within the range |
value1 NOT BETWEEN value2 AND value3 | True if value1 is outside the range |
string1 LIKE string2 | True if string1 matches the pattern |
value1 IN (value2, ...) | True if value1 is in the list |
value1 NOT IN (value2, ...) | True if value1 is not in the list |
Logical Functions
| Function | Description |
|---|---|
boolean1 OR boolean2 | True if either is true |
boolean1 AND boolean2 | True if both are true |
NOT boolean | Logical negation |
boolean IS TRUE / IS NOT TRUE | Null-safe true check |
boolean IS FALSE / IS NOT FALSE | Null-safe false check |
Conditional Functions
| Function | Description |
|---|---|
CASE value WHEN v1 THEN r1 ... ELSE rz END | Returns the result for the first matching value |
CASE WHEN condition1 THEN r1 ... ELSE rz END | Returns the result for the first true condition |
COALESCE(value1, value2, ...) | Returns the first non-null value |
IF(condition, true_value, false_value) | Returns true_value if condition is true, otherwise false_value |
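For example, conditional functions can derive a tier label and backfill null values in a projection (column names are illustrative):
transform:
  - source-table: mydb.orders
    projection: id, CASE WHEN amount > 1000 THEN 'HIGH' ELSE 'NORMAL' END AS tier, COALESCE(status, 'UNKNOWN') AS status_filled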
Hash Functions
| Function | Description |
|---|---|
MD5(string) | MD5 hash as a 32-character hex string |
SHA1(string) | SHA-1 hash as a 40-character hex string |
SHA224(string) | SHA-224 hash as a 56-character hex string |
SHA256(string) | SHA-256 hash as a 64-character hex string |
SHA384(string) | SHA-384 hash as a 96-character hex string |
SHA512(string) | SHA-512 hash as a 128-character hex string |
SHA2(string, hashLength) | SHA-2 hash with the specified bit length (224, 256, 384, or 512) |
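Hash functions are commonly used to pseudonymize sensitive columns before they reach the sink; a sketch assuming a customers table with an email column:
transform:
  - source-table: mydb.customers
    projection: id, SHA256(email) AS email_hash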
Variant and JSON Functions
| Function | Description |
|---|---|
PARSE_JSON(json_string[, allow_duplicate_keys]) | Parse a JSON string to a Variant value. Throws an error if the input is invalid. |
TRY_PARSE_JSON(json_string[, allow_duplicate_keys]) | Parse a JSON string to a Variant value. Returns NULL if the input is invalid. |
Nested Type Access
For ARRAY, MAP, and VARIANT fields, use the [] operator to access elements:
- Array: nest_col[1] — returns the element at index 1 (1-based)
- Map: nest_col['key'] — returns the value for the given key
- Chained: nest_col[1]['id'] — access nested elements
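As an illustrative sketch, assume mydb.events has a VARIANT column named payload (both names are hypothetical); nested values can then be pulled into top-level columns:
transform:
  - source-table: mydb.events
    projection: id, payload['user']['id'] AS user_id, payload['items'][1] AS first_item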
Custom Functions
If the built-in functions do not meet your needs, you can write custom User-Defined Functions (UDFs) in Java and use them in projection and filter expressions.
Writing a UDF
A valid UDF class must:
- Implement org.apache.flink.cdc.common.udf.UserDefinedFunction
- Have a public no-argument constructor
- Contain at least one public eval method
You can optionally override the following methods:
- getReturnType() — specify the return type explicitly when it cannot be inferred
- open() / close() — lifecycle hooks for initialization and teardown
Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cdc-common</artifactId>
    <version>3.5.0</version>
    <scope>provided</scope>
</dependency>
Example UDF:
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

public class AddOneFunctionClass implements UserDefinedFunction {

    // Evaluation method invoked for each input value; argument and return
    // types follow the type mapping below.
    public Object eval(Integer num) {
        return num + 1;
    }

    @Override
    public DataType getReturnType() {
        return DataTypes.INT();
    }

    @Override
    public void open() throws Exception { }

    @Override
    public void close() throws Exception { }
}
Type Mapping
| CDC Column Type | Java Type |
|---|---|
BOOLEAN | java.lang.Boolean |
TINYINT | java.lang.Byte |
SMALLINT | java.lang.Short |
INTEGER | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
DECIMAL | java.math.BigDecimal |
DATE | java.time.LocalDate |
TIME | java.time.LocalTime |
TIMESTAMP | java.time.LocalDateTime |
TIMESTAMP_TZ | java.time.ZonedDateTime |
TIMESTAMP_LTZ | java.time.Instant |
CHAR, VARCHAR, STRING | java.lang.String |
BINARY, VARBINARY, BYTES | byte[] |
ARRAY | java.util.List |
MAP | java.util.Map |
ROW | java.util.List |
VARIANT | org.apache.flink.cdc.common.types.variant.Variant |
Registering a UDF
Register UDFs in the pipeline section of your YAML job:
pipeline:
  user-defined-function:
    - name: add_one
      classpath: com.example.udf.AddOneFunctionClass
    - name: format_id
      classpath: com.example.udf.FormatFunctionClass
The JAR file containing the UDF class must be uploaded as an additional dependency in the job's Additional Configuration settings.
The name field is what you use in expressions — it does not need to match the class name.
Using a UDF
After registration, call UDFs in projection and filter expressions the same way you call built-in functions:
transform:
  - source-table: mydb\.\.*
    projection: "*, add_one(add_one(id)) AS id_plus_2, format_id(id, 'id -> %d') AS id_label"
    filter: add_one(id) < 100
Flink SQL UDF Compatibility
Classes that extend Flink's ScalarFunction can also be registered and used as CDC UDFs, with the following limitations:
- Parameterized ScalarFunction constructors are not supported.
- Flink TypeInformation type annotations are ignored.
- open() and close() lifecycle hooks are not called.
Dirty Data Collection
When ingesting data from sources that may produce malformed records (for example, Kafka topics with invalid JSON), you can configure the job to tolerate and log dirty data instead of failing.
Source-Level Configuration
Add the following parameters to the source section:
| Parameter | Default | Description |
|---|---|---|
ingestion.ignore-errors | false | When true, malformed records are skipped instead of causing the job to fail. |
ingestion.error-tolerance.max-count | — | Maximum number of dirty records before the job fails. Set to -1 for unlimited tolerance. |
Pipeline-Level Configuration
Configure a dirty data collector in the pipeline section to capture details about skipped records:
| Parameter | Description |
|---|---|
dirty-data.collector.name | Name of the collector |
dirty-data.collector.type | Collector type. Currently, only logger is supported. |
Only the Kafka source connector currently supports dirty data collection. Support for additional source connectors is planned.
Example
source:
  type: kafka
  properties.bootstrap.servers: ${secret_values.kafka_brokers}
  topic: my-topic
  value.format: json
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100
sink:
  type: paimon
  warehouse: s3://my-bucket/paimon
pipeline:
  name: Kafka to Paimon with Error Tolerance
  parallelism: 2
  dirty-data.collector:
    name: dirty-data-logger
    type: logger
Log Output
When a dirty record is encountered, the job writes a structured entry to yaml-dirty-data.out in the Task Manager log directory:
[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---
Each entry includes the timestamp and subtask that produced the record, the raw data that could not be parsed, and the exception message.
To locate dirty data logs, go to Deployments → Diagnostic → Task Managers → Logs and open yaml-dirty-data.out.