PostgreSQL Connectivity
You can connect your Flink SQL jobs to PostgreSQL in one of three ways, each designed for a distinct technical requirement: the Postgres connector, the Postgres catalog, and the Postgres-CDC connector.
Prerequisites
- Access to an active Postgres database instance running at a known network address (host and port).
- The required tables must already exist in the target Postgres database.
- Valid credentials: a username and password with sufficient privileges to read from or write to the tables.
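The privileges you need depend on how Flink uses each table. Below is a minimal sketch in PostgreSQL SQL. It assumes a role named flink and the ecommerce tables used in the examples on this page (orders, currency_rates, orders_enriched). Run it as an administrator while connected to the ecommerce database, and adapt the names and the password to your setup.

```sql
-- Hypothetical login role for Flink; replace the password before use.
CREATE ROLE flink WITH LOGIN PASSWORD 'flink';

-- Let the role connect to the database and resolve objects in the schema.
GRANT CONNECT ON DATABASE ecommerce TO flink;
GRANT USAGE ON SCHEMA public TO flink;

-- Read access for source and lookup (dimension) tables.
GRANT SELECT ON public.orders, public.currency_rates TO flink;

-- Write access for a sink table: UPDATE is needed for upserts
-- (INSERT ... ON CONFLICT ... DO UPDATE), DELETE only if the stream contains deletions.
GRANT SELECT, INSERT, UPDATE, DELETE ON public.orders_enriched TO flink;
```

The Postgres-CDC connector additionally needs replication privileges on top of SELECT.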
Postgres Connector
Use this connector when you need a bounded snapshot of a PostgreSQL table (source) or want to continuously write streaming results into PostgreSQL (sink). You can also use it for dimension table lookups.
Overview
- Supports source, sink, and dimension (lookup) tables.
- The required Postgres JDBC driver is already bundled. No manual JAR upload is required.
- It is a PostgreSQL-specific implementation of the generic JDBC connector, selected with 'connector' = 'postgres'.
Source Table Syntax
CREATE TABLE orders_src (
  id BIGINT,
  order_time TIMESTAMP(3),
  amount DECIMAL(10,2)
) WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
  'table-name' = 'orders',
  'username' = 'flink',
  'password' = 'flink',
  -- optional parallel read: the id range between the lower and upper bound is split
  -- into 'scan.partition.num' ranges that are read in parallel; in the open-source
  -- JDBC connector the bounds also filter rows, so ids outside them are not read
  'scan.partition.column' = 'id',
  'scan.partition.num' = '4',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '1000000'
);
Sink (Result) Table Syntax
CREATE TABLE orders_sink (
  id BIGINT,
  order_time TIMESTAMP(3),
  amount DECIMAL(10,2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
  'table-name' = 'orders_enriched',
  'username' = 'flink',
  'password' = 'flink',
  -- tune buffering for higher throughput
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s'
);
Dimension (Lookup) Table Syntax
CREATE TABLE currency_rates_dim (
  currency_code STRING,
  rate DECIMAL(10,4)
) WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
  'table-name' = 'currency_rates',
  'username' = 'flink',
  'password' = 'flink',
  -- enable caching for low-latency lookups
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl' = '1h'
);
WITH Options
Limitations
- Runtime version support: Only Ververica Runtime VERA 1.0.3 or later bundles the open-source JDBC/Postgres connector.
- Bounded source semantics: A Postgres source table is finite. The task finishes after all rows are read. For continuous streams, use Postgres-CDC.
- PostgreSQL version: PostgreSQL 9.5 or later is required for sinks, as the connector uses ON CONFLICT for upserts.
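Upserts rely on ON CONFLICT, so the target PostgreSQL table needs a primary key or unique constraint on the columns declared as PRIMARY KEY in the Flink sink DDL. Below is a sketch of a matching target for the orders_enriched sink example. It is followed by an illustrative statement of the shape the open-source JDBC connector's PostgreSQL dialect uses for upserts; the row values are made up.

```sql
-- PostgreSQL side: the sink target, with a primary key matching the Flink DDL.
CREATE TABLE IF NOT EXISTS public.orders_enriched (
  id BIGINT PRIMARY KEY,
  order_time TIMESTAMP(3),
  amount NUMERIC(10, 2)
);

-- An upsert of this shape inserts the row, or updates it if the id already exists.
INSERT INTO public.orders_enriched (id, order_time, amount)
VALUES (42, TIMESTAMP '2024-01-01 10:00:00', 19.99)
ON CONFLICT (id) DO UPDATE
SET order_time = EXCLUDED.order_time,
    amount = EXCLUDED.amount;
```

Without a matching constraint, PostgreSQL rejects such a statement with the error "there is no unique or exclusion constraint matching the ON CONFLICT specification".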
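A lookup join needs a processing-time attribute on the streaming side. Below is a sketch of an enrichment job that combines the currency_rates_dim and orders_sink tables defined above. The kafka_orders table, its topic, broker address, group id, and fields are hypothetical. The Kafka options follow the open-source Flink Kafka SQL connector.

```sql
-- Hypothetical streaming source of orders.
CREATE TABLE kafka_orders (
  id BIGINT,
  order_time TIMESTAMP(3),
  amount DECIMAL(10,2),
  currency STRING,
  proctime AS PROCTIME()  -- processing-time attribute required by the lookup join
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'orders-enrichment',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Convert each amount with the rate valid at processing time and upsert by id.
INSERT INTO orders_sink
SELECT
  o.id,
  o.order_time,
  CAST(o.amount * r.rate AS DECIMAL(10,2)) AS amount
FROM kafka_orders AS o
JOIN currency_rates_dim FOR SYSTEM_TIME AS OF o.proctime AS r
  ON o.currency = r.currency_code;
```

This is an inner lookup join, so orders whose currency has no row in currency_rates are dropped. Use LEFT JOIN to keep them, in which case the converted amount is NULL.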
Postgres Catalog
A Postgres catalog registers an entire database, making every schema and table addressable in Flink SQL without requiring individual CREATE TABLE statements.
Overview
- Zero-DDL onboarding: Query existing tables immediately.
- Namespace isolation: Use fully qualified names (pg.<schema>.<table>) or set the context with USE.
- Packaged driver: Ververica bundles the Postgres JDBC driver. No extra JAR is needed.
Limitations
Create Catalog Options
You can create a catalog using SQL or on the console.
- To create with SQL:
CREATE CATALOG pg_catalog WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://pg-host:5432',
  'default-database' = 'ecommerce',
  'username' = 'flink',
  'password' = 'flink'
);
- On the console: Go to Catalogs › Create Catalog, then follow the procedure to create and configure the catalog (hostname, port, default database, username, password).
Parameter Reference
Database Allow/Deny Lists
Control which PostgreSQL databases are visible through a catalog using an allowlist or denylist. This is useful on hosted services (e.g., AWS RDS) that expose system databases.
- database.whitelist: Semicolon-separated list of databases to expose (e.g., 'ecommerce;sales_analytics').
- database.blacklist: Semicolon-separated list of databases to hide (e.g., 'rdsadmin;template0;template1').
Rules
- Specify either a whitelist or a blacklist, not both. If both are set, catalog creation fails with a validation error.
- If neither option is set, the catalog uses its default behavior: the PostgreSQL template databases (template0 and template1) are excluded from discovery.
- Multiple databases are separated by semicolons (e.g., 'db1;db2;db3').
- Matching is by database name. Use lowercase letters unless your database names are quoted identifiers.
Examples
- Whitelist only (expose exactly these databases):
CREATE CATALOG pg_whitelist WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://pg-host:5432',
  'default-database' = 'ecommerce',
  'username' = 'flink',
  'password' = 'flink',
  'database.whitelist' = 'ecommerce;sales_analytics'
);
- Blacklist only (hide certain databases, show all others):
CREATE CATALOG pg_blacklist WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://pg-host:5432',
  'default-database' = 'postgres',
  'username' = 'flink',
  'password' = 'flink',
  'database.blacklist' = 'template0;template1'
);
AWS RDS example
AWS RDS exposes a reserved database named rdsadmin that should not be accessed by clients. You can safely run Flink against RDS by blacklisting it (and, for cleanliness, the templates):
CREATE CATALOG rds_pg WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://<rds-endpoint>:5432',
  'default-database' = 'postgres',
  'username' = '<user>',
  'password' = '<password>',
  'database.blacklist' = 'rdsadmin;template0;template1'
);
If you see an error such as FATAL: database "template0" is not currently accepting connections, make sure template0 is excluded, either by blacklisting it or by leaving it out of the whitelist.
- This feature only affects discovery (what shows up in SHOW DATABASES and what Flink enumerates). It’s not a security boundary: database-level permissions still apply.
- Avoid whitelisting template0 (it can’t accept connections) and service or admin databases such as rdsadmin on RDS.
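Before choosing an allowlist or denylist, you can check which databases exist on the server and whether they accept connections. Below is a sketch that uses the standard pg_database system catalog. Run it in psql or any PostgreSQL client.

```sql
-- List all databases, flagging templates and databases that refuse connections.
SELECT datname,
       datistemplate,  -- true for template databases such as template0 and template1
       datallowconn    -- false for databases that cannot be connected to, e.g. template0
FROM pg_database
ORDER BY datname;
```

Databases with datallowconn = false are natural blacklist candidates. On managed services, also exclude service databases such as rdsadmin on RDS.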
Using a Postgres Catalog
-- Set the current catalog context
USE CATALOG pg_catalog;

-- Browse catalog objects
SHOW DATABASES;
SHOW TABLES FROM public;

-- Run a bounded read from a table
SELECT * FROM public.orders LIMIT 10;

-- Write a continuous stream to a table
-- (kafka_src is a streaming source table defined elsewhere; if it lives in
-- another catalog, reference it by its fully qualified name)
INSERT INTO public.enriched_orders
SELECT * FROM kafka_src;

-- Perform a dimension table lookup
-- (kafka_orders is a streaming table with a processing-time column named proctime)
SELECT o.*, r.rate
FROM kafka_orders o
JOIN pg_catalog.public.currency_rates FOR SYSTEM_TIME AS OF o.proctime AS r
ON o.currency = r.currency_code;
CTAS / DBAS Helpers
After selecting a catalog, you can also combine it with the Postgres connector shown earlier to copy a single table (CTAS) or an entire database (DBAS):
-- Set the context
USE CATALOG pg_catalog;

-- Snapshot a single table
CREATE TABLE public.orders_copy
WITH ('connector'='postgres')
AS TABLE pg_catalog.public.orders;

-- Clone the entire DB
CREATE DATABASE sales_copy
WITH ('connector'='postgres')
AS DATABASE pg_catalog.ecommerce INCLUDING ALL TABLES;
Manage the Catalog
To view your database metadata, go to the Catalogs section in the Console. From there, select a catalog to browse its databases and tables.
To delete a catalog, run DROP CATALOG pg_catalog; or delete it in the Console. Deployments keep running after the deletion, but they fail to resolve catalog tables the next time they restart.
Postgres-CDC Connector
Use this connector to stream row-level change events (INSERT, UPDATE, DELETE) from PostgreSQL with exactly-once delivery semantics.
Overview
- Real-time: Streams an initial snapshot followed by all subsequent changes from the Write-Ahead Log (WAL).
- Exactly-once: With checkpointing enabled, guarantees no data loss or duplication, even across failures.
- Rich metadata: Exposes database, schema, table, and operation timestamp columns.
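The metadata is exposed through read-only metadata columns. Below is a sketch based on the shipments table from the syntax example. It assumes the metadata keys of the open-source Flink CDC postgres-cdc connector (database_name, schema_name, table_name, op_ts). The table name and slot name are hypothetical.

```sql
CREATE TABLE shipments_cdc_with_meta (
  -- Metadata columns are read-only, so they are declared VIRTUAL.
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
  table_name STRING METADATA FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'pg-host',
  'port' = '5432',
  'username' = 'flink',
  'password' = 'flink',
  'database-name' = 'ecommerce',
  'schema-name' = 'public',
  'table-name' = 'shipments',
  -- A dedicated slot: a replication slot serves only one active consumer at a time.
  'slot.name' = 'flink_shipments_meta',
  'debezium.plugin.name' = 'pgoutput'
);
```

A query such as SELECT table_name, operation_ts, shipment_id FROM shipments_cdc_with_meta; then shows where and when each change originated.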
Syntax
CREATE TABLE shipments_cdc (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'pg-host',
  'port' = '5432',
  'username' = 'flink',
  'password' = 'flink',
  'database-name' = 'ecommerce',
  'schema-name' = 'public',
  'table-name' = 'shipments',
  'slot.name' = 'flink',
  'debezium.plugin.name' = 'pgoutput'
);
Limitations and Operational Tips
- Source-only: Cannot be used as a sink or lookup table.
- Snapshot Duration and Checkpoints: The initial snapshot scans the entire table at once and cannot emit a checkpoint barrier until the scan is complete. Consequently, Flink’s global checkpoint stays pending for the entire snapshot duration. If that duration exceeds execution.checkpointing.timeout, the checkpoint is marked as failed. With Flink’s default tolerance, a single failed checkpoint triggers a job restart. To let the snapshot finish before Flink gives up, tune the Flink deployment configuration. Increase execution.checkpointing.interval and/or execution.checkpointing.timeout, and raise execution.checkpointing.tolerable-failed-checkpoints.
- Requires logical replication to be enabled on the server (wal_level = logical) and a suitable logical decoding plugin (e.g., pgoutput or wal2json).
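Below is a sketch of the checkpoint settings as SET statements in a Flink SQL session. The values are illustrative and should be sized to how long the initial snapshot actually takes. In a Ververica deployment you may set the same keys in the deployment's Flink configuration instead.

```sql
-- Trigger checkpoints less often, so fewer of them fall inside the snapshot.
SET 'execution.checkpointing.interval' = '10min';
-- Give each checkpoint more time before it is declared failed.
SET 'execution.checkpointing.timeout' = '30min';
-- Tolerate failed checkpoints during the snapshot instead of restarting the job.
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '100';
```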
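On a self-managed PostgreSQL server, the logical replication prerequisites can be set up roughly as follows. This sketch assumes the flink role and the public.shipments table from the example above, and a superuser session.

```sql
-- Enable logical decoding; wal_level only changes after a server restart.
ALTER SYSTEM SET wal_level = logical;

-- After the restart, confirm the setting; the expected value is "logical".
SHOW wal_level;

-- Let the Flink role open replication connections.
ALTER ROLE flink WITH REPLICATION;

-- Optional: log full old row values so UPDATE and DELETE events carry
-- the complete previous row, not just the key.
ALTER TABLE public.shipments REPLICA IDENTITY FULL;

-- Inspect replication slots, e.g. the 'flink' slot the connector creates.
SELECT slot_name, plugin, active FROM pg_replication_slots;
```

On AWS RDS, ALTER SYSTEM is not available. Enable logical replication through the rds.logical_replication parameter in the DB parameter group, and grant replication rights with GRANT rds_replication TO flink;. An abandoned slot keeps retaining WAL on the server. If a CDC job is retired for good, drop its slot, for example with SELECT pg_drop_replication_slot('flink');.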