Debezium Avro Confluent

This article describes the configuration options and type mappings of the debezium-avro-confluent format.

Background information

Debezium is a CDC (Change Data Capture) tool that streams changes from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Kafka in real time. Debezium provides a unified format structure for changelogs and supports serializing messages using JSON and Apache Avro.

Flink supports interpreting Debezium Avro messages as INSERT, UPDATE, or DELETE messages in the Flink SQL system. Taking advantage of this feature is useful in many cases, for example:

  • Synchronizing incremental data from databases to other systems
  • Auditing logs
  • Real-time materialized views on databases
  • Temporal joins on the change history of database tables
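
For example, a Kafka source table that consumes Debezium change events encoded as Avro with the Confluent Schema Registry might look like the following minimal sketch. The topic name, Schema Registry URL, and columns are illustrative assumptions, not values prescribed by this article.

```sql
-- Minimal sketch: topic, columns, and Schema Registry URL are placeholders.
CREATE TABLE products_changelog (
  id BIGINT,
  name STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  -- interpret Debezium Avro messages as a changelog
  'format' = 'debezium-avro-confluent',
  -- URL of the Confluent Schema Registry
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```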

Flink also supports encoding the INSERT, UPDATE, or DELETE messages in Flink SQL as Avro messages in the Debezium format, and emitting them to storage systems such as Kafka.

note

Currently Flink does not support combining UPDATE_BEFORE and UPDATE_AFTER into one UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as Debezium messages of type DELETE and INSERT respectively.
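
As a sketch of the sink side, the same format can be used when writing a changelog back to Kafka: inserting into such a table encodes the stream as Debezium-format Avro messages, with UPDATE_BEFORE and UPDATE_AFTER emitted as DELETE and INSERT as described in the note above. All names and the Schema Registry URL below are illustrative assumptions.

```sql
-- Minimal sink sketch: names and the Schema Registry URL are placeholders.
CREATE TABLE products_sink (
  id BIGINT,
  name STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_out',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);

-- Writing query results to the table produces Debezium-format Avro messages.
INSERT INTO products_sink
SELECT id, name, weight FROM products_changelog;
```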

Configuration options

Flink provides the debezium-avro-confluent format to parse Avro messages generated by Debezium.

debezium-avro-confluent

Use debezium-avro-confluent to parse Debezium Avro messages.

| Parameter | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| format | Yes | (none) | String | Specifies the format to use. To parse Debezium Avro messages, set the value to debezium-avro-confluent. |
| debezium-avro-confluent.basic-auth.credentials-source | No | (none) | String | Source of the basic authentication credentials for Schema Registry. |
| debezium-avro-confluent.basic-auth.user-info | No | (none) | String | Basic authentication user information for Schema Registry. |
| debezium-avro-confluent.bearer-auth.credentials-source | No | (none) | String | Source of the bearer authentication credentials for Schema Registry. |
| debezium-avro-confluent.bearer-auth.token | No | (none) | String | Bearer authentication token for Schema Registry. |
| debezium-avro-confluent.properties | No | (none) | Map | Properties map that is forwarded to Schema Registry. Useful for options that are not officially exposed via Flink configuration options. Note that Flink options have higher priority. |
| debezium-avro-confluent.ssl.keystore.location | No | (none) | String | Location of the SSL keystore. |
| debezium-avro-confluent.ssl.keystore.password | No | (none) | String | Password for the SSL keystore. |
| debezium-avro-confluent.ssl.truststore.location | No | (none) | String | Location of the SSL truststore. |
| debezium-avro-confluent.ssl.truststore.password | No | (none) | String | Password for the SSL truststore. |
| debezium-avro-confluent.subject | No | (none) | String | The Confluent Schema Registry subject under which to register the schemas used by this format during serialization. By default, the kafka and upsert-kafka connectors use "<topic_name>-value" or "<topic_name>-key" as the default subject name when this format is used as the value or key format. For other connectors such as filesystem, the subject option is required when the format is used as a sink. |
| debezium-avro-confluent.url | Yes | (none) | String | URL of the Confluent Schema Registry used to fetch or register schemas. |
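
The following sketch shows how several of these options might be combined in a table definition; every value (registry URL, credentials, subject, topic) is a placeholder and not taken from this article.

```sql
-- Sketch only: all option values below are placeholders.
CREATE TABLE orders_changelog (
  order_id BIGINT,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'https://registry.example.com:8081',
  -- basic authentication against the Schema Registry
  'debezium-avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'debezium-avro-confluent.basic-auth.user-info' = 'user:password',
  -- explicit subject used when registering schemas during serialization
  'debezium-avro-confluent.subject' = 'orders-value'
);
```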

debezium-json

Use debezium-json to parse Debezium JSON messages.

| Parameter | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| format | Yes | (none) | String | Specifies the format to use. To parse Debezium JSON messages, set the value to debezium-json. |
| debezium-json.schema-include | No | false | Boolean | When setting up Debezium Kafka Connect, you can enable the Kafka configuration value.converter.schemas.enable to include the schema in messages. This option indicates whether the Debezium JSON messages contain the schema. Valid values: true (messages contain the schema), false (messages do not contain the schema). |
| debezium-json.ignore-parse-errors | No | false | Boolean | Valid values: true (skip the current field or row when a parse error occurs), false (default; an error is reported and the job fails). |
| debezium-json.timestamp-format.standard | No | SQL | String | Specifies the input and output timestamp format. Valid values: SQL (parse input timestamps in yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123, and output timestamps in the same format), ISO-8601 (parse input timestamps in yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123, and output timestamps in the same format). |
| debezium-json.map-null-key.mode | No | FAIL | String | Specifies how null keys in map data are handled. Valid values: FAIL (throw an exception when a map key is null), DROP (drop map entries whose key is null), LITERAL (replace null map keys with a string constant defined by debezium-json.map-null-key.literal). |
| debezium-json.map-null-key.literal | No | null | String | The string constant that replaces null map keys when debezium-json.map-null-key.mode is set to LITERAL. |
| debezium-json.encode.decimal-as-plain-number | No | false | Boolean | Valid values: true (DECIMAL values are encoded as plain numbers, for example 0.000000027 is encoded as 0.000000027), false (DECIMAL values may be encoded in scientific notation, for example 0.000000027 is encoded as 2.7E-8). |
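
The sketch below shows how a few of the debezium-json options might be set together; the table name, topic, and columns are placeholders.

```sql
-- Sketch only: table name, topic, and columns are placeholders.
CREATE TABLE user_events (
  user_id BIGINT,
  event_time TIMESTAMP(3),
  attributes MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json',
  -- Debezium Kafka Connect was configured with value.converter.schemas.enable = true
  'debezium-json.schema-include' = 'true',
  -- skip records that cannot be parsed instead of failing the job
  'debezium-json.ignore-parse-errors' = 'true',
  -- timestamps arrive as ISO-8601 strings, e.g. 2020-12-30T12:13:14.123
  'debezium-json.timestamp-format.standard' = 'ISO-8601',
  -- replace null map keys with a literal instead of throwing an exception
  'debezium-json.map-null-key.mode' = 'LITERAL',
  'debezium-json.map-null-key.literal' = 'null'
);
```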

Additional information

Available metadata

The following format metadata can be declared as read-only (VIRTUAL) columns in DDL statements.

note

The format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose metadata fields for its value format.

| Key | Type | Description |
| --- | --- | --- |
| schema | STRING NULL | A JSON string describing the schema of the payload. Null if the schema is not included in the Debezium record. |
| ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | The timestamp at which the connector processed the event. Corresponds to the ts_ms field in the Debezium record. |
| source.timestamp | TIMESTAMP_LTZ(3) NULL | The timestamp at which the source system created the event. Corresponds to the source.ts_ms field in the Debezium record. |
| source.database | STRING NULL | The originating database. Corresponds to the source.db field in the Debezium record, if available. |
| source.schema | STRING NULL | The schema of the originating database. Corresponds to the source.schema field in the Debezium record, if available. |
| source.table | STRING NULL | The table of the originating database. Corresponds to the source.table or source.collection field in the Debezium record, if available. |
| source.properties | MAP<STRING, STRING> NULL | A map of various source properties. Corresponds to the source field in the Debezium record. |

The following example shows how to access Debezium metadata fields in Kafka:

```sql
CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
```

Common problems

Duplicate change events are delivered on failure

Under normal operating conditions, Debezium delivers each change event with exactly-once semantics, and Flink consumes the change events generated by Debezium as expected. In abnormal situations such as failures, Debezium can only guarantee at-least-once delivery, so it may deliver duplicate change events to Kafka. When Flink consumes these duplicates from Kafka, the Flink query may produce wrong results or unexpected exceptions. In this case, it is recommended to set the job parameter table.exec.source.cdc-events-duplicate to true and define a PRIMARY KEY on the source table. Flink then generates an additional stateful operator that uses the PRIMARY KEY to deduplicate change events and produce a normalized changelog stream.
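
A minimal sketch of this setup in the Flink SQL client follows; the table definition, topic, and Schema Registry URL are illustrative assumptions, while the configuration key and the PRIMARY KEY requirement come from the recommendation above.

```sql
-- Deduplicate change events that Debezium may have delivered more than once.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

-- Sketch only: topic, columns, and registry URL are placeholders.
CREATE TABLE dedup_products (
  id BIGINT,
  name STRING,
  weight DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED   -- required so Flink can deduplicate by key
) WITH (
  'connector' = 'kafka',
  'topic' = 'products',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```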

note

See the Debezium documentation for more information about Debezium's message delivery semantics.

Data produced by the Debezium PostgreSQL connector cannot be parsed correctly

If you are using the Debezium PostgreSQL connector to capture changes to Kafka, make sure that the REPLICA IDENTITY of the monitored tables is set to FULL (the default is DEFAULT). Otherwise, Flink SQL will not be able to parse the Debezium data correctly.

When configured to FULL, update and delete events fully contain the previous values of all columns. When configured to other values, the before field of update and delete events only contains the values of the PRIMARY KEY columns, or is null if there is no PRIMARY KEY. You can change the REPLICA IDENTITY configuration by running ALTER TABLE <your-table-name> REPLICA IDENTITY FULL.
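
As a sketch, the statements below run against PostgreSQL (not Flink SQL); the table name is a placeholder.

```sql
-- Run in PostgreSQL: make update/delete events carry the full previous row.
ALTER TABLE inventory.customers REPLICA IDENTITY FULL;

-- Optional check: relreplident is 'f' for FULL, 'd' for DEFAULT.
SELECT relname, relreplident FROM pg_class WHERE relname = 'customers';
```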

note

This page is derived from the official Apache Flink® documentation.

Refer to the Credits page for more information.