PostgreSQL Connectivity
You can connect your Flink SQL jobs to PostgreSQL using one of three flexible options. Each one is designed for a distinct technical requirement.
Prerequisites
- Access to an active Postgres database instance running at a known network address (host and port).
- The required tables must already exist in the target Postgres database.
- Valid credentials: a username and password with sufficient privileges to read from or write to the tables.
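One way to satisfy the credentials prerequisite is a dedicated database role. The following is a minimal sketch to run in psql as an administrator. The role name flink, the placeholder password, and the table names are assumptions borrowed from the examples on this page, so adjust them to your own schema.

```sql
-- Hypothetical role for Flink jobs; pick your own name and a strong password.
CREATE ROLE flink WITH LOGIN PASSWORD 'change-me';

-- Allow the role to connect to the database and resolve objects in the schema.
GRANT CONNECT ON DATABASE ecommerce TO flink;
GRANT USAGE ON SCHEMA public TO flink;

-- Read access for source and lookup tables.
GRANT SELECT ON TABLE public.orders, public.currency_rates TO flink;

-- Write access for sink tables (upserts need both INSERT and UPDATE).
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE public.orders_enriched TO flink;
```

Granting only the privileges a job actually needs keeps read-only jobs from modifying data by accident.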
Postgres Connector
Use this connector when you need a bounded snapshot of a PostgreSQL table (source) or want to continuously write streaming results into PostgreSQL (sink). You can also use it for dimension table lookups.
Overview
- Supports source, sink, and dimension (lookup) tables.
- The required Postgres JDBC driver is already bundled. No manual JAR upload is required.
- The syntax is a specific implementation of the generic JDBC connector, using 'connector' = 'postgres'.
Source Table Syntax
CREATE TABLE orders_src (
  id BIGINT,
  order_time TIMESTAMP(3),
  amount DECIMAL(10,2)
) WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
  'table-name' = 'orders',
  'username' = 'flink',
  'password' = 'flink',
  -- optional parallel read
  'scan.partition.column' = 'id',
  'scan.partition.num' = '4',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '1000000'
);
Sink (Result) Table Syntax
CREATE TABLE orders_sink (
  id BIGINT,
  order_time TIMESTAMP(3),
  amount DECIMAL(10,2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
  'table-name' = 'orders_enriched',
  'username' = 'flink',
  'password' = 'flink',
  -- tune buffering for higher throughput
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s'
);
Dimension (Lookup) Table Syntax
CREATE TABLE currency_rates_dim (
  currency_code STRING,
  rate DECIMAL(10,4)
) WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
  'table-name' = 'currency_rates',
  'username' = 'flink',
  'password' = 'flink',
  -- enable caching for low-latency lookups
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl' = '1h'
);
WITH Options
Limitations
- Runtime version support: Only Ververica Runtime VERA 1.0.3 or later bundles the open-source JDBC/Postgres connector.
- Bounded source semantics: A Postgres source table is finite. The task finishes after all rows are read. For continuous streams, use Postgres-CDC.
- PostgreSQL version: PostgreSQL 9.5 or later is required for sinks, as the connector uses ON CONFLICT for upserts.
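To make the ON CONFLICT requirement concrete, the statement below sketches the general shape of a PostgreSQL upsert for the orders_enriched table used in the sink example. It is illustrative only: the literal values are made up, and the exact statement the connector generates may differ.

```sql
-- Illustrative upsert; the connector's generated SQL may differ in detail.
INSERT INTO orders_enriched (id, order_time, amount)
VALUES (42, TIMESTAMP '2024-01-01 12:00:00', 19.99)
ON CONFLICT (id)
DO UPDATE SET
  order_time = EXCLUDED.order_time,  -- EXCLUDED refers to the row proposed for insertion
  amount     = EXCLUDED.amount;
```

In PostgreSQL, ON CONFLICT (id) only works when the target table has a primary key or unique index on id, so the physical table should carry the same key that the Flink table declares.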
Postgres Catalog
A Postgres catalog registers an entire database, making every schema and table addressable in Flink SQL without requiring individual CREATE TABLE statements.
Overview
- Zero-DDL onboarding: Query existing tables immediately.
- Namespace isolation: Use fully qualified names (pg.<schema>.<table>) or set context with USE.
- Packaged driver: Ververica bundles the Postgres JDBC driver. No extra JAR is needed.
Limitations
Create Catalog Options
You can create a catalog using SQL or on the console.
- To create with SQL:
CREATE CATALOG pg_catalog WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://pg-host:5432',
  'default-database' = 'ecommerce',
  'username' = 'flink',
  'password' = 'flink'
);
- On the console: Go to Catalogs › Create Catalog, then follow the creation and configuration procedure (hostname, port, default database, username, password).
Parameter Reference
Database Allow/Deny Lists
Control which PostgreSQL databases are visible through a catalog using an allowlist or denylist. This is useful on hosted services (e.g., AWS RDS) that expose system databases.
- database.whitelist: Semicolon-separated list of databases to expose (e.g., 'ecommerce;sales_analytics').
- database.blacklist: Semicolon-separated list of databases to hide (e.g., 'rdsadmin;template0;template1').
Rules
- Specify either a whitelist or a blacklist, not both. If both are present, catalog creation fails with a validation error.
- If neither option is set, the catalog uses its default behavior: PostgreSQL template databases (template0 and template1) are excluded from discovery.
- Multiple databases are separated by semicolons (e.g., 'db1;db2;db3').
- Matching is by database name. Use lowercase letters unless your database names are quoted identifiers.
Examples
- Whitelist only (expose exactly these databases):
CREATE CATALOG pg_whitelist WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://pg-host:5432',
  'default-database' = 'ecommerce',
  'username' = 'flink',
  'password' = 'flink',
  'database.whitelist' = 'ecommerce;sales_analytics'
);
- Blacklist only (hide certain databases, show all others):
CREATE CATALOG pg_blacklist WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://pg-host:5432',
  'default-database' = 'postgres',
  'username' = 'flink',
  'password' = 'flink',
  'database.blacklist' = 'template0;template1'
);
AWS RDS example
AWS RDS exposes a reserved database named rdsadmin that should not be accessed by clients. You can safely run Flink against RDS by blacklisting it (and, for cleanliness, the templates):
CREATE CATALOG rds_pg WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://<rds-endpoint>:5432',
  'default-database' = 'postgres',
  'username' = '<user>',
  'password' = '<password>',
  'database.blacklist' = 'rdsadmin;template0;template1'
);
If you see an error such as FATAL: database "template0" is not currently accepting connections, make sure template0 is excluded (either by blacklisting it or by not whitelisting it).
- This feature only affects discovery (what shows up in SHOW DATABASES and what Flink enumerates). It’s not a security boundary. Database-level permissions still apply.
- Avoid whitelisting template0 (it can’t accept connections) and service/admin databases like rdsadmin on RDS.
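When deciding what to put on the allowlist or denylist, it can help to ask PostgreSQL which databases exist and which of them accept connections. The query below is a small sketch to run in psql against the instance; it reads the standard pg_database system catalog and does not involve Flink.

```sql
-- List every database together with the flags that matter for discovery.
SELECT datname,
       datallowconn,   -- false means connections are rejected (for example, template0)
       datistemplate   -- true for template databases
FROM pg_database
ORDER BY datname;
```

Databases with datallowconn = false are natural candidates for the blacklist, since any attempt to connect to them fails.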
Using a Postgres Catalog
-- Set the current catalog context
USE CATALOG pg_catalog;

-- Browse catalog objects
SHOW DATABASES;
SHOW TABLES FROM public;

-- Run a bounded read from a table
SELECT * FROM public.orders LIMIT 10;

-- Write a continuous stream to a table
INSERT INTO public.enriched_orders
SELECT * FROM kafka_src;

-- Perform a dimension table lookup
SELECT o.*, r.rate
FROM kafka_orders o
JOIN pg_catalog.public.currency_rates FOR SYSTEM_TIME AS OF o.proctime AS r
ON o.currency = r.currency_code;
CTAS / DBAS Helpers
You can also set a catalog as the current context and then use the Postgres connector shown earlier to create tables or databases:
-- Set the context
USE CATALOG pg_catalog;

-- Snapshot a single table
CREATE TABLE public.orders_copy
WITH ('connector' = 'postgres')
AS TABLE pg_catalog.public.orders;

-- Clone the entire DB
CREATE DATABASE sales_copy
WITH ('connector' = 'postgres')
AS DATABASE pg_catalog.ecommerce INCLUDING ALL TABLES;
Manage the Catalog
To view your database metadata, go to the Catalogs section in the Console. From there, select a catalog to browse its databases and tables.
If you want to delete a catalog, use the command DROP CATALOG pg_catalog. You can also delete it using the Console. Deployments will keep running, but they will fail to resolve catalog tables on restart after deletion.
Postgres-CDC Connector
Use this connector to stream row-level change events (INSERT, UPDATE, DELETE) from PostgreSQL with exactly-once delivery semantics.
Overview
- Real-time: Streams an initial snapshot followed by all subsequent changes from the Write-Ahead Log (WAL).
- Exactly-once: Guarantees no data loss or duplication, even across failures.
- Rich metadata: Exposes database, schema, table, and operation timestamp columns.
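Reading changes from the WAL depends on logical decoding being enabled on the PostgreSQL server. The statements below are a hedged sketch of how to check and prepare a self-managed server from psql. The role name flink is an assumption taken from the examples on this page, and managed services typically expose the WAL level through their own parameter settings instead of ALTER SYSTEM.

```sql
-- Logical decoding requires wal_level = logical.
SHOW wal_level;

-- Self-managed servers: change the setting, then restart PostgreSQL for it to take effect.
ALTER SYSTEM SET wal_level = 'logical';

-- The connecting role needs the REPLICATION attribute (role name is an assumption).
ALTER ROLE flink WITH REPLICATION;

-- After the job has started, confirm that its replication slot exists and is active.
SELECT slot_name, plugin, active FROM pg_replication_slots;
```

An inactive slot that is never dropped keeps PostgreSQL from recycling WAL segments, so it is worth checking pg_replication_slots after decommissioning a job.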
Syntax
CREATE TABLE shipments_cdc (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'pg-host',
  'port' = '5432',
  'username' = 'flink',
  'password' = 'flink',
  'database-name' = 'ecommerce',
  'schema-name' = 'public',
  'table-name' = 'shipments',
  'slot.name' = 'flink',
  'debezium.plugin.name' = 'pgoutput'
);
Limitations and Operational Tips
- Source-only: Cannot be used as a sink or lookup table.
- Snapshot Duration and Checkpoints: The initial snapshot scans the entire table at once and cannot emit a checkpoint barrier until the scan is complete. Consequently, Flink’s global checkpoint stays pending for the entire snapshot duration. If that duration exceeds execution.checkpointing.timeout, the checkpoint is marked as failed, and with Flink’s default tolerance a single failed checkpoint triggers a job restart. To let the snapshot finish before Flink gives up, increase the checkpoint interval and/or the number of tolerable failed checkpoints in the Flink deployment parameters.
- Requires logical replication enabled and a suitable decoding plugin (pgoutput, wal2json, etc.).
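The checkpoint tuning described in the snapshot limitation above could look like the following sketch. The configuration keys are standard Flink options, but the values are illustrative assumptions and should be sized to how long your snapshot actually takes. If your environment does not accept SET statements, the same keys can be placed in the deployment's Flink configuration instead.

```sql
-- Values are illustrative assumptions; size them to your snapshot duration.
SET 'execution.checkpointing.interval' = '10min';
SET 'execution.checkpointing.timeout' = '60min';
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '100';
```

A higher tolerance lets the job survive checkpoints that time out during the snapshot, at the cost of a longer replay if the job does fail before the first successful checkpoint.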
BYOC Deployment Example Usage
This section demonstrates how to use the Postgres Connector and Catalog in the context of a BYOC deployment. It provides examples for the following tasks:
- Setting up a test Postgres instance with the required tables.
- Creating and inserting data into a Flink SQL table from an existing Postgres table.
- Creating a catalog from a Postgres instance, including how to use allowlists/denylists.
- Using the created catalog to move data between tables.
You will see examples for two common scenarios: a self-hosted Postgres instance (deployed in AWS EC2) and a managed Postgres instance (AWS RDS).
Setting Up a Test Postgres Instance
This section assumes you have already provisioned a Postgres instance and configured its network and security access so that you can connect to it using psql. After you connect to the Postgres instance, you can set up the required test tables with the following commands:
CREATE TABLE financial_transactions_out (
  name TEXT,
  amount INTEGER
);

CREATE TABLE financial_transactions (
  name TEXT,
  amount INTEGER
);
Updating a Table in the SQL Editor
You can now go to the SQL Editor in your Managed Service deployment to create a Flink table from the financial_transactions table and insert new data.
For a self-hosted Postgres instance on an EC2 machine, you can supply the public IP in the url parameter:
CREATE TEMPORARY TABLE FlinkTable1 (name STRING, amount INT)
WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://52.59.207.216:5432/postgres',
  'table-name' = 'financial_transactions',
  'username' = 'postgres',
  'password' = '1234'
);

INSERT INTO FlinkTable1
VALUES ('ttt', 1);
For an instance hosted in AWS RDS, you need to change the url to use the RDS endpoint:
CREATE TEMPORARY TABLE FlinkTable1 (name STRING, amount INT)
WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-conn-test.cpozd9x7eku2.eu-central-1.rds.amazonaws.com:5432/postgres',
  'table-name' = 'financial_transactions',
  'username' = 'postgres',
  'password' = '1234'
);

INSERT INTO FlinkTable1 VALUES ('ttt', 1);
After you deploy and run these jobs from the SQL editor, you can use psql to connect to the Postgres instance and verify that the update was successful:
postgres=# select * from financial_transactions;
 name | amount
------+--------
 ttt  |      1
(1 row)
Creating a Catalog
Next, you can create a catalog to make all tables from the Postgres instance available in the SQL Editor. You can run the following statements for either the self-hosted or RDS-managed instance.
The examples below show how to use the database allowlist/denylist feature for Postgres catalogs. This is useful for excluding unwanted databases (like template0) or for including only the databases you need, so that the catalog does not try to connect to databases that do not accept connections.
To blacklist the default template databases on a self-hosted EC2 instance:
CREATE CATALOG FlinkCatalog1 WITH (
  'type' = 'postgres',
  'username' = 'postgres',
  'password' = '1234',
  'base-url' = 'jdbc:postgresql://52.59.207.216:5432',
  'default-database' = 'postgres',
  'database.blacklist' = 'template0;template1'
);
To whitelist only specific databases on an RDS-managed instance:
CREATE CATALOG FlinkCatalog1 WITH (
  'type' = 'postgres',
  'username' = 'postgres',
  'password' = '1234',
  'base-url' = 'jdbc:postgresql://pg-conn-test.cpozd9x7eku2.eu-central-1.rds.amazonaws.com:5432',
  'default-database' = 'postgres',
  'database.whitelist' = 'postgres;template1'
);
Using the Created Catalog
After the catalog is successfully created and appears in the Catalogs panel, you can check the available tables and use the catalog to move data between tables.
USE CATALOG FlinkCatalog1;

INSERT INTO `financial_transactions_out`
SELECT * FROM `financial_transactions`;
After you deploy and run the Flink SQL job, you can verify from the Postgres instance that the data has been successfully moved:
postgres=# select * from financial_transactions_out;
 name | amount
------+--------
 ttt  |      1
(1 row)