Canal
This topic introduces you to the usage examples, configuration options, and type mappings of the Canal format.
Background Information
Canal is a Changelog Data Capture (CDC) tool that streams changes from MySQL to other systems in real time. Canal provides a unified data format for changelogs and can serialize messages as JSON or protobuf (protobuf is the default).
Flink can parse Canal JSON messages as INSERT, UPDATE, or DELETE messages in the Flink SQL system. This feature is useful in many scenarios, such as:
- Synchronizing incremental data from a database to other systems
- Auditing logs
- Maintaining real-time materialized views of a database
- Running temporal joins against the change history of database tables
Flink also supports encoding INSERT, UPDATE, or DELETE messages in Flink SQL into JSON messages in Canal format and outputting them to storage such as Kafka.
Currently Flink does not support combining UPDATE_BEFORE and UPDATE_AFTER into one UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER into Canal messages of type DELETE and INSERT respectively.
Usage Example
Canal provides a unified format for changelogs. The following is a simple example of an update operation captured from the MySQL products table:
{
  "data": [
    {
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.18"
    }
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {
    "id": "INTEGER",
    "name": "VARCHAR(255)",
    "description": "VARCHAR(512)",
    "weight": "FLOAT"
  },
  "old": [
    {
      "weight": "5.15"
    }
  ],
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "description": 12,
    "weight": 7
  },
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}
Note: For the meaning of each field, see the Canal documentation.
The MySQL products table has four columns: id, name, description, and weight. The JSON message above is an update event on the products table, indicating that the weight of the row with id = 111 changed from 5.15 to 5.18. Assuming the messages have been synchronized to a Kafka topic named products_binlog, the following DDL consumes messages from this topic and parses the change events:
CREATE TABLE topic_products (
  -- metadata is exactly the same as MySQL "products" table.
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json' -- use canal-json format.
);
After registering a Kafka topic as a Flink table, you can use Canal messages as a changelog source:
-- A real-time materialized view on the MySQL "products" table.
-- Calculate the latest average weight of the same product.
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- Synchronize all data and incremental changes of the MySQL "products" table
-- to the Elasticsearch "products" index for future searches.
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
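The encoding direction described in Background Information can be sketched as follows. This is a minimal example that assumes a hypothetical sink table products_avg_weight_sink and a hypothetical Kafka topic products_avg_weight; neither name comes from the original setup. Because the aggregation produces updates, each change to an average is expected to be written as a DELETE message followed by an INSERT message in Canal format.

```sql
-- Hypothetical Kafka sink that receives the changelog encoded as canal-json.
CREATE TABLE products_avg_weight_sink (
  name STRING,
  avg_weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_avg_weight',            -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json'
);

-- The aggregation emits UPDATE_BEFORE/UPDATE_AFTER rows, which the format
-- encodes as Canal messages of type DELETE and INSERT respectively.
INSERT INTO products_avg_weight_sink
SELECT name, CAST(AVG(weight) AS DECIMAL(10, 2))
FROM topic_products
GROUP BY name;
```

The explicit CAST keeps the result type aligned with the declared sink column, since the result precision of AVG on a DECIMAL column can differ from the input precision.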
Configuration Options
| Parameter | Description | Required | Default | Type |
|---|---|---|---|---|
| format | The format to use. To use the Canal format, set this option to canal-json. | yes | none | String |
| canal-json.ignore-parse-errors | Valid values: true: When a parsing error occurs, the current field or row is skipped. false (default): An error is thrown and the job fails. | no | false | Boolean |
| canal-json.timestamp-format.standard | Specifies the input and output timestamp format. The parameter values are as follows: SQL: Parse input timestamps in yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123, and output timestamps in the same format. ISO-8601: Parse input timestamps in yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123, and output timestamps in the same format. | no | SQL | String |
| canal-json.map-null-key.mode | Specifies how to handle null keys in Map data. Valid values: FAIL: An exception is thrown when a Map key is null. DROP: Map entries with a null key are discarded. LITERAL: Null keys are replaced with a string constant, which is defined by canal-json.map-null-key.literal. | no | FAIL | String |
| canal-json.map-null-key.literal | When the value of canal-json.map-null-key.mode is LITERAL, specify a string constant to replace the null key value in the Map. | no | null | String |
| canal-json.encode.decimal-as-plain-number | The parameter values are as follows: true: All data of DECIMAL type remain as they are, and are not expressed in scientific notation, for example: 0.000000027 is expressed as 0.000000027. false: All data of DECIMAL type are expressed in scientific notation, for example: 0.000000027 is expressed as 2.7E-8. | no | false | Boolean |
| canal-json.database.include | An optional regular expression. Only changelog records whose database meta field matches the expression are read. The expression must be compatible with Java's Pattern. | no | none | String |
| canal-json.table.include | An optional regular expression. Only changelog records whose table meta field matches the expression are read. The expression must be compatible with Java's Pattern. | no | none | String |
| canal-json.infer-schema.strategy | The strategy used to infer the schema from Canal messages. Valid values are AUTO (infer from JSON structure), SQL_TYPE (use sqlType field), and MYSQL_TYPE (use mysqlType field). Supported from VERA Engine 4.3. | no | AUTO | String |
| canal-json.mysql.treat-tinyint1-as-boolean.enabled | When canal-json.infer-schema.strategy is set to MYSQL_TYPE, this determines whether to parse TINYINT(1) as a boolean. If true, 1 becomes true and 0 becomes false. If false, it is parsed as an integer. | no | true | Boolean |
| canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled | When canal-json.infer-schema.strategy is set to MYSQL_TYPE, this determines the TIMESTAMP format. If true, it is parsed as a naive timestamp string. If false, the output is timezone-aware (e.g., with a 'Z' suffix). | no | true | Boolean |
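As a rough illustration of how several of these options combine, the following sketch reuses the products_binlog topic from the usage example. The table name topic_products_filtered is hypothetical, and the chosen option values are only examples rather than recommendations.

```sql
-- Hypothetical source table that reads only inventory.products changes,
-- skips unparsable records, and expects ISO-8601 timestamps.
CREATE TABLE topic_products_filtered (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'canal-json.ignore-parse-errors' = 'true',
  'canal-json.timestamp-format.standard' = 'ISO-8601',
  'canal-json.database.include' = 'inventory',  -- regex on the database field
  'canal-json.table.include' = 'products'       -- regex on the table field
);
```

The include filters are useful when one Kafka topic carries changes from several databases or tables and the Flink table should see only one of them.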
Schema Inference
Starting with VERA Engine 4.3, you can configure how the Canal format infers data types from the source messages. This is particularly useful when you need to ensure that numeric or temporal types are parsed correctly from string representations in the JSON data.
Inference Strategies
You can select from the following strategies using the canal-json.infer-schema.strategy option:
- AUTO: The default strategy. It infers types based on the JSON data structure itself.
- SQL_TYPE: Uses the sqlType field present in Canal records to determine the data types.
- MYSQL_TYPE: Uses the mysqlType field in Canal records. This strategy supports additional MySQL-specific tuning options.
MySQL Type Parsing
When you use the MYSQL_TYPE strategy, you can further refine how specific MySQL types are handled:
- TINYINT(1) Handling: Use canal-json.mysql.treat-tinyint1-as-boolean.enabled to decide if TINYINT(1) columns should be treated as booleans or integers.
- TIMESTAMP Handling: Use canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled to control whether MySQL TIMESTAMP values are treated as naive datetime strings or timezone-aware values.
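The strategy and the two MySQL toggles might be set together as sketched here. This assumes they are passed in the WITH clause like the other canal-json options. The table name orders_source, the topic orders_binlog, and the declared columns are hypothetical. This topic does not state how inference interacts with explicitly declared columns, so treat the column list as illustrative only.

```sql
-- Hypothetical table; option names are taken from the Configuration Options table.
CREATE TABLE orders_source (
  id BIGINT,
  is_active TINYINT,          -- kept as an integer because the boolean toggle is off
  price DECIMAL(10, 2),
  created_ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_binlog',  -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'canal-json.infer-schema.strategy' = 'MYSQL_TYPE',
  'canal-json.mysql.treat-tinyint1-as-boolean.enabled' = 'false',
  'canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled' = 'true'
);
```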
Examples
The following examples demonstrate how different inference strategies and configuration options affect the parsed output.
Comparing Inference Strategies
Consider a Canal message with the following structure:
{
  "data": [
    {
      "id": "111",
      "is_active": "1",
      "price": "12.34",
      "created_ts": "2025-01-01 12:34:56"
    }
  ],
  "mysqlType": {
    "id": "BIGINT",
    "is_active": "TINYINT(1)",
    "price": "DECIMAL(10,2)",
    "created_ts": "TIMESTAMP"
  },
  "sqlType": {
    "id": -5,
    "is_active": 12,
    "price": 12,
    "created_ts": 12
  },
  "type": "INSERT"
}
The table below shows how the fields are parsed under different strategies:
| Strategy | id | is_active | price | created_ts | Rationale |
|---|---|---|---|---|---|
| AUTO | "111" (String) | "1" (String) | "12.34" (String) | "2025-01-01 12:34:56" | Infers types from JSON shape. |
| SQL_TYPE | 111 (Number) | "1" (String) | "12.34" (String) | "2025-01-01 12:34:56" | Uses sqlType (e.g., id: -5 is BIGINT). |
| MYSQL_TYPE | 111 (Number) | true (Boolean) | 12.34 (Number) | "2025-01-01 12:34:56" | Uses mysqlType and default MySQL toggles. |
MySQL Timestamp Parsing
When using the MYSQL_TYPE strategy, you can toggle the canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled option:
- true (default): 2025-01-01 12:34:56
- false: 2025-01-01 12:34:56Z (timezone-aware)
Limitations
- Missing Type Metadata: If you set the strategy to MYSQL_TYPE but the mysqlType field is missing from the Canal message, the ingestion job will fail with a deserialization error. Ensure your Canal producer is configured to include these metadata fields, or use the AUTO strategy as a fallback.
Type Mapping
Currently, the Canal format uses the JSON format for serialization and deserialization. For details about data type mappings, see JSON Format.
Additional Usage Notes
Available Metadata
The following format metadata can be declared as read-only (VIRTUAL) columns in DDL statements.
Format metadata fields are available only if the corresponding connector forwards format metadata. Currently, only the Kafka connector can declare metadata fields for its value format.
| Key | Description | Type |
|---|---|---|
| database | Original database. Corresponds to the database field in the Canal record (if available). | STRING NULL |
| table | Original table. Corresponds to the table field in the Canal record (if available). | STRING NULL |
| sql-type | Mapping of column names to SQL types. Corresponds to the sqlType field in the Canal record (if available). | MAP<STRING, INT> NULL |
| pk-names | Array of primary key names. Corresponds to the pkNames field in the Canal record (if available). | ARRAY<STRING> NULL |
| ingestion-timestamp | The timestamp when the event was processed by the connector. Corresponds to the ts field in the Canal record. | TIMESTAMP_LTZ(3) NULL |
The following example shows how to access Canal metadata fields in Kafka:
CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);
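Once declared, the metadata columns can be queried like ordinary columns. The following sketch filters change records by their originating database. The value 'inventory' is an assumed database name borrowed from the earlier sample message, not something the user_behavior topic is known to contain.

```sql
-- Read the physical columns together with selected Canal metadata.
SELECT
  origin_database,
  origin_table,
  origin_ts,
  user_id,
  item_id,
  behavior
FROM KafkaTable
WHERE origin_database = 'inventory';  -- assumed database name
```

Because the metadata columns are declared VIRTUAL, they are read-only and are excluded when the same table is used as a sink.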
Common Problems
Duplicate Change Events Delivered on Failure
Under normal operating conditions, Canal delivers each change event with exactly-once semantics, and Flink consumes the change events correctly. When a failure occurs, however, Canal can guarantee only at-least-once delivery. In that case, Canal may deliver duplicate change events to Kafka, and Flink then reads the duplicates, which can lead to incorrect query results or unexpected exceptions. To handle this, set the job parameter table.exec.source.cdc-events-duplicate to true and define a PRIMARY KEY on the source table. The framework then generates an additional stateful operator that uses the primary key to deduplicate change events and produce a normalized changelog stream.
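A minimal sketch of this recommendation, assuming the parameter can be set with a SET statement in the SQL session (in some deployments it is set in the job configuration instead). The table name topic_products_dedup is hypothetical, and the schema is reused from the usage example.

```sql
-- Ask the planner to deduplicate CDC events from the source.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

-- The primary key tells the deduplication operator which rows are the same record.
CREATE TABLE topic_products_dedup (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json'
);
```

The extra operator keeps state per primary key, so enabling it trades additional state size for a changelog without duplicates.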