Debezium Avro Confluent
This article describes the configuration options and type mappings of the debezium-avro-confluent format.
Background Information
Debezium is a CDC (Change Data Capture) tool that streams changes from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Kafka in real time. Debezium provides a unified format for changelogs and can serialize messages using JSON or Apache Avro.
Flink can interpret Debezium Avro messages as INSERT, UPDATE, or DELETE messages in the Flink SQL system. This feature is useful in many scenarios, for example:
- Synchronizing incremental data from a database to other systems
- Auditing logs
- Maintaining real-time materialized views of a database
- Running temporal joins against the change history of database tables
Flink can also encode INSERT, UPDATE, or DELETE messages in Flink SQL as JSON or Avro messages in Debezium format and write them to storage such as Kafka.
Currently Flink does not support combining UPDATE_BEFORE and UPDATE_AFTER into one UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as Debezium messages of type DELETE and INSERT respectively.
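As a minimal sketch of the encoding path described above, the following statements write an updating query result to Kafka in Debezium Avro format. The table names, topic, broker address, and Schema Registry URL are hypothetical placeholders, and the 'debezium-avro-confluent.url' option name is an assumption based on recent Apache Flink releases, so check the option list for your Flink version.

```sql
-- Hypothetical input table; the datagen connector stands in for a real source.
CREATE TABLE user_behavior_src (
  user_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '10'
);

-- Hypothetical Kafka sink that serializes the changelog as Debezium Avro.
CREATE TABLE behavior_counts_sink (
  user_id BIGINT,
  cnt BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'behavior_counts',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  -- Assumed option name for the Schema Registry address.
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);

-- The aggregation emits UPDATE_BEFORE and UPDATE_AFTER rows whenever a count changes.
-- As described above, each pair is written as a Debezium DELETE message
-- followed by a Debezium INSERT message.
INSERT INTO behavior_counts_sink
SELECT user_id, COUNT(*) AS cnt
FROM user_behavior_src
GROUP BY user_id;
```

Downstream consumers of this topic therefore see a delete followed by a create for every update, rather than a single Debezium update event.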
Configuration Options
Flink provides debezium-avro-confluent to parse messages in Avro format generated by Debezium.
debezium-avro-confluent
Use debezium-avro-confluent to parse Debezium Avro messages.
debezium-json
Use debezium-json to parse Debezium JSON messages.
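To illustrate how the format is selected, here is a minimal sketch of a Kafka source table that reads Debezium Avro messages. The table name, columns, topic, and addresses are hypothetical, and the 'debezium-avro-confluent.url' option name is an assumption based on recent Apache Flink releases; older releases may use a different key for the Schema Registry address.

```sql
-- Hypothetical table that mirrors a captured database table.
CREATE TABLE products_cdc (
  id BIGINT,
  name STRING,
  description STRING,
  weight DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  -- Parse Debezium Avro messages; use 'debezium-json' instead for JSON messages.
  'format' = 'debezium-avro-confluent',
  -- Assumed option name for the Schema Registry address.
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```

The declared columns describe the captured table itself; the Debezium envelope around each row is unwrapped by the format.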
Other Instructions
Available Metadata
The following format metadata can be declared as read-only (VIRTUAL) columns in DDL statements.
Format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector can declare metadata fields for its value format.
The following example shows how to access Debezium metadata fields in Kafka:
CREATE TABLE KafkaTable (
  -- Read-only metadata columns exposed by the Debezium value format
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  -- Physical columns of the captured table
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
Common Problems
Posting Duplicate Change Events on Failure
Under normal operating conditions, Debezium delivers each change event with exactly-once semantics, and Flink consumes the events without issue. When a failure occurs, however, Debezium guarantees only at-least-once delivery. It may then write duplicate change events to Kafka, and Flink reads those duplicates when it consumes from Kafka, which can lead to incorrect results or unexpected exceptions in Flink queries. In this case, set the job parameter table.exec.source.cdc-events-duplicate to true and define a PRIMARY KEY on the source table. Flink then generates an additional stateful operator that uses the PRIMARY KEY to deduplicate change events and produce a normalized changelog stream.
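The two settings above can be sketched as follows. The table name, columns, and topic are hypothetical, and the quoted SET syntax assumes a Flink SQL client version that accepts it; on other platforms the parameter may need to be set in the job configuration instead.

```sql
-- Tell the planner that the CDC source may contain duplicate change events.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

-- Hypothetical source table; the PRIMARY KEY is what the deduplication operator keys on.
CREATE TABLE orders_cdc (
  order_id BIGINT,
  customer_id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);
```

Because the deduplication operator is stateful, it adds state proportional to the number of distinct keys, so enable it only when duplicates are a real concern.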
See the Debezium documentation for more information on Debezium's message delivery semantics.
Data Produced by the Debezium Postgres Connector Cannot Be Parsed Correctly
If you use the Debezium PostgreSQL connector to capture changes to Kafka, make sure that REPLICA IDENTITY of the monitored table is set to FULL (the default value is DEFAULT). Otherwise, Flink SQL cannot parse the Debezium data correctly.
When REPLICA IDENTITY is set to FULL, update and delete events contain the previous values of all columns. With any other setting, the before field of update and delete events contains only the values of the PRIMARY KEY columns, or is null if the table has no PRIMARY KEY. You can change the setting by running ALTER TABLE <your-table-name> REPLICA IDENTITY FULL.
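As a short sketch, the statements below check and then change the setting in PostgreSQL. The table name public.orders is a hypothetical placeholder for your monitored table.

```sql
-- Inspect the current replica identity of the table.
-- relreplident: 'd' = DEFAULT, 'n' = NOTHING, 'f' = FULL, 'i' = USING INDEX
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'public.orders'::regclass;

-- Log the previous values of all columns for update and delete events.
ALTER TABLE public.orders REPLICA IDENTITY FULL;
```

Run the check again after the ALTER TABLE statement; the relreplident column should then show 'f'.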