Data Ingestion

Data Ingestion is a low-code framework for syncing data from heterogeneous sources to destinations using declarative YAML jobs. It supports full and incremental data ingestion with millisecond latency, making it well-suited for real-time data warehousing and streaming lakehouse architectures.

note

Data Ingestion requires VERA 4.1 (Flink 1.20) or later and is compatible with Flink CDC 3.5.

Why YAML for CDC?

Defining data ingestion pipelines in YAML offers several advantages over writing Flink jobs in code:

  • Simplified job management — Declarative, human-readable configuration with no Flink internals required.
  • Reusability and consistency — Use template values and share configurations across environments.
  • CI/CD-friendly — Store in version control, review in pull requests, promote across environments, and roll back when needed.
  • Environment separation — Swap credentials, topics, and URIs per environment without changing logic.
  • Faster onboarding — Teams can define pipelines without deep knowledge of Flink APIs.
  • Tooling compatibility — Standard YAML tooling can validate, lint, and test job configurations.
  • Separation of concerns — Data flow logic is defined separately from runtime and platform configuration.

Quick Start

To create a Data Ingestion job:

  1. Go to Data Ingestion and select New Draft.
  2. Name the draft and select an Engine Version.
  3. Paste your YAML configuration and click OK.

Example: MySQL Source

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql_host}
  port: 3306
  username: ${secret_values.mysql_user}
  password: ${secret_values.mysql_password}
  tables: mydb\.\.*
  server-id: 5401-5404

sink:
  type: paimon
  name: Paimon Sink
  warehouse: s3://my-bucket/paimon

pipeline:
  name: MySQL to Paimon
  parallelism: 2

Example: PostgreSQL Source

source:
  type: postgres
  hostname: ${secret_values.pg_host}
  port: 5432
  username: ${secret_values.pg_user}
  password: ${secret_values.pg_password}
  tables: appdb.public.products
  slot.name: flink_cdc_slot
  scan.startup.mode: initial
  decoding.plugin.name: pgoutput
  server-time-zone: UTC
  debezium.publication.name: flink_pub
  debezium.publication.autocreate.mode: disabled

sink:
  type: paimon
  name: Paimon Sink
  warehouse: s3://my-bucket/paimon

pipeline:
  name: PostgreSQL to Paimon
  parallelism: 2

Example: MongoDB Source

source:
  type: mongodb
  hosts: ${secret_values.mongo_host}:27017
  database: demo
  collection: .*
  scan.startup.mode: initial
  scan.full-changelog: true
  scan.ignore-delete.enabled: false
  heartbeat.interval.ms: 1000

sink:
  type: paimon
  name: Paimon Sink
  warehouse: s3://my-bucket/paimon

pipeline:
  name: MongoDB to Paimon
  parallelism: 2
tip

Use ${secret_values.<name>} expressions to reference secrets stored in your workspace. Do not hardcode credentials in YAML files.

Job Structure

A Data Ingestion YAML job supports the following top-level sections:

| Section | Required | Description |
| --- | --- | --- |
| source | Yes | Source connector configuration |
| sink | Yes | Sink connector configuration |
| transform | No | Column projection, filtering, and computed column rules |
| route | No | Table routing rules mapping source tables to sink tables |
| pipeline | No | Global job settings (name, parallelism, schema change behavior) |
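
The following sketch shows how the five sections fit together in one job. The connection details, table names, and the transform and route rules are illustrative, and the optional sections can be omitted:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql_host}
  port: 3306
  username: ${secret_values.mysql_user}
  password: ${secret_values.mysql_password}
  tables: mydb.orders

transform:
  - source-table: mydb.orders
    projection: id, amount, status
    filter: amount > 0

route:
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders

sink:
  type: paimon
  name: Paimon Sink
  warehouse: s3://my-bucket/paimon

pipeline:
  name: MySQL to Paimon
  parallelism: 2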

Source and Sink Modules

Configure source and sink connectors using the following structure:

source:
  type: <connector>
  name: <display name>
  # connector-specific parameters

sink:
  type: <connector>
  name: <display name>
  # connector-specific parameters

Supported Connectors

The following connectors are supported. Refer to the individual connector documentation for whether each one can be used as a source, a sink, or both:

  • MySQL
  • PostgreSQL CDC
  • MongoDB
  • Kafka
  • Upsert Kafka
  • Paimon
  • Fluss
  • Iceberg
  • StarRocks
  • Print
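
For quick smoke tests, a non-persistent sink such as Print can stand in for the real destination. The sketch below assumes the connector's type identifier is print and that it needs no further parameters; check the Print connector documentation before relying on it:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql_host}
  port: 3306
  username: ${secret_values.mysql_user}
  password: ${secret_values.mysql_password}
  tables: mydb.orders

sink:
  type: print   # assumed type identifier for the Print connector
  name: Print Sink

pipeline:
  name: Smoke test MySQL to Print
  parallelism: 1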

Reusing Catalog Connection Information

If you have already registered a catalog in the Data Management section of your workspace, you can reference it directly in your Data Ingestion job using the using.built-in-catalog parameter. This avoids repeating connection details such as hostname, username, and password.

source:
  type: mysql
  using.built-in-catalog: my_mysql_catalog
  tables: mydb\.\.*

sink:
  type: paimon
  using.built-in-catalog: my_paimon_catalog
note

Catalog connection reuse is supported for the following connectors: MySQL (source), Kafka (source), Upsert Kafka (sink), and Paimon (sink).

note

Catalog parameters that are incompatible with Data Ingestion YAML jobs are ignored. Refer to the individual connector documentation for details.

Transform

The transform module lets you manipulate columns before data reaches the sink. You can select or drop columns, add computed columns, filter rows, and use metadata columns.

note

After modifying transform rules, the job must be restarted without state.

Parameters

| Parameter | Required | Description |
| --- | --- | --- |
| source-table | Yes | Source table to apply the transform to. Supports regular expressions. |
| projection | No | Column selection expression, similar to SQL SELECT. If omitted, all columns pass through unchanged. |
| filter | No | Row filter expression, similar to SQL WHERE. If omitted, all rows pass through. |
| primary-keys | No | Override the primary key definition (comma-separated column names). |
| partition-keys | No | Override the partition key definition (comma-separated column names). |
| table-options | No | Extra options passed to the sink, in key=value,key2=value2 format. |
| description | No | Description of the transform rule. |
| converter-after-transform | No | Post-transform converters applied after all transform rules. |
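
The rule below sketches the override parameters that the later examples do not cover. The table name, key columns, and the bucket option are illustrative; valid table-options keys depend on the sink connector:

transform:
  - source-table: mydb.orders
    projection: id, seller_id, amount, order_date
    primary-keys: id
    partition-keys: order_date
    table-options: bucket=4
    description: repartition orders in the sink by order date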

Column Projection

Use projection to select, rename, or compute columns.

Select specific columns (unspecified columns are dropped):

transform:
  - source-table: mydb.orders
    projection: id, amount, status

Include all columns with a wildcard:

transform:
  - source-table: mydb.orders
    projection: \*

Add a computed column:

transform:
  - source-table: mydb.orders
    projection: id, amount, amount * 1.2 AS amount_with_tax

Append metadata columns to all columns:

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS full_table_name
note

If a projection expression does not include the wildcard (\*), the resulting schema is fixed. Schema changes from the source will not propagate to the sink for that transform rule.

Metadata Columns

The following metadata columns are available in projection and filter expressions for all connectors:

| Column | Type | Description |
| --- | --- | --- |
| __namespace_name__ | String | Namespace of the source table |
| __schema_name__ | String | Schema of the source table |
| __table_name__ | String | Table name of the source table |
| __data_event_type__ | String | Change operation type: +I (insert), -U (before update), +U (after update), -D (delete) |

The following table shows the mapping of namespace, schema, and table for each connector:

| Connector | Namespace | Schema | Table |
| --- | --- | --- | --- |
| MySQL | - | Database | Table |
| PostgreSQL | Database | Schema | Table |
| MongoDB | - | Database | Collection |
| Kafka | - | - | Topic |
| Paimon | - | Database | Table |

A dash indicates that the identifier part is not populated for that connector.
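
Metadata columns can also be used in filter expressions. For example, the following sketch (with an illustrative table name) drops delete events and keeps inserts and updates:

transform:
  - source-table: mydb.orders
    filter: __data_event_type__ <> '-D'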

Row Filtering

Use filter to drop rows that do not match a condition. The expression follows SQL WHERE syntax and evaluates to a boolean.

transform:
  - source-table: mydb.orders
    projection: id, amount, status
    filter: amount > 100 AND status <> 'CANCELLED'

You can reference computed columns defined in the same projection:

transform:
  - source-table: mydb.orders
    projection: id, amount, amount * 1.2 AS amount_with_tax
    filter: amount_with_tax > 150

Post-Transform Converters

The converter-after-transform parameter applies additional processing after all transform rules. Multiple converters can be chained with a comma.

| Converter | Description |
| --- | --- |
| SOFT_DELETE | Converts delete events to inserts. Use with __data_event_type__ to track deletions without physically removing rows. |
| FIELD_NAME_LOWER_CASE | Converts all field names to lowercase before writing to the sink. |

Example — soft delete:

transform:
  - source-table: mydb.orders
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
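
Converters can be chained. For example, a sketch that combines soft deletes with lowercased field names, using the comma-separated format described above:

transform:
  - source-table: mydb.orders
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE,FIELD_NAME_LOWER_CASE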

Route

The route module defines how source tables map to sink tables. Each rule specifies a source table pattern and a destination table. Multiple rules can be defined in a single job.

note

After modifying route rules, the job must be restarted without state.

Parameters

| Parameter | Required | Description |
| --- | --- | --- |
| source-table | Yes | Source table to match. Supports regular expressions. |
| sink-table | Yes | Destination table for matched data. |
| replace-symbol | No | A placeholder string in sink-table that is replaced with the matched source table name. |
| description | No | Description of the route rule. |

Routing Patterns

Single table to single table:

route:
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders
    description: route orders to ODS

Single table to multiple tables (fan-out):

route:
  - source-table: mydb.orders
    sink-table: ods_db.orders
  - source-table: mydb.orders
    sink-table: backup_db.orders

Multiple tables to multiple tables:

route:
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders
  - source-table: mydb.shipments
    sink-table: ods_db.ods_shipments
  - source-table: mydb.products
    sink-table: ods_db.ods_products

Merge sharded tables into one:

route:
  - source-table: mydb\.\.*
    sink-table: ods_db.merged
    description: merge all sharded tables

Full database sync (preserve table names):

route:
  - source-table: mydb\.\.*
    sink-table: ods_db.<>
    replace-symbol: <>
    description: sync all tables preserving names

Regex Capture Groups

For advanced routing, use capture groups in the source-table regex and reference them with $1, $2, and so on in sink-table.

Add a prefix to all databases, preserve table names:

route:
  - source-table: (\.*).(\.*)
    sink-table: ods_$1.$2

Merge sharded tables using capture groups:

route:
  - source-table: db_(\.*).orders_(\d+)
    sink-table: ods_db.orders_$1_$2

Merge sharded tables and route per-shard customers with capture groups:

route:
  - source-table: public.orders_\.*
    sink-table: ods.orders_all
    description: merge sharded orders tables
  - source-table: public.customers_(\d+)
    sink-table: ods.customer_$1
    description: per-shard customer routing

Pipeline

The pipeline section defines global job configuration that applies to the entire Data Ingestion job.

Parameters

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| name | No | Flink CDC Pipeline Job | Display name for the job. |
| parallelism | No |  | Job-level parallelism. |
| schema.change.behavior | No | LENIENT | How schema changes from the source are applied to the sink. See Schema Change Synchronization. |
| schema-operator.rpc-timeout | No | 3min | Timeout for schema change coordination. |
| local-time-zone | No | System timezone | Timezone used in Transform module expressions. |
| user-defined-function | No |  | Register custom UDFs. See Custom Functions. |
| dirty-data.collector | No |  | Configure dirty data logging. See Dirty Data Collection. |
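
A pipeline block that sets several of these options together might look like the following; the values are illustrative:

pipeline:
  name: MySQL to Paimon
  parallelism: 4
  schema.change.behavior: LENIENT
  schema-operator.rpc-timeout: 5min
  local-time-zone: UTC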

Schema Change Synchronization

Data Ingestion jobs can propagate schema changes from the source to the sink, including table creation, column additions, column type changes, column drops, column renames, table truncations, and table drops.

Configure how schema changes are handled using schema.change.behavior in the pipeline section:

pipeline:
  schema.change.behavior: EVOLVE

Behavior Modes

| Mode | Description |
| --- | --- |
| LENIENT (default) | Applies schema changes with tolerant conversions. Drop table and truncate table events are suppressed. Column renames become a type change plus a new column addition. Column drops become a type change to nullable. New columns are added as nullable. Recommended for most use cases. |
| EVOLVE | Applies all schema changes strictly. If any change fails in the sink, the job throws an exception and triggers a restart. Use when strict schema parity is required. |
| TRY_EVOLVE | Attempts to apply schema changes. If a change fails in the sink, the job continues without failing but may produce rows with missing or truncated columns. Use when strict parity is preferred but full fault tolerance is needed. |
| IGNORE | Ignores all schema changes. The sink schema remains fixed regardless of source schema changes. Use when the sink does not support schema changes. |
| EXCEPTION | Throws an exception when any schema change event is received. Use when schema changes must never occur. |

Controlling Which Changes Reach the Sink

You can fine-tune which schema change events are applied at the sink level using include.schema.changes and exclude.schema.changes:

| Parameter | Default | Description |
| --- | --- | --- |
| include.schema.changes | All | List of schema change types to allow. |
| exclude.schema.changes | None | List of schema change types to block. Takes precedence over include.schema.changes. |

Supported event types:

| Event type | Description |
| --- | --- |
| create.table | Table creation |
| add.column | Column addition |
| alter.column.type | Column type change |
| rename.column | Column rename |
| drop.column | Column deletion |
| truncate.table | Table truncation |
| drop.table | Table deletion |

Partial matching is supported: drop matches both drop.column and drop.table; column matches all column-level events.

Example — allow table creation and column events, exclude drop column:

sink:
  type: paimon
  name: Paimon Sink
  warehouse: s3://my-bucket/paimon
  include.schema.changes: [create.table, column]
  exclude.schema.changes: [drop.column]

pipeline:
  schema.change.behavior: EVOLVE

Built-in CDC Functions

The following functions are available in projection and filter expressions in the transform module.

Arithmetic Functions

| Function | Description |
| --- | --- |
| numeric1 + numeric2 | Addition |
| numeric1 - numeric2 | Subtraction |
| numeric1 * numeric2 | Multiplication |
| numeric1 / numeric2 | Division |
| numeric1 % numeric2 | Modulo |
| ABS(numeric) | Absolute value |
| CEIL(numeric) | Round up to nearest integer |
| FLOOR(numeric) | Round down to nearest integer |
| ROUND(numeric, int) | Round to int decimal places |
| UUID() | Generate a random UUID string (RFC 4122 type 4) |
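
For example, a transform rule (with illustrative table and column names) that derives a taxed amount and a synthetic row identifier:

transform:
  - source-table: mydb.orders
    projection: id, ROUND(amount * 1.19, 2) AS amount_with_tax, UUID() AS row_uuid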

Time Functions

| Function | Description |
| --- | --- |
| LOCALTIME | Current local time (TIME(0)) |
| LOCALTIMESTAMP | Current local timestamp (TIMESTAMP(3)) |
| CURRENT_TIME | Alias for LOCALTIME |
| CURRENT_DATE | Current local date |
| CURRENT_TIMESTAMP | Current local timestamp (TIMESTAMP_LTZ(3)) |
| NOW() | Alias for CURRENT_TIMESTAMP |
| DATE_FORMAT(timestamp, format) | Format a timestamp as a string using Java SimpleDateFormat patterns |
| DATE_FORMAT_TZ(timestamp, format) | Format a timestamp to a timezone-aware string (yyyy-MM-dd HH:mm:ss) |
| DATE_FORMAT_TZ(timestamp, format, timezone) | Format a timestamp using the specified format and timezone |
| TIMESTAMPADD(unit, interval, timepoint) | Add an interval to a timepoint. Units: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR |
| DATE_ADD(timestamp, days, timezone) | Add days to a timestamp and return a yyyy-MM-dd string in the given timezone |
| TIMESTAMPDIFF(unit, timepoint1, timepoint2) | Difference between two timepoints in the given unit |
| TO_DATE(string[, format]) | Parse a date string to DATE (default format: yyyy-MM-dd) |
| TO_TIMESTAMP(string[, format]) | Parse a string to TIMESTAMP (default: yyyy-MM-dd HH:mm:ss) |
| TO_TIMESTAMP_LTZ(string[, format][, timezone]) | Parse a string to TIMESTAMP_LTZ (default format: yyyy-MM-dd HH:mm:ss.SSS, default timezone: UTC) |
| TO_TIMESTAMP_LTZ(bigint[, precision]) | Convert a Unix epoch value to TIMESTAMP_LTZ. Precision: 0 (seconds) or 3 (milliseconds) |
| FROM_UNIXTIME(numeric[, format]) | Convert seconds since Unix epoch to a formatted timestamp string |
| UNIX_TIMESTAMP() | Current Unix timestamp in seconds |
| UNIX_TIMESTAMP(string[, format]) | Parse a datetime string and return its Unix timestamp |
note

Within a single projection or filter evaluation, all calls to time functions such as NOW() return the same timestamp, regardless of evaluation order.
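
A sketch that combines several time functions in a projection; created_at is an assumed timestamp column:

transform:
  - source-table: mydb.orders
    projection: id, DATE_FORMAT(created_at, 'yyyy-MM-dd') AS order_date, TIMESTAMPDIFF(DAY, created_at, NOW()) AS age_in_days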

String Functions

| Function | Description |
| --- | --- |
| string1 || string2 | Concatenate two strings |
| CHAR_LENGTH(string) | Number of characters in a string |
| UPPER(string) | Convert to uppercase |
| LOWER(string) | Convert to lowercase |
| TRIM(string) | Remove leading and trailing whitespace |
| REGEXP_REPLACE(string, pattern, replacement) | Replace all matches of pattern in string with replacement |
| SUBSTRING(string FROM start [FOR length]) | Extract a substring starting at start, optionally limited to length characters |
| CONCAT(string1, string2, ...) | Concatenate multiple strings |
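
For example, normalizing and combining string columns (first_name, last_name, and country are assumed columns):

transform:
  - source-table: mydb.customers
    projection: id, CONCAT(first_name, ' ', last_name) AS full_name, UPPER(TRIM(country)) AS country_code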

Type Cast

Use CAST(<expression> AS <type>) for explicit type conversion.

| Source type | Target type |
| --- | --- |
| Any | VARCHAR |
| NUMERIC, STRING | BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL |
| TIMESTAMP_TZ, TIMESTAMP_LTZ, STRING | TIMESTAMP |
note

Use VARCHAR as the target type for string conversions. STRING is not supported as a cast target.
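
For example, a sketch that casts in both directions; quantity_text is assumed to hold numeric strings:

transform:
  - source-table: mydb.orders
    projection: id, CAST(amount AS VARCHAR) AS amount_text, CAST(quantity_text AS INTEGER) AS quantity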

Comparison Functions

| Function | Description |
| --- | --- |
| value1 = value2 | Equal to |
| value1 <> value2 | Not equal to |
| value1 > value2 | Greater than |
| value1 >= value2 | Greater than or equal to |
| value1 < value2 | Less than |
| value1 <= value2 | Less than or equal to |
| value IS NULL | True if value is null |
| value IS NOT NULL | True if value is not null |
| value1 BETWEEN value2 AND value3 | True if value1 is within the range |
| value1 NOT BETWEEN value2 AND value3 | True if value1 is outside the range |
| string1 LIKE string2 | True if string1 matches the pattern |
| value1 IN (value2, ...) | True if value1 is in the list |
| value1 NOT IN (value2, ...) | True if value1 is not in the list |

Logical Functions

| Function | Description |
| --- | --- |
| boolean1 OR boolean2 | True if either is true |
| boolean1 AND boolean2 | True if both are true |
| NOT boolean | Logical negation |
| boolean IS TRUE / IS NOT TRUE | Null-safe true check |
| boolean IS FALSE / IS NOT FALSE | Null-safe false check |

Conditional Functions

| Function | Description |
| --- | --- |
| CASE value WHEN v1 THEN r1 ... ELSE rz END | Returns the result for the first matching value |
| CASE WHEN condition1 THEN r1 ... ELSE rz END | Returns the result for the first true condition |
| COALESCE(value1, value2, ...) | Returns the first non-null value |
| IF(condition, true_value, false_value) | Returns true_value if condition is true, otherwise false_value |
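
For example, bucketing rows and supplying a default value (amount and status are assumed columns):

transform:
  - source-table: mydb.orders
    projection: id, CASE WHEN amount >= 1000 THEN 'LARGE' ELSE 'SMALL' END AS order_size, COALESCE(status, 'UNKNOWN') AS status_filled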

Hash Functions

| Function | Description |
| --- | --- |
| MD5(string) | MD5 hash as a 32-character hex string |
| SHA1(string) | SHA-1 hash as a 40-character hex string |
| SHA224(string) | SHA-224 hash as a 56-character hex string |
| SHA256(string) | SHA-256 hash as a 64-character hex string |
| SHA384(string) | SHA-384 hash as a 96-character hex string |
| SHA512(string) | SHA-512 hash as a 128-character hex string |
| SHA2(string, hashLength) | SHA-2 hash with the specified bit length (224, 256, 384, or 512) |
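
For example, pseudonymizing an assumed email column before it reaches the sink:

transform:
  - source-table: mydb.users
    projection: id, SHA256(email) AS email_hash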

Variant and JSON Functions

| Function | Description |
| --- | --- |
| PARSE_JSON(json_string[, allow_duplicate_keys]) | Parse a JSON string to a Variant value. Throws an error if the input is invalid. |
| TRY_PARSE_JSON(json_string[, allow_duplicate_keys]) | Parse a JSON string to a Variant value. Returns NULL if the input is invalid. |

Nested Type Access

For ARRAY, MAP, and VARIANT fields, use the [] operator to access elements:

  • Array: nest_col[1] — returns the element at index 1 (1-based)
  • Map: nest_col['key'] — returns the value for the given key
  • Chained: nest_col[1]['id'] — access nested elements
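
A sketch that parses an assumed JSON string column and reads nested values; payload, tags (an ARRAY column), and attributes (a MAP column) are illustrative:

transform:
  - source-table: mydb.events
    projection: id, TRY_PARSE_JSON(payload) AS payload_variant, tags[1] AS first_tag, attributes['channel'] AS channel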

Custom Functions

If the built-in functions do not meet your needs, you can write custom User-Defined Functions (UDFs) in Java and use them in projection and filter expressions.

Writing a UDF

A valid UDF class must:

  • Implement org.apache.flink.cdc.common.udf.UserDefinedFunction
  • Have a public no-argument constructor
  • Contain at least one public eval method

You can optionally override the following methods:

  • getReturnType() — specify the return type explicitly when it cannot be inferred
  • open() / close() — lifecycle hooks for initialization and teardown

Maven dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cdc-common</artifactId>
    <version>3.5.0</version>
    <scope>provided</scope>
</dependency>

Example UDF:

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

public class AddOneFunctionClass implements UserDefinedFunction {

    // Called once per row; the argument and return types follow the type mapping below.
    public Object eval(Integer num) {
        return num + 1;
    }

    @Override
    public DataType getReturnType() {
        return DataTypes.INT();
    }

    @Override
    public void open() throws Exception { }

    @Override
    public void close() throws Exception { }
}

Type Mapping

| CDC Column Type | Java Type |
| --- | --- |
| BOOLEAN | java.lang.Boolean |
| TINYINT | java.lang.Byte |
| SMALLINT | java.lang.Short |
| INTEGER | java.lang.Integer |
| BIGINT | java.lang.Long |
| FLOAT | java.lang.Float |
| DOUBLE | java.lang.Double |
| DECIMAL | java.math.BigDecimal |
| DATE | java.time.LocalDate |
| TIME | java.time.LocalTime |
| TIMESTAMP | java.time.LocalDateTime |
| TIMESTAMP_TZ | java.time.ZonedDateTime |
| TIMESTAMP_LTZ | java.time.Instant |
| CHAR, VARCHAR, STRING | java.lang.String |
| BINARY, VARBINARY, BYTES | byte[] |
| ARRAY | java.util.List |
| MAP | java.util.Map |
| ROW | java.util.List |
| VARIANT | org.apache.flink.cdc.common.types.variant.Variant |

Registering a UDF

Register UDFs in the pipeline section of your YAML job:

pipeline:
  user-defined-function:
    - name: add_one
      classpath: com.example.udf.AddOneFunctionClass
    - name: format_id
      classpath: com.example.udf.FormatFunctionClass

The JAR file containing the UDF class must be uploaded as an additional dependency in the job's Additional Configuration settings.

The name field is what you use in expressions — it does not need to match the class name.

Using a UDF

After registration, call UDFs in projection and filter expressions the same way you call built-in functions:

transform:
  - source-table: mydb\.\.*
    projection: "*, add_one(add_one(id)) AS id_plus_2, format_id(id, 'id -> %d') AS id_label"
    filter: add_one(id) < 100

Classes that extend Flink's ScalarFunction can also be registered and used as CDC UDFs, with the following limitations:

  • Parameterized ScalarFunction constructors are not supported.
  • Flink TypeInformation type annotations are ignored.
  • open() and close() lifecycle hooks are not called.

Dirty Data Collection

When ingesting data from sources that may produce malformed records (for example, Kafka topics with invalid JSON), you can configure the job to tolerate and log dirty data instead of failing.

Source-Level Configuration

Add the following parameters to the source section:

| Parameter | Default | Description |
| --- | --- | --- |
| ingestion.ignore-errors | false | When true, malformed records are skipped instead of causing the job to fail. |
| ingestion.error-tolerance.max-count |  | Maximum number of dirty records before the job fails. Set to -1 for unlimited tolerance. |

Pipeline-Level Configuration

Configure a dirty data collector in the pipeline section to capture details about skipped records:

| Parameter | Description |
| --- | --- |
| dirty-data.collector.name | Name of the collector |
| dirty-data.collector.type | Collector type. Currently, only logger is supported. |
note

Only the Kafka source connector currently supports dirty data collection. Support for additional source connectors is planned.

Example

source:
  type: kafka
  properties.bootstrap.servers: ${secret_values.kafka_brokers}
  topic: my-topic
  value.format: json
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

sink:
  type: paimon
  warehouse: s3://my-bucket/paimon

pipeline:
  name: Kafka to Paimon with Error Tolerance
  parallelism: 2
  dirty-data.collector:
    name: dirty-data-logger
    type: logger

Log Output

When a dirty record is encountered, the job writes a structured entry to yaml-dirty-data.out in the Task Manager log directory:

[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---

Each entry includes the timestamp and subtask that produced the record, the raw data that could not be parsed, and the exception message.

tip

To locate dirty data logs, go to Deployments → Diagnostic → Task Managers → Logs and open yaml-dirty-data.out.