Maxwell

This article describes usage examples, configuration options, and type mappings for the Maxwell format.

Background Information

Maxwell is a CDC (Change Data Capture) tool that streams changes from MySQL to Kafka, Kinesis, and other streaming connectors in real time. Maxwell provides a unified format for changelogs and serializes messages as JSON. Flink can interpret Maxwell JSON messages as INSERT, UPDATE, or DELETE messages in the Flink SQL system. This is useful in many scenarios, such as:

  • Synchronizing incremental data from a database to other systems
  • Auditing logs
  • Building real-time materialized views on a database
  • Performing temporal joins against the change history of a database table

Flink can also encode INSERT, UPDATE, or DELETE messages in Flink SQL as JSON messages in Maxwell format and write them to storage such as Kafka.

note

Currently, Flink does not support combining UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as Maxwell messages of type DELETE and INSERT, respectively.

Example of Use

Assuming the MySQL products table has 4 columns (id, name, description, weight), a simple example of an update operation captured from the MySQL products table in JSON format is as follows:

    {
        "database": "test",
        "table": "e",
        "type": "update",
        "ts": 1477053217,
        "xid": 23396,
        "commit": true,
        "position": "master.000006:800911",
        "server_id": 23042,
        "thread_id": 108,
        "primary_key": [1, "2016-10-21 05:33:37.523000"],
        "primary_key_columns": ["id", "c"],
        "data": {
            "id": 111,
            "name": "scooter",
            "description": "Big 2-wheel scooter",
            "weight": 5.15
        },
        "old": {
            "weight": 5.18
        }
    }

Description

For the meaning of each field in the example, see the Maxwell documentation. The JSON message above is an update event on the products table: the weight of the row with id = 111 changed from 5.18 to 5.15. Assuming this message is written to a Kafka topic named products_binlog, the following DDL consumes the topic and parses the change events.

    CREATE TABLE topic_products (
        -- The schema is exactly the same as that of the MySQL "products" table.
        id BIGINT,
        name STRING,
        description STRING,
        weight DECIMAL(10, 2)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'products_binlog',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup',
        'format' = 'maxwell-json'
    );

After registering a topic as a Flink table, Maxwell messages can be used as a changelog source.

    -- A real-time materialized view on the MySQL "products" table.
    -- Calculate the latest average weight of products with the same name.
    SELECT name, AVG(weight) FROM topic_products GROUP BY name;

    -- Synchronize all data and incremental changes of the MySQL "products" table
    -- to the Elasticsearch "products" index for future searches.
    INSERT INTO elasticsearch_products
    SELECT * FROM topic_products;
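
Flink can also write a changelog back out in Maxwell format, as described in the background section. The following is a minimal sketch of such a sink, not a definitive setup: the table name products_maxwell_sink and the topic products_maxwell are hypothetical, and the source is the topic_products table defined above.

```sql
-- Hypothetical sink table; the table and topic names are assumptions.
CREATE TABLE products_maxwell_sink (
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_maxwell',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'maxwell-json'
);

-- Forward the changelog read from topic_products.
-- An update reaches the sink as UPDATE_BEFORE and UPDATE_AFTER, which are
-- written as a Maxwell DELETE message followed by a Maxwell INSERT message.
INSERT INTO products_maxwell_sink
SELECT * FROM topic_products;
```

Consumers of the output topic should therefore expect each update as a delete and insert pair rather than a single update message.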

Configuration Options

| Parameter | Required | Default | Type of data | Description |
| --- | --- | --- | --- | --- |
| format | yes | (none) | String | The format to use. Set this to maxwell-json. |
| maxwell-json.ignore-parse-errors | no | false | Boolean | true: when a parse error occurs, skip the current field or row. false (default): throw an error and fail the job. |
| maxwell-json.timestamp-format.standard | no | SQL | String | The input and output timestamp format. SQL: parse input timestamps in yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123, and output timestamps in the same format. ISO-8601: parse input timestamps in yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123, and output timestamps in the same format. |
| maxwell-json.map-null-key.mode | no | FAIL | String | How to handle null keys in Map data. FAIL: throw an exception when a Map key is null. DROP: discard Map entries whose key is null. LITERAL: replace null keys with a string constant, which is defined by maxwell-json.map-null-key.literal. |
| maxwell-json.map-null-key.literal | no | null | String | The string constant that replaces null keys in the Map when maxwell-json.map-null-key.mode is LITERAL. |
| maxwell-json.encode.decimal-as-plain-number | no | false | Boolean | true: DECIMAL values are written as plain numbers and never in scientific notation, for example, 0.000000027 is written as 0.000000027. false: DECIMAL values may be written in scientific notation, for example, 0.000000027 is written as 2.7E-8. |
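
As a rough illustration of how these options are set, the sketch below declares a source table that tolerates parse errors and expects ISO-8601 timestamps. The table name orders_changelog, the topic orders_binlog, and the columns are hypothetical and exist only for this example.

```sql
-- Hypothetical source table; table, topic, and column names are assumptions.
CREATE TABLE orders_changelog (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    updated_at TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders_binlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'maxwell-json',
    -- Skip fields or rows that cannot be parsed instead of failing the job.
    'maxwell-json.ignore-parse-errors' = 'true',
    -- Expect timestamps such as 2020-12-30T12:13:14.123.
    'maxwell-json.timestamp-format.standard' = 'ISO-8601'
);
```

The map-null-key and encode options affect how data is written, so they are typically set on tables used as sinks.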

Type Mapping

Currently, the Maxwell format uses the JSON format for serialization and deserialization. For details on data type mapping, see JSON Format.

Other Instructions for Use

Available Metadata

The following format metadata can be declared as read-only (VIRTUAL) columns in DDL statements.

note

Format metadata fields are available only if the corresponding connector forwards format metadata. Currently, only the Kafka connector can declare metadata fields for its value format.

| Key | Type of data | Description |
| --- | --- | --- |
| database | STRING NULL | The originating database. Corresponds to the database field in the Maxwell record (if available). |
| table | STRING NULL | The originating database table. Corresponds to the table field in the Maxwell record (if available). |
| primary-key-columns | ARRAY<STRING> NULL | Array of primary key column names. Corresponds to the primary_key_columns field in the Maxwell record (if available). |
| ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | The timestamp at which the connector processed the event. Corresponds to the ts field in the Maxwell record. |

The following example shows how to access Maxwell metadata fields in Kafka:

    CREATE TABLE KafkaTable (
        origin_database STRING METADATA FROM 'value.database' VIRTUAL,
        origin_table STRING METADATA FROM 'value.table' VIRTUAL,
        origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
        origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
        user_id BIGINT,
        item_id BIGINT,
        behavior STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_behavior',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'earliest-offset',
        'value.format' = 'maxwell-json'
    );
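
Once declared, metadata columns can be queried like any other column. The following sketch filters KafkaTable by its originating table. The filter value 'user_behavior' is an assumption about the upstream MySQL table name, not something the example above guarantees.

```sql
-- Select rows that originate from one upstream table.
-- The literal 'user_behavior' is a hypothetical MySQL table name.
SELECT origin_database, origin_table, user_id, item_id, behavior
FROM KafkaTable
WHERE origin_table = 'user_behavior';
```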

Common Problems

Posting Duplicate Change Events on Failure

Under normal operating conditions, Maxwell delivers each change event with exactly-once semantics, and Flink consumes the change events without issue. Under abnormal conditions, such as failures, Maxwell guarantees only at-least-once delivery. In that case, Maxwell may deliver duplicate change events to Kafka, and Flink receives those duplicates when it consumes the topic, which may cause incorrect results or unexpected exceptions in Flink queries. It is therefore recommended to set the job parameter table.exec.source.cdc-events-duplicate to true and to define a PRIMARY KEY on the source. The framework then generates an additional stateful operator that uses the PRIMARY KEY to deduplicate change events and produce a normalized changelog stream.
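
A minimal sketch of this recommendation follows, assuming the quoted SET syntax of recent Flink SQL clients and that id uniquely identifies a row in the products table. The table name topic_products_dedup is hypothetical.

```sql
-- Tell the planner that the CDC source may contain duplicate events.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

-- Hypothetical variant of the products source table with a primary key.
CREATE TABLE topic_products_dedup (
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2),
    -- Flink does not enforce the key; it uses it to deduplicate change events.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_binlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'maxwell-json'
);
```

The deduplication operator keeps state per primary key, so state size grows with the number of distinct keys in the source.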

note

This page is derived from the official Apache Flink® documentation.

Refer to the Credits page for more information.