
Debezium JSON

This article introduces usage examples, configuration options, and the type mapping of the Debezium JSON format.

Background information

Debezium is a CDC (Change Data Capture) tool that streams changes from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Kafka in real time. Debezium provides a unified format structure for changelogs and supports serializing messages as JSON.

Flink supports parsing Debezium JSON messages into INSERT, UPDATE, or DELETE messages for the Flink SQL system. In many cases, it is very useful to take advantage of this feature, for example:

  • Synchronizing incremental data from databases to other systems
  • Audit logging
  • Real-time materialized views on databases
  • Temporal joins on the change history of database tables

Flink also supports encoding INSERT, UPDATE, or DELETE messages in Flink SQL into Debezium-format JSON messages and writing them to external storage such as Kafka (a sketch follows the note below).

note

Currently Flink does not support combining UPDATE_BEFORE and UPDATE_AFTER into one UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as Debezium messages of type DELETE and INSERT respectively.
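
A minimal sketch of the write path, assuming the topic_products source table defined in the example below and an output topic named products_sink (both names are illustrative):

    -- Sketch: write a changelog stream back to Kafka in Debezium JSON format.
    -- Table and topic names are illustrative; topic_products is defined below.
    CREATE TABLE kafka_sink (
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_sink',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'debezium-json'
    );

    -- Every UPDATE consumed from topic_products is emitted as a DELETE message
    -- followed by an INSERT message, as described in the note above.
    INSERT INTO kafka_sink SELECT * FROM topic_products;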

Example of use

Assuming the MySQL products table has four columns (id, name, description, weight), a simple example of an update operation captured from that table in JSON format looks as follows:

    {
      "before": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.18
      },
      "after": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.15
      },
      "source": {...},
      "op": "u",
      "ts_ms": 1589362330904,
      "transaction": null
    }

note

For the meaning of each field in the example, see the Debezium documentation for details.

The JSON message above is an update event on the products table where the row with id = 111 has a weight value changed from 5.18 to 5.15. Assuming this message is synced to a Kafka topic called products_binlog, the following DDL can be used to consume this topic and parse change events.

    -- Use the 'debezium-json' format to parse Debezium's JSON messages
    CREATE TABLE topic_products (
      -- the schema is exactly the same as the MySQL products table
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      -- use 'debezium-avro-confluent' instead if Debezium encodes messages with Avro
      'format' = 'debezium-json'
    );

In some cases, when setting up Debezium Kafka Connect, the Kafka Connect configuration value.converter.schemas.enable may be enabled to include schema information in the message body. A Debezium JSON message might then look like this:

    {
      "schema": {...},
      "payload": {
        "before": {
          "id": 111,
          "name": "scooter",
          "description": "Big 2-wheel scooter",
          "weight": 5.18
        },
        "after": {
          "id": 111,
          "name": "scooter",
          "description": "Big 2-wheel scooter",
          "weight": 5.15
        },
        "source": {...},
        "op": "u",
        "ts_ms": 1589362330904,
        "transaction": null
      }
    }

To parse such messages, add 'debezium-json.schema-include' = 'true' (default is false) to the WITH clause of the DDL above, as shown below. In general, including the schema description is not recommended, as it makes messages very verbose and reduces parsing performance.
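
A minimal sketch of the same DDL with schema information enabled (the table name is illustrative; only the last option differs from the earlier example):

    CREATE TABLE topic_products_with_schema (
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'format' = 'debezium-json',
      -- tell the format that each message carries a schema section
      'debezium-json.schema-include' = 'true'
    );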

After registering a topic as a Flink table, Debezium messages can be used as a changelog source.

    -- A real-time materialized view of the MySQL "products" table.
    -- Calculate the latest average weight of each product.
    SELECT name, AVG(weight) FROM topic_products GROUP BY name;

    -- Synchronize all data and incremental changes of the MySQL "products" table
    -- to the Elasticsearch "products" index for future lookups.
    INSERT INTO elasticsearch_products
    SELECT * FROM topic_products;
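
The INSERT statement above assumes that a sink table named elasticsearch_products has already been declared. A minimal sketch of such a declaration, assuming the Flink Elasticsearch 7 connector and a local cluster at http://localhost:9200 (both are assumptions for illustration):

    -- Sketch of the Elasticsearch sink used by the INSERT statement above.
    -- Connector, host, and index name are illustrative assumptions.
    CREATE TABLE elasticsearch_products (
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2),
      -- a primary key lets the sink upsert rows as the changelog evolves
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = 'http://localhost:9200',
      'index' = 'products'
    );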

Configuration options

Flink provides the debezium-json format to parse JSON messages generated by Debezium.

debezium-json

Use debezium-json to parse Debezium JSON messages.

  • format
    Required: yes. Default: (none). Type: String.
    Specifies the format to use. To parse Debezium JSON messages, set the value to debezium-json.
  • debezium-json.schema-include
    Required: no. Default: false. Type: Boolean.
    When setting up Debezium Kafka Connect, the Kafka Connect configuration value.converter.schemas.enable can be enabled to include schema information in messages. This option indicates whether Debezium JSON messages contain a schema: true means they do, false means they do not.
  • debezium-json.ignore-parse-errors
    Required: no. Default: false. Type: Boolean.
    true: skip the current field or row when a parsing error occurs. false (default): report an error and fail the job.
  • debezium-json.timestamp-format.standard
    Required: no. Default: SQL. Type: String.
    Specifies the input and output timestamp format. SQL: parse input timestamps in yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123, and output timestamps in the same format. ISO-8601: parse input timestamps in yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123, and output timestamps in the same format.
  • debezium-json.map-null-key.mode
    Required: no. Default: FAIL. Type: String.
    Specifies how to handle null keys in a MAP. FAIL: throw an exception when a key in the MAP is null. DROP: discard MAP entries whose key is null. LITERAL: replace null keys with a string constant defined by debezium-json.map-null-key.literal.
  • debezium-json.map-null-key.literal
    Required: no. Default: null. Type: String.
    When debezium-json.map-null-key.mode is LITERAL, specifies the string constant used to replace null keys in a MAP.
  • debezium-json.encode.decimal-as-plain-number
    Required: no. Default: false. Type: Boolean.
    true: DECIMAL values are written as plain numbers, never in scientific notation; for example, 0.000000027 is written as 0.000000027. false: DECIMAL values may be written in scientific notation; for example, 0.000000027 is written as 2.7E-8.
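
As a sketch, these options are set in the WITH clause of the table definition. For example, the following variant of the earlier DDL tolerates parse errors and expects ISO-8601 timestamps (the table name is illustrative):

    CREATE TABLE topic_products_tolerant (
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'debezium-json',
      -- skip fields or rows that cannot be parsed instead of failing the job
      'debezium-json.ignore-parse-errors' = 'true',
      -- expect timestamps such as 2020-12-30T12:13:14.123
      'debezium-json.timestamp-format.standard' = 'ISO-8601'
    );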

Type mapping

Currently, the Debezium format uses the JSON format for serialization and deserialization. For more details on the data type mapping, refer to the JSON Format documentation (and the Confluent Avro Format documentation for the debezium-avro-confluent format).

Additional usage notes

Available metadata

The following format metadata can be declared as read-only (VIRTUAL) columns in DDL statements.

note

The format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose metadata fields for its value format.

  • schema (STRING NULL): A JSON string describing the schema of the payload. Null if the schema is not included in the Debezium record.
  • ingestion-timestamp (TIMESTAMP_LTZ(3) NULL): The timestamp when the event was processed by the connector. Corresponds to the ts_ms field in the Debezium record.
  • source.timestamp (TIMESTAMP_LTZ(3) NULL): The timestamp when the source system created the event. Corresponds to the source.ts_ms field in the Debezium record.
  • source.database (STRING NULL): The originating database. Corresponds to the source.db field in the Debezium record (if available).
  • source.schema (STRING NULL): The schema of the originating database. Corresponds to the source.schema field in the Debezium record (if available).
  • source.table (STRING NULL): The table of the originating database. Corresponds to the source.table or source.collection field in the Debezium record (if available).
  • source.properties (MAP<STRING, STRING> NULL): A map of various source properties. Corresponds to the source field in the Debezium record.

The following example shows how to access Debezium metadata fields in Kafka:


    CREATE TABLE KafkaTable (
      origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
      origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
      origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
      origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
      origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );
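
The metadata columns can then be queried like any other column. As a small sketch, assuming the KafkaTable definition above:

    -- Inspect where each change event originated, together with payload columns.
    SELECT origin_database, origin_table, event_time, user_id, behavior
    FROM KafkaTable;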

Common problems

Duplicate change events delivered on failure

Under normal operation, Debezium delivers each change event with exactly-once semantics, and Flink consumes the change events generated by Debezium without problems. In abnormal situations (such as failures), however, Debezium can only guarantee at-least-once delivery, so duplicate change events may be written to Kafka. When Flink consumes these duplicates, it may produce wrong results or unexpected exceptions. In this case, it is recommended to set the job parameter table.exec.source.cdc-events-duplicate to true and define a PRIMARY KEY on the source table, as sketched below. Flink will then generate an additional stateful operator that uses the PRIMARY KEY to deduplicate change events and produce a normalized changelog stream.
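
A minimal sketch of that setup, reusing the topic_products example from earlier with a primary key added (the SET syntax shown is the SQL client form; in other deployments the option is set in the job configuration):

    -- Deduplicate possibly repeated change events using the primary key.
    SET 'table.exec.source.cdc-events-duplicate' = 'true';

    CREATE TABLE topic_products (
      id BIGINT,
      name STRING,
      description STRING,
      weight DECIMAL(10, 2),
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'debezium-json'
    );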

Data produced by Debezium Postgres connector cannot be parsed correctly

If you are using the Debezium PostgreSQL connector to capture changes to Kafka, make sure that the REPLICA IDENTITY of the monitored table is configured as FULL (the default is DEFAULT). Otherwise, Flink SQL will not be able to parse the Debezium data correctly.

When configured as FULL, update and delete events will contain the previous values of all columns. When configured with other values, the before field of update and delete events will only contain the values of the PRIMARY KEY columns, or be null if there is no PRIMARY KEY. You can change the REPLICA IDENTITY configuration by running ALTER TABLE <your-table-name> REPLICA IDENTITY FULL, as shown below.
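
For example, for the products table used throughout this article (these statements run against PostgreSQL, not Flink):

    -- Make update and delete events carry the full previous row image.
    ALTER TABLE products REPLICA IDENTITY FULL;

    -- Optionally verify the setting; 'f' means FULL.
    SELECT relreplident FROM pg_class WHERE oid = 'products'::regclass;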

note

This page is derived from the official Apache Flink® documentation.

Refer to the Credits page for more information.