PostgreSQL Connectivity
You can connect your Flink SQL jobs to PostgreSQL in one of three ways. Each option targets a distinct technical requirement.
| Use case example | Recommended option |
|---|---|
| Simple read/write, bounded or unbounded | Postgres connector |
| Explorer‑style access to many tables with minimal DDL | Postgres catalog |
| Stream change events (CDC) | Postgres CDC connector |
Prerequisites
- Access to an active Postgres database instance running at a known network address (host and port).
- The required tables must already exist in the target Postgres database.
- Valid credentials: a username and password with sufficient privileges to read from or write to the tables.
Postgres Connector
Use this connector when you need a bounded snapshot of a PostgreSQL table (source) or want to continuously write streaming results into PostgreSQL (sink). You can also use it for dimension table lookups.
Overview
- Supports source, sink, and dimension (lookup) tables.
- The required Postgres JDBC driver is already bundled. No manual JAR upload is required.
- The syntax is a specific implementation of the generic JDBC connector, using 'connector' = 'postgres'.
Source Table Syntax
CREATE TABLE orders_src (
id BIGINT,
order_time TIMESTAMP(3),
amount DECIMAL(10,2)
) WITH (
'connector' = 'postgres',
'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
'table-name' = 'orders',
'username' = 'flink',
'password' = 'flink',
-- optional parallel read
'scan.partition.column' = 'id',
'scan.partition.num' = '4',
'scan.partition.lower-bound' = '1',
'scan.partition.upper-bound' = '1000000'
);
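The optional parallel-read settings above can be pictured as one range query per partition. The following is a minimal sketch, assuming the connector splits the range evenly between the lower and upper bound and reads each slice with a BETWEEN predicate, as the open-source Flink JDBC connector does. The exact queries issued by the bundled connector may differ.

```sql
-- Sketch only: approximate per-partition queries for
-- scan.partition.column = 'id', scan.partition.num = '4',
-- lower-bound = 1, upper-bound = 1000000.
-- Assumption: even range split (250000 ids per partition).
SELECT id, order_time, amount FROM orders WHERE id BETWEEN 1      AND 250000;
SELECT id, order_time, amount FROM orders WHERE id BETWEEN 250001 AND 500000;
SELECT id, order_time, amount FROM orders WHERE id BETWEEN 500001 AND 750000;
SELECT id, order_time, amount FROM orders WHERE id BETWEEN 750001 AND 1000000;
```

Choosing bounds close to the real minimum and maximum of the partition column keeps the slices evenly sized.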
Sink (Result) Table Syntax
CREATE TABLE orders_sink (
id BIGINT,
order_time TIMESTAMP(3),
amount DECIMAL(10,2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgres',
'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
'table-name' = 'orders_enriched',
'username' = 'flink',
'password' = 'flink',
-- tune buffering for higher throughput
'sink.buffer-flush.max-rows' = '500',
'sink.buffer-flush.interval' = '2s'
);
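Because the prerequisites require the target table to exist before the job starts, a matching table has to be created in PostgreSQL first. The following is a minimal sketch run in PostgreSQL, not in Flink. The column types are an assumed mapping from the Flink schema above, and the primary key mirrors the key declared on the sink.

```sql
-- Run in PostgreSQL (for example via psql), not in the Flink SQL editor.
-- Assumed type mapping: BIGINT -> BIGINT, TIMESTAMP(3) -> TIMESTAMP(3),
-- DECIMAL(10,2) -> NUMERIC(10,2).
CREATE TABLE IF NOT EXISTS orders_enriched (
    id         BIGINT PRIMARY KEY,  -- unique key used to resolve upserts
    order_time TIMESTAMP(3),
    amount     NUMERIC(10,2)
);
```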
Dimension (Lookup) Table Syntax
CREATE TABLE currency_rates_dim (
currency_code STRING,
rate DECIMAL(10,4)
) WITH (
'connector' = 'postgres',
'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
'table-name' = 'currency_rates',
'username' = 'flink',
'password' = 'flink',
-- enable caching for low-latency lookups
'lookup.cache.max-rows' = '1000',
'lookup.cache.ttl' = '1h'
);
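A dimension table is typically queried through a lookup join against a stream that carries a processing-time attribute. The sketch below assumes a hypothetical orders_stream table, generated here with Flink's datagen connector purely as a placeholder, and joins it to currency_rates_dim.

```sql
-- Hypothetical streaming input; replace with your real source table.
CREATE TEMPORARY TABLE orders_stream (
  id        BIGINT,
  currency  STRING,
  amount    DECIMAL(10,2),
  proc_time AS PROCTIME()          -- processing-time attribute for the lookup
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Each incoming order looks up the rate for its currency at processing time.
SELECT o.id, o.amount, r.rate, o.amount * r.rate AS amount_converted
FROM orders_stream AS o
JOIN currency_rates_dim FOR SYSTEM_TIME AS OF o.proc_time AS r
  ON o.currency = r.currency_code;
```

With the cache options set, repeated lookups for the same currency_code are served from memory until the TTL expires.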
WITH Options
| Parameter | Scope | Required | Default | Purpose / notes |
|---|---|---|---|---|
| url | All | ✔︎ | – | Standard PostgreSQL JDBC URL. |
| table-name | All | ✔︎ | – | Name of the table or pattern. |
| username | All | ✖︎ | – | Username used for authentication. |
| password | All | ✖︎ | – | Companion to username. |
| scan.partition.column | Source | ✖︎ | – | Column to split data for parallel reads; NUMERIC/DATE/TIMESTAMP only. |
| scan.partition.num | Source | ✖︎ | – | Number of partitions created. |
| scan.partition.lower-bound | Source | ✖︎ | – | Lower bound value for first partition. |
| scan.partition.upper-bound | Source | ✖︎ | – | Upper bound value for last partition. |
| scan.fetch-size | Source | ✖︎ | 0 | Rows fetched per read; 0 = driver default. |
| scan.auto-commit | Source | ✖︎ | true | Enable/disable auto-commit while reading. |
| sink.buffer-flush.max-rows | Sink | ✖︎ | 100 | Records buffered before flush; 0 disables buffering. |
| sink.buffer-flush.interval | Sink | ✖︎ | 1s | Max interval between flushes; 0 disables buffering. |
| sink.max-retries | Sink | ✖︎ | 3 | Retries if write fails. |
| lookup.cache.max-rows | Dimension | ✖︎ | – | Max cache size for dimension rows. |
| lookup.cache.ttl | Dimension | ✖︎ | – | Time-to-live of cached rows. |
| lookup.cache.caching-missing-key | Dimension | ✖︎ | true | Whether to cache misses (empty results). |
| lookup.max-retries | Dimension | ✖︎ | 3 | Retries if lookup fails. |
Limitations
- Runtime version support: Only Ververica Runtime VERA 1.0.3 or later bundles the open-source JDBC/Postgres connector.
- Bounded source semantics: A Postgres source table is finite. The task finishes after all rows are read. For continuous streams, use Postgres-CDC.
- PostgreSQL version: PostgreSQL 9.5 or later is required for sinks, as the connector uses ON CONFLICT for upserts.
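To illustrate the version requirement, the statement below sketches the kind of upsert PostgreSQL receives for a sink table keyed on id. The literal values are placeholders, and the exact statement text generated by the connector is an assumption. The point is that ON CONFLICT needs PostgreSQL 9.5 or later and a unique constraint on the key column.

```sql
-- Run in PostgreSQL to see the upsert behavior by hand.
-- Requires a primary key or unique constraint on orders_enriched.id.
INSERT INTO orders_enriched (id, order_time, amount)
VALUES (42, TIMESTAMP '2024-01-01 12:00:00', 19.99)
ON CONFLICT (id)
DO UPDATE SET order_time = EXCLUDED.order_time,
              amount     = EXCLUDED.amount;
```

Running the statement twice with different amounts leaves a single row for id 42 holding the latest values.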
Postgres Catalog
A Postgres catalog registers an entire database, making every schema and table addressable in Flink SQL without requiring individual CREATE TABLE statements.
Overview
- Zero-DDL onboarding: Query existing tables immediately.
- Namespace isolation: Use fully qualified names (pg.<schema>.<table>) or set context with USE.
- Packaged driver: Ververica bundles the Postgres JDBC driver. No extra JAR is needed.
Limitations
| Limitation | Detail | Work-around |
|---|---|---|
| Immutable parameters | After creation, you can't edit the connection properties. Drop & recreate instead. | Plan stable credentials or use environment variables. |
| Supported Postgres versions | Verified with PostgreSQL 9.5+ (required for upsert semantics). | Upgrade DB version if older. |
| DDL propagation | DDL executed in Flink is not forwarded to Postgres; maintain tables in the database. | – |
| CDC streaming | Catalog reads are bounded snapshots. For real-time CDC use the postgres-cdc connector. | – |
Create Catalog Options
You can create a catalog using SQL or on the console.
- To create with SQL:
CREATE CATALOG pg_catalog WITH (
'type' = 'postgres',
'base-url' = 'jdbc:postgresql://pg-host:5432',
'default-database' = 'ecommerce',
'username' = 'flink',
'password' = 'flink'
);
- On the console: Go to Catalogs › Create Catalog, then enter the connection settings (hostname, port, default database, username, password) to create the catalog.
Parameter Reference
| Parameter | Required | Description |
|---|---|---|
| type | ✔︎ | Must be postgres. |
| base-url | ✔︎ | JDBC URL including host and port. |
| default-database | ✔︎ | Database assumed when none is qualified. |
| username, password | ✖︎ | Credentials; omit when trust/SSO applies. |
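Since connection properties cannot be edited after creation, changing a value such as the password means dropping and recreating the catalog. A minimal sketch, reusing the example values from this page with a placeholder for the new password:

```sql
-- Connection properties are immutable, so replace the catalog as a whole.
DROP CATALOG pg_catalog;

CREATE CATALOG pg_catalog WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://pg-host:5432',
  'default-database' = 'ecommerce',
  'username' = 'flink',
  'password' = '<new-password>'   -- placeholder, not a real value
);
```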
Database Allow/Deny Lists
Control which PostgreSQL databases are visible through a catalog using an allowlist or denylist. This is useful on hosted services (e.g., AWS RDS) that expose system databases.
- database.whitelist: Semicolon-separated list of databases to expose (e.g., 'ecommerce;sales_analytics').
- database.blacklist: Semicolon-separated list of databases to hide (e.g., 'rdsadmin;template0;template1').
Rules
- Specify either a whitelist or a blacklist, not both. If both are present, the catalog creation fails with a validation error.
- If neither option is set, the catalog behaves like the default: PostgreSQL template databases (template0 and template1) are excluded from discovery.
- Multiple databases are separated by semicolons (e.g., 'db1;db2;db3').
- Matching is by database name. Use lowercase letters unless your database names are quoted identifiers.
Examples
- Whitelist only (expose exactly these databases):
CREATE CATALOG pg_whitelist WITH (
'type' = 'postgres',
'base-url' = 'jdbc:postgresql://pg-host:5432',
'default-database' = 'ecommerce',
'username' = 'flink',
'password' = 'flink',
'database.whitelist'= 'ecommerce;sales_analytics'
);
- Blacklist only (hide certain databases, show all others):
CREATE CATALOG pg_blacklist WITH (
'type' = 'postgres',
'base-url' = 'jdbc:postgresql://pg-host:5432',
'default-database' = 'postgres',
'username' = 'flink',
'password' = 'flink',
'database.blacklist' = 'template0;template1'
);
AWS RDS example
AWS RDS exposes a reserved database named rdsadmin that should not be accessed by clients. You can safely run Flink against RDS by blacklisting it (and, for cleanliness, the templates):
CREATE CATALOG rds_pg WITH (
'type' = 'postgres',
'base-url' = 'jdbc:postgresql://<rds-endpoint>:5432',
'default-database' = 'postgres',
'username' = '<user>',
'password' = '<password>',
'database.blacklist' = 'rdsadmin;template0;template1'
);
If you see an error such as FATAL: database "template0" is not currently accepting connections, make sure template0 is excluded (either through blacklisting it or by not whitelisting it).
- This feature only affects discovery (what shows up in SHOW DATABASES and what Flink enumerates). It’s not a security boundary. Database-level permissions still apply.
- Avoid whitelisting template0 (it can’t accept connections) and service/admin databases like rdsadmin on RDS.
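To decide which databases belong on a whitelist or blacklist, it can help to ask PostgreSQL which databases exist and whether they accept connections. The query below is a small sketch run directly against the PostgreSQL instance. It reads the standard pg_database system catalog.

```sql
-- Run in PostgreSQL (for example via psql).
-- datallowconn = false marks databases such as template0 that reject connections.
SELECT datname, datallowconn, datistemplate
FROM pg_database
ORDER BY datname;
```

Databases with datallowconn set to false are candidates for the blacklist, or should simply be left off the whitelist.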
Using a Postgres Catalog
-- Set the current catalog context
USE CATALOG pg_catalog;
-- Browse catalog objects
SHOW DATABASES;
SHOW TABLES FROM public;
-- Run a bounded read from a table
SELECT * FROM public.orders LIMIT 10;
-- Write a continuous stream to a table
INSERT INTO public.enriched_orders
SELECT * FROM kafka_src;
-- Perform a dimension table lookup
SELECT o.*, r.rate
FROM kafka_orders o
JOIN pg_catalog.public.currency_rates FOR SYSTEM_TIME AS OF o.proctime AS r
ON o.currency = r.currency_code;
CTAS / DBAS Helpers
You can also set a catalog as the current context and then use the Postgres connector shown earlier to copy a single table (CREATE TABLE AS) or an entire database (CREATE DATABASE AS):
-- Set the context
USE CATALOG pg_catalog;
-- Snapshot a single table
CREATE TABLE public.orders_copy
WITH ('connector'='postgres')
AS TABLE pg_catalog.public.orders;
-- Clone the entire DB
CREATE DATABASE sales_copy
WITH ('connector'='postgres')
AS DATABASE pg_catalog.ecommerce INCLUDING ALL TABLES;
Manage the Catalog
To view your database metadata, go to the Catalogs section in the Console. From there, select a catalog to browse its databases and tables.
If you want to delete a catalog, use the command DROP CATALOG pg_catalog. You can also delete it using the Console. Deployments will keep running, but they will fail to resolve catalog tables on restart after deletion.
Postgres-CDC Connector
Use this connector to stream row-level change events (INSERT, UPDATE, DELETE) from PostgreSQL with exactly-once delivery semantics.
Overview
- Real-time: Streams an initial snapshot followed by all subsequent changes from the Write-Ahead Log (WAL).
- Exactly-once: Guarantees no data loss or duplication, even across failures.
- Rich metadata: Exposes database, schema, table, and operation timestamp columns.
Syntax
CREATE TABLE shipments_cdc (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'pg-host',
'port' = '5432',
'username' = 'flink',
'password' = 'flink',
'database-name' = 'ecommerce',
'schema-name' = 'public',
'table-name' = 'shipments',
'slot.name' = 'flink',
'debezium.plugin.name' = 'pgoutput'
);
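The metadata mentioned in the overview can be surfaced as extra columns on the CDC table. The sketch below assumes that the metadata keys match those of the open-source Flink CDC postgres-cdc connector (database_name, schema_name, table_name, op_ts). The table name and the separate slot name are hypothetical.

```sql
-- Sketch: CDC table that also exposes change-event metadata.
-- Assumption: metadata keys follow the open-source postgres-cdc connector.
CREATE TABLE shipments_cdc_meta (
  db_name      STRING METADATA FROM 'database_name' VIRTUAL,
  schema_name  STRING METADATA FROM 'schema_name' VIRTUAL,
  table_name   STRING METADATA FROM 'table_name' VIRTUAL,
  op_ts        TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  shipment_id  INT,
  order_id     INT,
  origin       STRING,
  destination  STRING,
  is_arrived   BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'pg-host',
  'port' = '5432',
  'username' = 'flink',
  'password' = 'flink',
  'database-name' = 'ecommerce',
  'schema-name' = 'public',
  'table-name' = 'shipments',
  'slot.name' = 'flink_meta',            -- hypothetical: a separate slot per consumer
  'debezium.plugin.name' = 'pgoutput'
);
```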
Limitations and Operational Tips
- Source-only: Cannot be used as a sink or lookup table.
- Snapshot Duration and Checkpoints: The initial snapshot scans the entire table at once and cannot emit a checkpoint barrier until the scan is complete. Consequently, Flink’s global checkpoint stays pending for the entire snapshot duration. If that duration exceeds execution.checkpointing.timeout, the checkpoint is marked as failed, and with Flink’s default tolerance a single failed checkpoint triggers a job restart. To let a long‑running snapshot finish, increase the checkpoint interval and/or the number of tolerable failed checkpoints by tuning the following Flink deployment parameters:
| Parameter | Typical starting value | What it controls |
|---|---|---|
| execution.checkpointing.interval | 10 min | Time between checkpoint attempts. A longer interval reduces scheduling pressure during snapshots. |
| execution.checkpointing.tolerable-failed-checkpoints | 100 | Number of consecutive checkpoint failures allowed before the job fails. Combined with the interval, this yields the maximum tolerated snapshot duration. |
| restart-strategy | fixed-delay | The restart policy applied if the job fails. fixed-delay gives a predictable retry cadence. |
| restart-strategy.fixed-delay.attempts | 2147483647 | Effectively unlimited retries, ensuring the deployment keeps restarting until the snapshot completes. |
- Requires logical replication enabled and a suitable decoding plugin (pgoutput, wal2json, etc.).
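The logical replication requirement can be checked and prepared on the PostgreSQL side. The following is a minimal sketch for a self-managed instance, assuming a superuser session and the flink user from the examples above. On managed services such as Amazon RDS, wal_level is typically controlled through the instance's parameter group rather than ALTER SYSTEM.

```sql
-- Run in PostgreSQL as a superuser.
SHOW wal_level;                           -- should return 'logical'

ALTER SYSTEM SET wal_level = 'logical';   -- takes effect only after a server restart

ALTER ROLE flink WITH REPLICATION;        -- allow the connector user to use replication slots

-- After the Flink job has started, the slot should be listed here.
SELECT slot_name, plugin, active
FROM pg_replication_slots;
```

An inactive slot that is no longer needed keeps WAL segments from being recycled, so it is worth checking this view after removing a deployment.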
BYOC Deployment Example Usage
This section demonstrates how to use the Postgres Connector and Catalog in the context of a BYOC deployment. It provides examples for the following tasks:
- Setting up a test Postgres instance with the required tables.
- Creating and inserting data into a Flink SQL table from an existing Postgres table.
- Creating a catalog from a Postgres instance, including how to use allowlists/denylists.
- Using the created catalog to move data between tables.
You will see examples for two common scenarios: a self-hosted Postgres instance (deployed in AWS EC2) and a managed Postgres instance (AWS RDS).
Setting Up a Test Postgres Instance
This section assumes you have already provisioned a Postgres instance and configured its network and security access so that you can connect to it using psql. After you connect to the Postgres instance, you can set up the required test tables with the following commands:
CREATE TABLE financial_transactions_out (
name TEXT,
amount INTEGER
);
CREATE TABLE financial_transactions (
name TEXT,
amount INTEGER
);
Updating a Table in the SQL Editor
You can now go to the SQL Editor in your Managed Service deployment to create a Flink table from the financial_transactions table and insert new data.
For a self-hosted Postgres instance on an EC2 machine, you can supply the public IP in the url parameter:
CREATE TEMPORARY TABLE FlinkTable1 (name STRING, amount INT)
WITH (
'connector' = 'postgres',
'url' = 'jdbc:postgresql://52.59.207.216:5432/postgres',
'table-name' = 'financial_transactions',
'username' = 'postgres',
'password' = '1234'
);
INSERT INTO FlinkTable1
VALUES ('ttt',1);
For an instance hosted in AWS RDS, you need to change the url to use the RDS endpoint:
CREATE TEMPORARY TABLE FlinkTable1 (name STRING, amount INT)
WITH (
'connector' = 'postgres',
'url' = 'jdbc:postgresql://pg-conn-test.cpozd9x7eku2.eu-central-1.rds.amazonaws.com:5432/postgres',
'table-name' = 'financial_transactions',
'username' = 'postgres',
'password' = '1234'
);
INSERT INTO FlinkTable1 VALUES ('ttt',1);
After you deploy and run these jobs from the SQL editor, you can use psql to connect to the Postgres instance and verify that the update was successful:
postgres=# select * from financial_transactions;
name | amount
------+--------
ttt | 1
(1 row)
Creating a Catalog
Next, you can create a catalog to make all tables from the Postgres instance available in the SQL Editor. You can run the following statements for either the self-hosted or RDS-managed instance.
The examples below show how to use the database allowlist/denylist feature for Postgres catalogs. This is useful for excluding unwanted databases (like template0) or for including only the databases you need, so that the catalog does not try to connect to databases that do not accept connections.
To blacklist the default template databases on a self-hosted EC2 instance:
CREATE CATALOG FlinkCatalog1 WITH (
'type' = 'postgres',
'username' = 'postgres',
'password' = '1234',
'base-url' = 'jdbc:postgresql://52.59.207.216:5432',
'default-database'= 'postgres',
'database.blacklist' = 'template0;template1'
);
To whitelist only specific databases on an RDS-managed instance:
CREATE CATALOG FlinkCatalog1 WITH (
'type' = 'postgres',
'username' = 'postgres',
'password' = '1234',
'base-url' = 'jdbc:postgresql://pg-conn-test.cpozd9x7eku2.eu-central-1.rds.amazonaws.com:5432',
'default-database' = 'postgres',
'database.whitelist' = 'postgres;template1'
);
Using the Created Catalog
After the catalog is successfully created and appears in the Catalogs panel, you can check the available tables and use the catalog to move data between tables.
use catalog FlinkCatalog1;
INSERT INTO `financial_transactions_out`
SELECT * FROM `financial_transactions`;
After you deploy and run the Flink SQL job, you can verify from the Postgres instance that the data has been successfully moved:
postgres=# select * from financial_transactions_out;
name | amount
------+--------
ttt | 1
(1 row)