Debezium Avro Confluent
This article introduces the configuration options and type mappings of the debezium-avro-confluent format.
Background information
Debezium is a CDC (Change Data Capture) tool that streams changes from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Kafka in real time. Debezium provides a unified format for changelogs and supports serializing messages as JSON or Apache Avro.
Flink can interpret Debezium Avro messages as INSERT, UPDATE, or DELETE messages in the Flink SQL system. This is useful in many scenarios, for example:
- Synchronizing incremental data from a database to other systems
- Auditing logs
- Maintaining real-time materialized views of a database
- Running temporal joins against the change history of a database table
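As a rough illustration of the materialized-view use case, the sketch below declares a Kafka-backed table that reads Debezium Avro records and runs a continuous aggregation over it. The table name, columns, topic, and the broker and Schema Registry addresses are placeholders chosen for this example, not values taken from this article.

```sql
-- Hypothetical source table: names, topic, and addresses are placeholders.
CREATE TABLE topic_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);

-- Each INSERT, UPDATE, or DELETE read from the topic updates the aggregate,
-- so the query result behaves like a continuously maintained view.
SELECT name, AVG(weight) AS avg_weight
FROM topic_products
GROUP BY name;
```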
Flink can also encode INSERT, UPDATE, or DELETE messages in Flink SQL as JSON or Avro messages in Debezium format and write them to storage such as Kafka.
Currently, Flink does not support combining UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as Debezium messages of type DELETE and INSERT, respectively.
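The write path can be sketched as follows, assuming a Kafka sink and a small generated input. The datagen source, the table and topic names, and the addresses are all hypothetical. Because the aggregation emits updates, each change to a total would reach the topic as a Debezium DELETE message followed by an INSERT message, as described above.

```sql
-- Hypothetical input: random rows produced by the datagen connector.
CREATE TABLE orders_gen (
  product STRING,
  amount INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.product.length' = '1'
);

-- Hypothetical sink: writes the changelog to Kafka in Debezium Avro format.
CREATE TABLE product_totals_sink (
  product STRING,
  total INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'product_totals',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);

-- The GROUP BY produces UPDATE_BEFORE / UPDATE_AFTER rows for each key.
INSERT INTO product_totals_sink
SELECT product, SUM(amount) AS total
FROM orders_gen
GROUP BY product;
```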
Configuration options
Flink provides debezium-avro-confluent to parse messages in Avro format generated by Debezium.
debezium-avro-confluent
Use debezium-avro-confluent to parse Debezium Avro messages.
Parameter | Required | Default | Type | Description |
---|---|---|---|---|
format | yes | (none) | String | Specifies the format to be used. When parsing Debezium Avro messages, the parameter value is debezium-avro-confluent. |
debezium-avro-confluent.basic-auth.credentials-source | no | (none) | String | A source of basic authentication credentials for Schema Registry. |
debezium-avro-confluent.basic-auth.user-info | no | (none) | String | Basic authentication user information for Schema Registry. |
debezium-avro-confluent.bearer-auth.credentials-source | no | (none) | String | The source of the Schema Registry’s bearer authentication credentials. |
debezium-avro-confluent.bearer-auth.token | no | (none) | String | The bearer authentication token for Schema Registry. |
debezium-avro-confluent.properties | no | (none) | Map | A map of properties that is forwarded to Schema Registry. Useful for options that are not officially exposed through Flink configuration options. Note that Flink options take precedence. |
debezium-avro-confluent.ssl.keystore.location | no | (none) | String | The location of the SSL keystore. |
debezium-avro-confluent.ssl.keystore.password | no | (none) | String | Password for the SSL keystore. |
debezium-avro-confluent.ssl.truststore.location | no | (none) | String | The location of the SSL truststore. |
debezium-avro-confluent.ssl.truststore.password | no | (none) | String | Password for the SSL truststore. |
debezium-avro-confluent.subject | no | (none) | String | The Confluent Schema Registry subject under which to register the schemas used by this format during serialization. When this format is used as the value or key format of the kafka or upsert-kafka connector, the default subject name is <topic_name>-value or <topic_name>-key. For other connectors, such as the filesystem connector, this option is required when the table is used as a sink. |
debezium-avro-confluent.url | yes | (none) | String | URL to the Confluent Schema Registry for fetching or registering schemas. |
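The following sketch shows how several of these options might be combined for a Schema Registry that requires basic authentication over TLS. The table, topic, host names, credentials, and file path are placeholders. USER_INFO is the credentials-source value that Confluent clients use for inline user information, which is assumed here.

```sql
-- Hypothetical table reading from a secured Schema Registry.
CREATE TABLE secured_products (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-avro-confluent',
  -- Required: where schemas are fetched from or registered.
  'debezium-avro-confluent.url' = 'https://schema-registry.example.com:8081',
  -- Optional: basic authentication (placeholder credentials).
  'debezium-avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'debezium-avro-confluent.basic-auth.user-info' = 'my-user:my-password',
  -- Optional: truststore used to verify the registry certificate.
  'debezium-avro-confluent.ssl.truststore.location' = '/path/to/truststore.jks',
  'debezium-avro-confluent.ssl.truststore.password' = 'my-truststore-password'
);
```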
debezium-json
Use debezium-json to parse Debezium JSON messages.
Parameter | Required | Default | Type | Description |
---|---|---|---|---|
format | yes | (none) | String | Specifies the format to be used. When parsing Debezium JSON messages, the parameter value is debezium-json. |
debezium-json.schema-include | no | false | Boolean | When setting up Debezium Kafka Connect, you can enable the Kafka configuration value.converter.schemas.enable to include the schema in each message. This option indicates whether Debezium JSON messages contain the schema. Valid values: true: Debezium JSON messages contain the schema. false: Debezium JSON messages do not contain the schema. |
debezium-json.ignore-parse-errors | no | false | Boolean | Valid values: true: When a parsing error occurs, the current field or row is skipped. false (default): An error is reported and the job fails. |
debezium-json.timestamp-format.standard | no | SQL | String | Specifies the input and output timestamp format. The parameter values are as follows: SQL: Parse input timestamps in yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123, and output timestamps in the same format. ISO-8601: Parse input timestamps in yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123, and output timestamps in the same format. |
debezium-json.map-null-key.mode | no | FAIL | String | Specifies how to handle null keys in Map data. Valid values: FAIL: An exception is thrown when a key in the Map is null. DROP: Map entries whose key is null are discarded. LITERAL: Null keys in the Map are replaced with a string constant. The string constant is defined by debezium-json.map-null-key.literal. |
debezium-json.map-null-key.literal | no | null | String | When the value of debezium-json.map-null-key.mode is LITERAL, specify a string constant to replace the null key value in the Map. |
debezium-json.encode.decimal-as-plain-number | no | false | Boolean | The parameter values are as follows: true: All data of DECIMAL type remain as they are, and are not expressed in scientific notation, for example: 0.000000027 is expressed as 0.000000027. false: All data of DECIMAL type are expressed in scientific notation, for example, 0.000000027 is expressed as 2.7E-8. |
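As a minimal sketch of how these options fit together, the table below reads Debezium JSON messages that embed their schema, skips records that cannot be parsed, and expects ISO-8601 timestamps. The table, columns, topic, and broker address are placeholders.

```sql
-- Hypothetical table: names, topic, and address are placeholders.
CREATE TABLE products_json (
  id BIGINT,
  name STRING,
  updated_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog_json',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',
  -- Messages were produced with value.converter.schemas.enable=true.
  'debezium-json.schema-include' = 'true',
  -- Skip malformed fields or rows instead of failing the job.
  'debezium-json.ignore-parse-errors' = 'true',
  -- Timestamps look like 2020-12-30T12:13:14.123.
  'debezium-json.timestamp-format.standard' = 'ISO-8601'
);
```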
Other Instructions
Available metadata
The following format metadata can be declared as read-only (VIRTUAL) columns in DDL statements.
A format metadata field is only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector can expose metadata fields for its value format.
Key | Type | Description |
---|---|---|
schema | STRING NULL | A JSON string describing the payload schema. Null if the schema is not contained in the Debezium record. |
ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | The timestamp when the event was processed by the connector. Corresponds to the ts_ms field in the Debezium record. |
source.timestamp | TIMESTAMP_LTZ(3) NULL | The timestamp when the source system created the event. Corresponds to the source.ts_ms field in the Debezium record. |
source.database | STRING NULL | The originating database. Corresponds to the source.db field in the Debezium record (if available). |
source.schema | STRING NULL | The originating database schema. Corresponds to the source.schema field in the Debezium record (if available). |
source.table | STRING NULL | The originating database table. Corresponds to the source.table or source.collection field in the Debezium record (if available). |
source.properties | MAP<STRING, STRING> NULL | A map of various source properties. Corresponds to the source field in the Debezium record. |
The following example shows how to access Debezium metadata fields in Kafka:
CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
Common problems
Duplicate change events delivered on failure
Under normal operation, Debezium delivers each change event with exactly-once semantics, and Flink consumes those events correctly. When a failure occurs, however, Debezium can only guarantee at-least-once delivery. It may then write duplicate change events to Kafka, and Flink will read those duplicates, which can lead to wrong results or unexpected exceptions in Flink queries. In this case, set the job parameter table.exec.source.cdc-events-duplicate to true and define a PRIMARY KEY on the source table. Flink then generates an additional stateful operator that uses the PRIMARY KEY to deduplicate change events and produce a normalized changelog stream.
See the Debezium documentation for more information on Debezium's message delivery semantics.
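The deduplication setup described in this section might look like the following in the Flink SQL client. The table name, columns, topic, and addresses are placeholders. The SET syntax with quoted keys assumes a reasonably recent SQL client.

```sql
-- Ask Flink to deduplicate CDC events by primary key.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

-- Hypothetical source table; the PRIMARY KEY is what the
-- deduplication operator uses to normalize the changelog.
CREATE TABLE products_cdc (
  id BIGINT,
  name STRING,
  weight DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```

The extra operator keeps state per key, so enabling this option trades some state size for correct results.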
Data produced by the Debezium PostgreSQL connector cannot be parsed correctly
If you use the Debezium PostgreSQL connector to capture changes to Kafka, make sure that REPLICA IDENTITY of the monitored table is set to FULL (the default value is DEFAULT). Otherwise, Flink SQL cannot parse the Debezium data correctly.
With FULL, update and delete events contain the previous values of all columns. With any other setting, the before field of update and delete events contains only the values of the PRIMARY KEY columns, or is null if the table has no PRIMARY KEY. You can change the REPLICA IDENTITY setting by running ALTER TABLE <your-table-name> REPLICA IDENTITY FULL.
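On the PostgreSQL side, the check and the change can be sketched as follows. The schema and table name public.products are placeholders for your own table.

```sql
-- Inspect the current setting of the (hypothetical) table public.products.
-- relreplident: 'd' = DEFAULT, 'n' = NOTHING, 'f' = FULL, 'i' = USING INDEX
SELECT relreplident
FROM pg_class
WHERE oid = 'public.products'::regclass;

-- Record the previous values of all columns for updates and deletes.
ALTER TABLE public.products REPLICA IDENTITY FULL;
```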
This page is derived from the official Apache Flink® documentation.
Refer to the Credits page for more information.