Maxwell
This article describes usage examples, configuration options, and type mappings for the Maxwell format.
Background information
Maxwell is a CDC (Changelog Data Capture) tool that streams changes from MySQL to Kafka, Kinesis, and other streaming connectors in real time. Maxwell provides a unified format for changelogs and serializes messages as JSON. Flink can interpret Maxwell JSON messages as INSERT, UPDATE, or DELETE messages in the Flink SQL system. This feature is useful in many scenarios, such as:
- Synchronizing incremental data from a database to other systems
- Auditing logs
- Maintaining real-time materialized views of a database
- Performing temporal joins against the change history of a database table
Flink also supports encoding INSERT, UPDATE, or DELETE messages in Flink SQL as JSON messages in Maxwell format and writing them to storage such as Kafka.
Currently, Flink cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as Maxwell messages of type DELETE and INSERT, respectively.
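The encoding direction can be sketched as follows. This is a minimal example under stated assumptions, not a definitive setup: the sink table name products_maxwell_sink and the topic products_maxwell_out are hypothetical, and topic_products is assumed to be an existing changelog source table with the columns id, name, description, and weight.

```sql
-- Hypothetical sink table; the table name and topic name are illustrative only.
CREATE TABLE products_maxwell_sink (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_maxwell_out',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'maxwell-json'
);

-- Assumes topic_products is an existing changelog source with the same columns.
INSERT INTO products_maxwell_sink
SELECT id, name, description, weight FROM topic_products;
```

With a setup like this, an update to a source row is expected to reach the sink topic as two Maxwell messages: one of type DELETE carrying the old row and one of type INSERT carrying the new row.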
Example of use
Assuming the MySQL products table has 4 columns (id, name, description, weight), a simple example of an update operation captured from the MySQL products table in JSON format is as follows:
{
  "database":"test",
  "table":"e",
  "type":"update",
  "ts":1477053217,
  "xid":23396,
  "commit":true,
  "position":"master.000006:800911",
  "server_id":23042,
  "thread_id":108,
  "primary_key": [1, "2016-10-21 05:33:37.523000"],
  "primary_key_columns": ["id", "c"],
  "data":{
    "id":111,
    "name":"scooter",
    "description":"Big 2-wheel scooter",
    "weight":5.15
  },
  "old":{
    "weight":5.18
  }
}
Description
For the meaning of each field in the example, see the Maxwell documentation. The JSON message above is an update event on the products table: the weight of the row with id = 111 changed from 5.18 to 5.15. Assuming this message is synchronized to a Kafka topic named products_binlog, the following DDL statement can be used to consume the topic and parse the change events.
CREATE TABLE topic_products (
  -- The schema is exactly the same as that of the MySQL "products" table.
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'maxwell-json'
);
After registering a topic as a Flink table, Maxwell messages can be used as a changelog source.
-- A real-time materialized view on the MySQL "products" table.
-- Calculate the latest average weight of the same product.
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- Synchronize all data and incremental changes of the MySQL "products" table
-- to the Elasticsearch "products" index for future searches.
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
Configuration options
Parameter | Required | Default | Data type | Description |
---|---|---|---|---|
format | yes | (none) | String | Specify the format to use, here it should be maxwell-json. |
maxwell-json.ignore-parse-errors | no | false | Boolean | Specifies how parse errors are handled. The parameter values are as follows: true: Skip the field or row that fails to parse. false (default): Throw an error, and the job fails. |
maxwell-json.timestamp-format.standard | no | SQL | String | Specifies the input and output timestamp format. The parameter values are as follows: SQL: Parse input timestamps in yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123, and output timestamps in the same format. ISO-8601: Parse input timestamps in yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123, and output timestamps in the same format. |
maxwell-json.map-null-key.mode | no | FAIL | String | Specifies how to handle null keys in Map data. The parameter values are as follows: FAIL: Throw an exception when a key in the Map is null. DROP: Discard the Map entries whose key is null. LITERAL: Replace null keys in the Map with a string constant. The value of the string constant is defined by maxwell-json.map-null-key.literal. |
maxwell-json.map-null-key.literal | no | null | String | When the value of maxwell-json.map-null-key.mode is LITERAL, specify a string constant to replace the null key value in the Map. |
maxwell-json.encode.decimal-as-plain-number | no | false | Boolean | The parameter values are as follows: true: All data of DECIMAL type remain as they are, and are not expressed in scientific notation, for example: 0.000000027 is expressed as 0.000000027. false: All data of DECIMAL type are expressed in scientific notation, for example: 0.000000027 is expressed as 2.7E-8. |
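As an illustration of how these options are set, the sketch below adds several of them to the WITH clause of a source table. The table name topic_products_lenient is hypothetical, and the chosen option values are examples rather than recommendations.

```sql
-- Hypothetical table name; the option values are chosen for illustration only.
CREATE TABLE topic_products_lenient (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'maxwell-json',
  -- Skip fields or rows that cannot be parsed instead of failing the job.
  'maxwell-json.ignore-parse-errors' = 'true',
  -- Read and write timestamps such as 2020-12-30T12:13:14.123.
  'maxwell-json.timestamp-format.standard' = 'ISO-8601',
  -- Replace null Map keys with the string constant configured on the next line.
  'maxwell-json.map-null-key.mode' = 'LITERAL',
  'maxwell-json.map-null-key.literal' = 'null'
);
```

Options that are not listed keep the defaults shown in the table.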
Type mapping
Currently, the Maxwell format uses the JSON format for serialization and deserialization. For more details on data type mapping, refer to JSON Format.
Other instructions for use
Available metadata
The following format metadata can be declared as read-only (VIRTUAL) columns in DDL statements.
Format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector can declare metadata fields for its value format.
Key | Data type | Description |
---|---|---|
database | STRING NULL | Original database. Corresponds to the database field in the Maxwell record (if available). |
table | STRING NULL | Original database table. Corresponds to the table field in the Maxwell record (if available). |
primary-key-columns | ARRAY<STRING> NULL | Array of primary key names. Corresponds to the primary_key_columns field in the Maxwell record (if available). |
ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | The timestamp when the event was processed by the connector. Corresponds to the ts field in the Maxwell record. |
The following example shows how to access Maxwell metadata fields in Kafka:
CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'maxwell-json'
);
Common problems
Duplicate change events delivered on failure
Under normal operating conditions, Maxwell delivers each change event with exactly-once semantics, and Flink consumes the change events without issue. Under abnormal conditions such as failures, Maxwell guarantees only at-least-once delivery. It may then write duplicate change events to Kafka, and Flink reads those duplicates, which can lead to incorrect query results or unexpected exceptions. In this case, set the job parameter table.exec.source.cdc-events-duplicate to true and define a PRIMARY KEY on the source table. The framework then generates an additional stateful operator that uses the PRIMARY KEY to deduplicate the change events and produce a normalized changelog stream.
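The recommended setup can be sketched as follows. The table name topic_products_dedup is hypothetical, the choice of id as the primary key is an assumption about the MySQL products table, and the quoted SET syntax assumes a recent Flink SQL client. Other environments may set the parameter through their own job configuration.

```sql
-- Enable deduplication of CDC events for sources that declare a primary key.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

-- Hypothetical table name; the remaining options mirror the earlier source example.
CREATE TABLE topic_products_dedup (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  -- Assumption: id uniquely identifies a row in the MySQL "products" table.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'maxwell-json'
);
```

NOT ENFORCED indicates that Flink does not validate the uniqueness of the key itself; the key is only used to identify rows during deduplication, so it should match the actual primary key of the upstream table.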
This page is derived from the official Apache Flink® documentation.
Refer to the Credits page for more information.