Postgres CDC
Background Information
Postgres CDC reads the full snapshot of a PostgreSQL database and then its subsequent changes, in order, so that no record is missed or read twice. If a failure occurs, processing recovers with exactly-once semantics.
Category | Details |
---|---|
Operating mode | Only stream mode is supported |
Monitoring indicators | Source: currentFetchEventTimeLag: The interval between when data is generated and when it is fetched by the Source Operator. (This indicator is valid only in the incremental (change streaming) phase; its value is always 0 in the snapshot phase.) currentEmitEventTimeLag: The interval between when data is generated and when it leaves the Source Operator. (This indicator is valid only in the incremental (change streaming) phase; its value is always 0 in the snapshot phase.) sourceIdleTime: How long the source has gone without producing new data. |
Prerequisites
- The Postgres database and table have been created.
- The required configuration has been applied to Amazon RDS for PostgreSQL or to your self-managed PostgreSQL instance.
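The database-side configuration is not spelled out on this page. As a rough sketch for a self-managed PostgreSQL instance, the statements below show the kind of setup logical decoding typically needs. The role name `flink_cdc` and the table `public.shipments` are hypothetical placeholders, and the table is assumed to exist already. On Amazon RDS, `ALTER SYSTEM` is not available; logical replication is normally enabled there through the `rds.logical_replication` parameter in the DB parameter group.

```sql
-- Run on the PostgreSQL server as a superuser. All object names are placeholders.

-- 1. Logical decoding requires wal_level = logical.
--    The change takes effect only after a server restart.
SHOW wal_level;
ALTER SYSTEM SET wal_level = 'logical';

-- 2. Create a login role with replication privileges for the connector
--    (hypothetical role name).
CREATE ROLE flink_cdc WITH REPLICATION LOGIN PASSWORD '<yourPassWord>';

-- 3. Allow the role to read the captured table during the snapshot phase.
GRANT USAGE ON SCHEMA public TO flink_cdc;
GRANT SELECT ON TABLE public.shipments TO flink_cdc;

-- 4. Optional: include the full old row in UPDATE and DELETE events.
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
```

Depending on the decoding plug-in you choose, further privileges or server-side plug-in installation may be needed; treat the statements above as a starting point and not a complete checklist.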
Usage Restrictions
- Postgres CDC can be used only as a source table. It cannot be used as a result table or a dimension table.
- Postgres CDC reads the full table data in a single pass, with no intermediate position from which the scan could resume, so it cannot complete a checkpoint during the full table scan phase. Any checkpoint triggered in this phase waits on the Postgres CDC source until it times out, and for a very large table the scan can take a long time. A checkpoint that times out is treated as a failed checkpoint, and in Flink's default configuration a failed checkpoint triggers a job failover.
To keep checkpoint timeouts from failing the job while a large table is scanned, add the following Flink configuration in the advanced configuration of the job development page (see the table below for the meaning of each parameter).
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
Parameter | Description |
---|---|
execution.checkpointing.interval | The checkpoint interval. The value is a Duration, such as 10min or 30s. |
execution.checkpointing.tolerable-failed-checkpoints | The number of checkpoint failures to tolerate. This value multiplied by the checkpoint interval gives the time allowed for reading the snapshot. For example, an interval of 10min and a value of 100 allow about 1,000 minutes (roughly 16.7 hours). Increase the value if the table is particularly large. |
restart-strategy | The restart strategy. Valid values: fixed-delay: fixed-delay restart strategy. failure-rate: failure-rate restart strategy. exponential-delay: exponential-delay restart strategy. |
restart-strategy.fixed-delay.attempts | Under the fixed-delay restart policy, the maximum number of restart attempts. |
Syntax
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>',
'slot.name' = '<yourSlotName>'
)
WITH Parameters
Parameter | Description | Data type | Required | Default value | Remarks |
---|---|---|---|---|---|
connector | The type of the table. | STRING | Yes | No default value | Set the value to postgres-cdc . |
hostname | The IP address or hostname of the PostgreSQL database. | STRING | Yes | No default value | N/A. |
username | The username that is used to access the PostgreSQL database service. | STRING | Yes | No default value | N/A. |
password | The password that is used to access the PostgreSQL database service. | STRING | Yes | No default value | N/A. |
database-name | The name of the database. | STRING | Yes | No default value | If you want to read data from multiple databases, you can set this parameter to a regular expression. |
schema-name | The schema name of the PostgreSQL database. | STRING | Yes | No default value | If you want to read data from multiple schemas, you can set this parameter to a regular expression. |
table-name | The name of the PostgreSQL table. | STRING | Yes | No default value | If you want to read data from multiple tables, you can set this parameter to a regular expression. |
port | The port that is used to access the PostgreSQL database service. | INTEGER | No | 5432 | N/A. |
decoding.plugin.name | The name of the PostgreSQL logical decoding plug-in. | STRING | No | decoderbufs | The plug-in name is determined based on the plug-in that is installed in the PostgreSQL database service. Valid values: decoderbufs (default value), wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming, pgoutput. |
debezium.* | The Debezium property parameters. | STRING | No | No default value | Fine-grained control over the behavior of the Debezium client. For example, 'debezium.snapshot.mode' = 'never'. For more information, see Configure properties. |
slot.name | The name of the logical decoding slot. | STRING | Yes | No default value | N/A. |
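Because `slot.name` refers to a replication slot on the PostgreSQL server, it can help to inspect and clean up slots there. The following is a minimal sketch that uses the standard `pg_replication_slots` view and the `pg_drop_replication_slot` function; the slot name `flink` is the one used in the example on this page.

```sql
-- List replication slots and whether a consumer is currently attached.
SELECT slot_name, plugin, active, restart_lsn
FROM pg_replication_slots;

-- Drop a slot that is no longer needed.
-- This fails while the slot is active, so stop the Flink job first.
SELECT pg_drop_replication_slot('flink');
```

A slot that is left behind without a consumer prevents PostgreSQL from recycling WAL, so disk usage on the server can grow until the slot is dropped.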
Type Mapping
The correspondence between Postgres CDC and Flink field types is as follows.
Postgres CDC field type | Flink field type |
---|---|
SMALLINT | SMALLINT |
INT2 | SMALLINT |
SMALLSERIAL | SMALLINT |
SERIAL2 | SMALLINT |
INTEGER | INT |
SERIAL | INT |
BIGINT | BIGINT |
BIGSERIAL | BIGINT |
REAL | FLOAT |
FLOAT4 | FLOAT |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | DOUBLE |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIME ZONE] | TIME [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] [WITHOUT TIME ZONE] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
CHAR(n) | STRING |
CHARACTER(n) | STRING |
VARCHAR(n) | STRING |
CHARACTER VARYING(n) | STRING |
TEXT | STRING |
BYTEA | BYTES |
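To make the mapping concrete, here is a sketch of a PostgreSQL table whose columns line up with the Flink DDL used on this page. The table name `public.shipments` and the column lengths are assumptions chosen for illustration; the comments give the Flink type that each column maps to according to the table above.

```sql
-- PostgreSQL side (hypothetical table).
CREATE TABLE public.shipments (
  shipment_id SERIAL PRIMARY KEY,  -- Flink: INT
  order_id    INTEGER NOT NULL,    -- Flink: INT
  origin      VARCHAR(255),        -- Flink: STRING
  destination TEXT,                -- Flink: STRING
  is_arrived  BOOLEAN              -- Flink: BOOLEAN
);
```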
Example
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>',
'slot.name' = 'flink',
'debezium.plugin.name' = 'pgoutput'
)
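To check that changes are flowing, the source table can be wired to a simple sink. The sketch below assumes that the built-in Apache Flink `print` connector is available in your environment; `print_sink` is a hypothetical table name.

```sql
-- Hypothetical sink that writes every change record to the TaskManager output.
CREATE TABLE print_sink (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'print'
);

-- Continuously copy snapshot rows and later changes from the CDC source.
INSERT INTO print_sink
SELECT shipment_id, order_id, origin, destination, is_arrived
FROM postgrescdc_source;
```

Inserts, updates, and deletes on the PostgreSQL table should then show up as changelog rows in the sink output once the snapshot phase has finished.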
note
This page is derived from the official Apache Flink® documentation.
Refer to the Credits page for more information.