Canal
This article covers usage examples, configuration options, and type mappings for the Canal format.
Background Information
Canal is a CDC (Changelog Data Capture) tool that can stream MySQL changes to other systems in real time. Canal provides a unified data format for changelogs and can serialize messages as JSON or protobuf (https://protobuf.dev/); Canal uses protobuf by default.
Flink can interpret Canal JSON messages as INSERT, UPDATE, or DELETE messages in the Flink SQL system. This feature is useful in many cases, such as:
- Synchronizing incremental data from a database to other systems
- Auditing logs
- Maintaining real-time materialized views of a database
- Running temporal joins against the change history of database tables
Flink also supports encoding INSERT, UPDATE, or DELETE messages in Flink SQL into JSON messages in Canal format and outputting them to storage such as Kafka.
Currently, Flink cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. It therefore encodes UPDATE_BEFORE and UPDATE_AFTER as Canal messages of type DELETE and INSERT, respectively.
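As a minimal sketch of the encoding direction, the statements below write an updating query result to Kafka in Canal JSON. The sink table name products_avg_weight_sink and the topic products_avg_weight are hypothetical, and the source table topic_products is assumed to be registered as in the usage example on this page. Because the aggregation emits UPDATE_BEFORE and UPDATE_AFTER rows, each change to an average would be written to the topic as a DELETE message followed by an INSERT message.

```sql
-- Hypothetical sink table; the table and topic names are illustrative only.
CREATE TABLE products_avg_weight_sink (
  name STRING,
  avg_weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_avg_weight',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json'  -- encode the changelog as Canal JSON
);

-- The GROUP BY query produces an updating result. Every update of an
-- average is encoded as a Canal DELETE (old value) plus INSERT (new value).
INSERT INTO products_avg_weight_sink
SELECT name, CAST(AVG(weight) AS DECIMAL(10, 2)) AS avg_weight
FROM topic_products
GROUP BY name;
```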
Example of Use
Canal provides a unified format for changelogs. The following is a simple example of an update operation captured from the MySQL products table:
{
  "data": [
    {
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.18"
    }
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {
    "id": "INTEGER",
    "name": "VARCHAR(255)",
    "description": "VARCHAR(512)",
    "weight": "FLOAT"
  },
  "old": [
    {
      "weight": "5.15"
    }
  ],
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "description": 12,
    "weight": 7
  },
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}
Note: For the meaning of each field, see the Canal documentation.
The MySQL products table has 4 columns (id, name, description, and weight). The JSON message above is an update event on the products table: the weight of the row with id = 111 changed from 5.15 to 5.18. Assuming the messages have been synchronized to a Kafka topic named products_binlog, the following DDL consumes messages from this topic and parses the change events:
CREATE TABLE topic_products (
  -- The schema is the same as the MySQL "products" table.
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json' -- use the canal-json format.
);
After registering the Kafka topic as a Flink table, you can use the Canal messages as a changelog source:
-- A real-time materialized view on the MySQL "products" table.
-- Calculate the latest average weight of the same product.
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- Synchronize all data and incremental changes of the MySQL "products" table
-- to the Elasticsearch "products" index for future searches.
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
Configuration Options
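As an illustrative sketch, the DDL below sets a few format options on the source table. The option names canal-json.ignore-parse-errors, canal-json.database.include, and canal-json.table.include are taken from the open-source Apache Flink canal-json format and are assumed to be available in your environment; the table name topic_products_filtered is hypothetical.

```sql
-- Hypothetical variant of the source table with extra format options.
CREATE TABLE topic_products_filtered (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  -- Skip messages that cannot be parsed instead of failing the job.
  'canal-json.ignore-parse-errors' = 'true',
  -- Only read change events whose "database" and "table" fields match.
  'canal-json.database.include' = 'inventory',
  'canal-json.table.include' = 'products'
);
```

Filtering by database and table is useful when a single Kafka topic carries change events from several MySQL tables.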
Type Mapping
Currently, the Canal format uses the JSON format for serialization and deserialization. See JSON Format for more details on data type mapping.
Other Instructions for Use
Available Metadata
The following format metadata can be declared as read-only (VIRTUAL) columns in DDL statements.
A format metadata field is only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector can declare metadata fields for its value format.
The following example shows how to access Canal metadata fields in Kafka:
CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);
Common Problem
Duplicate Change Events Delivered on Failure
Under normal operating conditions, Canal delivers each change event with exactly-once semantics, and Flink consumes the change events generated by Canal without issue. In abnormal situations (for example, when a failure occurs), Canal can only guarantee at-least-once delivery. Canal may then deliver duplicate change events to Kafka, and Flink receives those duplicates when it consumes from Kafka, which may lead to wrong results or unexpected exceptions in Flink queries. In this case, it is recommended to set the job parameter table.exec.source.cdc-events-duplicate to true and to define a PRIMARY KEY on the source. The framework then generates an additional stateful operator that uses the PRIMARY KEY to deduplicate change events and produce a normalized changelog stream.
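The recommendation above can be sketched as follows, assuming the statements run in the Flink SQL client and reusing the products_binlog topic from the usage example. Choosing id as the key is an assumption based on the pkNames field of the sample message; Flink requires primary keys to be declared NOT ENFORCED.

```sql
-- Tell the planner that the CDC source may contain duplicate events.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

CREATE TABLE topic_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  -- Key used by the additional stateful operator to deduplicate events.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json'
);
```

The deduplication operator keeps state per primary key, so the job's state size grows with the number of distinct keys in the source.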