Canal

This article describes usage examples, configuration options, and type mapping for the Canal format.

Background information

Canal is a Change Data Capture (CDC) tool that transmits MySQL changes to other systems in real time. Canal provides a unified data format for changelogs and supports serializing messages as JSON or protobuf (Canal uses protobuf by default).

Flink supports interpreting Canal JSON messages as INSERT, UPDATE, or DELETE messages in the Flink SQL system. This feature is useful in many cases, for example:

  • Synchronizing incremental data from a database to other systems
  • Log auditing
  • Real-time materialized views on a database
  • Temporal joins against the change history of a database table (a sketch follows this list)
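
For the temporal-join use case, the following is a minimal sketch, assuming a hypothetical orders stream with an event-time attribute order_time; table and column names other than the Canal metadata key are illustrative. The Canal-backed table is declared as a versioned table by giving it a PRIMARY KEY and a watermark on the 'value.ingestion-timestamp' metadata column described later in this article.

    -- Canal-backed table declared as a versioned table: PRIMARY KEY plus an
    -- event-time attribute make it usable as the build side of a temporal join.
    CREATE TABLE products_history (
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2),
      update_time TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      PRIMARY KEY (id) NOT ENFORCED,
      WATERMARK FOR update_time AS update_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'canal-json'
    );

    -- Join each order against the product version that was valid at the order's time.
    -- The "orders" table is assumed to exist with an event-time attribute "order_time".
    SELECT o.order_id, o.order_time, p.name, p.weight
    FROM orders AS o
    LEFT JOIN products_history FOR SYSTEM_TIME AS OF o.order_time AS p
      ON o.product_id = p.id;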

Flink also supports encoding INSERT, UPDATE, or DELETE messages from Flink SQL as Canal-format JSON messages and emitting them to external storage such as Kafka.

caution

Currently, Flink does not support combining UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as Canal messages of type DELETE and INSERT, respectively.
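
As an example of such a sink, the following is a minimal sketch of writing a changelog back out in Canal format. The table names products_canal_sink and products_source and the topic name are hypothetical; only the connector and format options follow this article.

    -- Hypothetical Kafka sink table; the changelog produced by the INSERT below
    -- is encoded as Canal-format JSON messages and written to the topic.
    CREATE TABLE products_canal_sink (
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_canal',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'canal-json'
    );

    -- INSERT, UPDATE, and DELETE changes from the source are written as Canal
    -- messages; each update becomes a DELETE followed by an INSERT (see the caution above).
    INSERT INTO products_canal_sink
    SELECT id, name, description, weight FROM products_source;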

Example of use

Canal provides a unified format for changelogs. The following is a simple example of an update operation captured from the products table of the MySQL inventory database:

    {
      "data": [
        {
          "id": "111",
          "name": "scooter",
          "description": "Big 2-wheel scooter",
          "weight": "5.18"
        }
      ],
      "database": "inventory",
      "es": 1589373560000,
      "id": 9,
      "isDdl": false,
      "mysqlType": {
        "id": "INTEGER",
        "name": "VARCHAR(255)",
        "description": "VARCHAR(512)",
        "weight": "FLOAT"
      },
      "old": [
        {
          "weight": "5.15"
        }
      ],
      "pkNames": [
        "id"
      ],
      "sql": "",
      "sqlType": {
        "id": 4,
        "name": 12,
        "description": 12,
        "weight": 7
      },
      "table": "products",
      "ts": 1589373560798,
      "type": "UPDATE"
    }

Note: for the meaning of each field, see the Canal documentation.

The MySQL products table has four columns (id, name, description, and weight). The JSON message above is an update event on the products table: on the row with id = 111, the value of the weight field changed from 5.15 to 5.18. Assuming the messages have been synchronized to a Kafka topic named products_binlog, the following DDL can be used to consume messages from this topic and parse the change events:

    CREATE TABLE topic_products (
      -- The schema is exactly the same as that of the MySQL "products" table.
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'format' = 'canal-json'  -- use the canal-json format.
    );

After registering a Kafka topic as a Flink table, you can use Canal messages as a changelog source:

    -- A real-time materialized view on the MySQL "products" table,
    -- calculating the latest average weight of each product.
    SELECT name, AVG(weight) FROM topic_products GROUP BY name;

    -- Synchronize all data and incremental changes of the MySQL "products" table
    -- to the Elasticsearch "products" index for future searches.
    INSERT INTO elasticsearch_products
    SELECT * FROM topic_products;

Configuration options

| Parameter | Description | Required | Default | Type |
| --- | --- | --- | --- | --- |
| format | Specifies the format to use. To use the Canal format, the value must be 'canal-json'. | Yes | (none) | String |
| canal-json.ignore-parse-errors | true: skip the current field or row when a parse error occurs. false (default): throw an error and the job fails to start. | No | false | Boolean |
| canal-json.timestamp-format.standard | Specifies the input and output timestamp format. 'SQL': parse input timestamps in yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123, and output timestamps in the same format. 'ISO-8601': parse input timestamps in yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123, and output timestamps in the same format. | No | 'SQL' | String |
| canal-json.map-null-key.mode | Specifies how to handle null keys in a Map. 'FAIL': throw an exception when a Map key is null. 'DROP': drop the Map entry whose key is null. 'LITERAL': replace the null key with the string constant defined by canal-json.map-null-key.literal. | No | 'FAIL' | String |
| canal-json.map-null-key.literal | When canal-json.map-null-key.mode is 'LITERAL', the string constant used to replace null keys in a Map. | No | 'null' | String |
| canal-json.encode.decimal-as-plain-number | true: DECIMAL values are written as plain numbers and never in scientific notation, for example 0.000000027 is written as 0.000000027. false: DECIMAL values may be written in scientific notation, for example 0.000000027 is written as 2.7E-8. | No | false | Boolean |
| canal-json.database.include | An optional regular expression matched against the database meta field in the Canal record; only changelog records of matching databases are read. The regex must be compatible with Java's Pattern. | No | (none) | String |
| canal-json.table.include | An optional regular expression matched against the table meta field in the Canal record; only changelog records of matching tables are read. The regex must be compatible with Java's Pattern. | No | (none) | String |
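
As a minimal illustration of how these options are supplied, the sketch below mirrors the earlier topic_products DDL with a few of them added; the chosen values are examples, not recommendations.

    CREATE TABLE topic_products_filtered (
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'canal-json',
      -- skip rows that cannot be parsed instead of failing the job.
      'canal-json.ignore-parse-errors' = 'true',
      -- only read changelog records of the "inventory" database and its "products" table.
      'canal-json.database.include' = 'inventory',
      'canal-json.table.include' = 'products'
    );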

Type mapping

Currently, the Canal format uses the JSON format for serialization and deserialization. See the JSON Format documentation for more details on data type mapping.

Additional usage notes

Available metadata

The following format metadata can be declared as read-only (VIRTUAL) columns in DDL statements.

caution

Format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose metadata fields from its value format.

| Key | Description | Type |
| --- | --- | --- |
| database | The originating database. Corresponds to the database field in the Canal record (if available). | STRING NULL |
| table | The originating table. Corresponds to the table field in the Canal record (if available). | STRING NULL |
| sql-type | Map of the SQL types of the various fields. Corresponds to the sqlType field in the Canal record (if available). | MAP<STRING, INT> NULL |
| pk-names | Array of primary key names. Corresponds to the pkNames field in the Canal record (if available). | ARRAY<STRING> NULL |
| ingestion-timestamp | The timestamp at which the connector processed the event. Corresponds to the ts field in the Canal record. | TIMESTAMP_LTZ(3) NULL |

The following example shows how to access Canal metadata fields in Kafka:

    CREATE TABLE KafkaTable (
      origin_database STRING METADATA FROM 'value.database' VIRTUAL,
      origin_table STRING METADATA FROM 'value.table' VIRTUAL,
      origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
      origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
      origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'canal-json'
    );

Common problems

Duplicate change events are delivered on failure

Under normal operating conditions, Canal delivers each change event with exactly-once semantics, and Flink consumes the change events generated by Canal as expected. In abnormal situations (for example, after a failure), Canal can only guarantee at-least-once delivery and may deliver duplicate change events to Kafka. When Flink consumes these duplicates, the query may produce wrong results or unexpected exceptions. In this case, it is recommended to set the job parameter table.exec.source.cdc-events-duplicate to true and to define the PRIMARY KEY on the source table. The framework then generates an additional stateful operator that uses the PRIMARY KEY to deduplicate change events and produce a normalized changelog stream.
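
A minimal sketch of this setup is shown below; apart from the configuration key and the PRIMARY KEY clause, it reuses the topic_products DDL from earlier in this article (the SET syntax assumes the SQL Client).

    -- Deduplicate possibly repeated CDC events from Canal.
    SET 'table.exec.source.cdc-events-duplicate' = 'true';

    CREATE TABLE topic_products (
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2),
      -- The primary key lets the generated stateful operator deduplicate events.
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'format' = 'canal-json'
    );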

note

This page is derived from the official Apache Flink® documentation.

Refer to the Credits page for more information.