Postgres CDC

Background Information

Postgres CDC reads a full snapshot of a PostgreSQL database and then its subsequent change data, in order, so that no record is missed or read twice. Even if a failure occurs, the data is processed with exactly-once semantics.

| Category | Details |
| --- | --- |
| Operating mode | Only stream mode is supported. |
| Monitoring indicators | Source metrics. currentFetchEventTimeLag: the interval between when data is generated and when it is fetched by the Source Operator. currentEmitEventTimeLag: the interval between when data is generated and when it leaves the Source Operator. Both lag metrics are valid only in the incremental phase (called the binlog phase in the original metric descriptions) and are always 0 in the snapshot phase. sourceIdleTime: how long the source has gone without generating new data. |

Prerequisites

  • The Postgres database and table have been created.

  • The required configurations have been made on Amazon RDS for PostgreSQL or on a self-managed PostgreSQL instance.
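
As a rough sketch of what the PostgreSQL-side configuration usually involves for logical decoding, the statements below could be run on a self-managed instance. The role name `flink_cdc` and the table `public.shipments` are hypothetical. On Amazon RDS, `wal_level` cannot be changed with `ALTER SYSTEM`; logical replication is instead enabled through the `rds.logical_replication` parameter in the DB parameter group, and the `rds_replication` role is granted in place of the `REPLICATION` attribute. Check the exact requirements against your PostgreSQL version and decoding plug-in.

```sql
-- Assumption: self-managed PostgreSQL, statements run as a superuser.
-- Logical decoding requires wal_level = logical (takes effect after a restart).
ALTER SYSTEM SET wal_level = 'logical';

-- Hypothetical role used by the connector; it needs LOGIN and REPLICATION.
CREATE ROLE flink_cdc WITH LOGIN REPLICATION PASSWORD '<yourPassWord>';

-- Let the role read the captured table (hypothetical table public.shipments).
GRANT USAGE ON SCHEMA public TO flink_cdc;
GRANT SELECT ON TABLE public.shipments TO flink_cdc;

-- Optional: include the full old row in UPDATE and DELETE events.
ALTER TABLE public.shipments REPLICA IDENTITY FULL;

-- After the restart, confirm that the setting is active.
SHOW wal_level;
```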

Usage Restrictions

  • Postgres CDC can be used only as a source table, not as a result table or a dimension table.
  • During the full table scan, Postgres CDC reads the table in a single pass with no intermediate position to recover from, so no checkpoint can complete in this phase. A checkpoint triggered during the scan waits until it times out, and on a very large table the scan can take a very long time. A checkpoint that times out is treated as a failed checkpoint, and in Flink's default configuration a failed checkpoint triggers a job failover.

To avoid failovers caused by checkpoint timeouts during the scan, add the following settings under More Flink configurations in the advanced configuration of the job development page (the table below explains each parameter).

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

| Parameter | Description |
| --- | --- |
| execution.checkpointing.interval | The checkpoint interval. The value is a Duration, such as 10min or 30s. |
| execution.checkpointing.tolerable-failed-checkpoints | The number of checkpoint failures to tolerate. The product of this value and the checkpoint interval is the time allowed for reading the snapshot. If the table is particularly large, set a larger value. |
| restart-strategy | The restart policy. Valid values: fixed-delay (fixed-delay restart strategy), failure-rate (failure-rate restart strategy), exponential-delay (exponential-delay restart strategy). |
| restart-strategy.fixed-delay.attempts | The maximum number of restart attempts under the fixed-delay restart policy. |
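
As a worked example of the rule in the table, using the sample values shown earlier (a rough estimate that assumes each tolerated failure consumes about one checkpoint interval):

```latex
T_{\text{snapshot}} \approx N_{\text{tolerable}} \times \Delta_{\text{interval}}
  = 100 \times 10\ \text{min} = 1000\ \text{min} \approx 16.7\ \text{h}
```

Here N_tolerable stands for execution.checkpointing.tolerable-failed-checkpoints and Δ_interval for execution.checkpointing.interval; both symbols are introduced only for this illustration. If the full scan is expected to take longer than this, raise either value.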

Syntax

    CREATE TABLE postgrescdc_source (
        shipment_id INT,
        order_id INT,
        origin STRING,
        destination STRING,
        is_arrived BOOLEAN
    ) WITH (
        'connector' = 'postgres-cdc',
        'hostname' = '<yourHostname>',
        'port' = '5432',
        'username' = '<yourUserName>',
        'password' = '<yourPassWord>',
        'database-name' = '<yourDatabaseName>',
        'schema-name' = '<yourSchemaName>',
        'table-name' = '<yourTableName>',
        'slot.name' = '<yourSlotName>'
    );

WITH Parameters

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the table. | STRING | Yes | No default value | Set the value to postgres-cdc. |
| hostname | The IP address or hostname of the PostgreSQL database. | STRING | Yes | No default value | N/A. |
| username | The username that is used to access the PostgreSQL database service. | STRING | Yes | No default value | N/A. |
| password | The password that is used to access the PostgreSQL database service. | STRING | Yes | No default value | N/A. |
| database-name | The name of the database. | STRING | Yes | No default value | To read data from multiple databases, set this parameter to a regular expression. |
| schema-name | The schema name of the PostgreSQL database. | STRING | Yes | No default value | To read data from multiple schemas, set this parameter to a regular expression. |
| table-name | The name of the PostgreSQL table. | STRING | Yes | No default value | To read data from multiple tables, set this parameter to a regular expression. |
| port | The port that is used to access the PostgreSQL database service. | INTEGER | No | 5432 | N/A. |
| decoding.plugin.name | The name of the PostgreSQL logical decoding plug-in. | STRING | No | decoderbufs | The value must match the plug-in that is installed in the PostgreSQL database service. Valid values: decoderbufs (default), wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming, pgoutput. |
| debezium.* | The Debezium property parameters. | STRING | No | No default value | Provides fine-grained control over the behavior of the Debezium client, for example 'debezium.snapshot.mode' = 'never'. For more information, see Configure properties. |
| slot.name | The name of the logical decoding slot. | STRING | Yes | No default value | N/A. |
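
To illustrate how the optional parameters combine with the required ones, the following sketch sets the decoding plug-in, a slot name, and one Debezium pass-through property. The table name `shipments_incremental` and the slot name `flink_shipments` are hypothetical. The setting 'debezium.snapshot.mode' = 'never' is the example value from the table; it tells Debezium not to take an initial snapshot, so it is only suitable when historical rows are not needed.

```sql
-- Sketch only: the table name and slot name below are hypothetical.
CREATE TABLE shipments_incremental (
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '<yourHostname>',
    'port' = '5432',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>',
    'database-name' = '<yourDatabaseName>',
    'schema-name' = '<yourSchemaName>',
    'table-name' = '<yourTableName>',
    -- Must match a plug-in installed on the server; pgoutput is one of the listed values.
    'decoding.plugin.name' = 'pgoutput',
    -- Logical decoding slot used by this job.
    'slot.name' = 'flink_shipments',
    -- Passed through to Debezium: stream changes only, no initial snapshot.
    'debezium.snapshot.mode' = 'never'
);
```

A replication slot can be consumed by only one client at a time, so it is usually safest to give each job its own slot name.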

Type Mapping

The correspondence between Postgres CDC and Flink field types is as follows.

| Postgres CDC field type | Flink field type |
| --- | --- |
| SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
| INTEGER, SERIAL | INT |
| BIGINT, BIGSERIAL | BIGINT |
| REAL, FLOAT4 | FLOAT |
| FLOAT8, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
| BYTEA | BYTES |
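
As an illustration of the mapping, suppose the upstream PostgreSQL table were declared as in the comment below. This `public.shipments` table, the Flink table name `shipments_typed`, and the slot name are all hypothetical. A matching Flink declaration might then look like this:

```sql
-- Hypothetical upstream PostgreSQL table, shown for comparison only:
--   CREATE TABLE public.shipments (
--       shipment_id SERIAL PRIMARY KEY,
--       weight_kg   NUMERIC(10, 2),
--       distance_km DOUBLE PRECISION,
--       origin      VARCHAR(64),
--       payload     BYTEA,
--       shipped_on  DATE,
--       created_at  TIMESTAMP(3),
--       is_arrived  BOOLEAN
--   );

-- Flink-side declaration, with each column type taken from the mapping table.
CREATE TABLE shipments_typed (
    shipment_id INT,             -- SERIAL           -> INT
    weight_kg   DECIMAL(10, 2),  -- NUMERIC(10, 2)   -> DECIMAL(10, 2)
    distance_km DOUBLE,          -- DOUBLE PRECISION -> DOUBLE
    origin      STRING,          -- VARCHAR(64)      -> STRING
    payload     BYTES,           -- BYTEA            -> BYTES
    shipped_on  DATE,            -- DATE             -> DATE
    created_at  TIMESTAMP(3),    -- TIMESTAMP(3)     -> TIMESTAMP(3)
    is_arrived  BOOLEAN          -- BOOLEAN          -> BOOLEAN
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '<yourHostname>',
    'port' = '5432',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>',
    'database-name' = '<yourDatabaseName>',
    'schema-name' = 'public',
    'table-name' = 'shipments',
    'slot.name' = 'flink_shipments_typed'
);
```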

Example

    CREATE TABLE postgrescdc_source (
        shipment_id INT,
        order_id INT,
        origin STRING,
        destination STRING,
        is_arrived BOOLEAN
    ) WITH (
        'connector' = 'postgres-cdc',
        'hostname' = '<yourHostname>',
        'port' = '5432',
        'username' = '<yourUserName>',
        'password' = '<yourPassWord>',
        'database-name' = '<yourDatabaseName>',
        'schema-name' = '<yourSchemaName>',
        'table-name' = '<yourTableName>',
        'slot.name' = 'flink',
        'debezium.plugin.name' = 'pgoutput'
    );

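
A source table on its own produces no output. As a minimal sketch of consuming it, the changelog can be written to Flink's built-in `print` connector, which is a convenient way to check that snapshot rows and later changes arrive. The sink name `print_sink` is hypothetical, and the sketch assumes the `print` connector is available in your environment.

```sql
-- Hypothetical debugging sink; the print connector writes each row
-- to the standard output of the TaskManager.
CREATE TABLE print_sink (
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN
) WITH (
    'connector' = 'print'
);

-- Continuously copy the changelog from the CDC source defined above.
INSERT INTO print_sink
SELECT shipment_id, order_id, origin, destination, is_arrived
FROM postgrescdc_source;
```
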
Note: This page is derived from the official Apache Flink® documentation.

Refer to the Credits page for more information.