PostgreSQL Connectivity

You can connect your Flink SQL jobs to PostgreSQL using one of three flexible options. Each one is designed for a distinct technical requirement.

Use case example | Recommended option
Simple read/write, bounded or unbounded | Postgres connector
Explorer-style access to many tables with minimal DDL | Postgres catalog
Stream change events (CDC) | Postgres CDC connector

Prerequisites

  • Access to an active Postgres database instance running at a known network address (host and port).
  • The required tables must already exist in the target Postgres database.
  • Valid credentials: a username and password with sufficient privileges to read from or write to the tables.
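
As a rough sketch of what "sufficient privileges" might look like, the following PostgreSQL statements create a dedicated login role and grant it access to individual tables. The role name flink, the database ecommerce, the schema public, and the table names are assumptions borrowed from the examples later on this page; adjust the grants to the least privilege your jobs need.

```sql
-- Run in PostgreSQL (for example via psql) as a superuser or the database owner.
-- Role, database, and table names are illustrative assumptions.
CREATE ROLE flink WITH LOGIN PASSWORD 'change-me';

-- Allow the role to connect and to see objects in the schema.
GRANT CONNECT ON DATABASE ecommerce TO flink;
GRANT USAGE ON SCHEMA public TO flink;

-- Read access for source and lookup tables.
GRANT SELECT ON public.orders TO flink;
GRANT SELECT ON public.currency_rates TO flink;

-- Write access for sink tables (upserts need both INSERT and UPDATE).
GRANT SELECT, INSERT, UPDATE ON public.orders_enriched TO flink;
```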

Postgres Connector

Use this connector when you need a bounded snapshot of a PostgreSQL table (source) or want to continuously write streaming results into PostgreSQL (sink). You can also use it for dimension table lookups.

Overview

  • Supports source, sink, and dimension (lookup) tables.
  • The required Postgres JDBC driver is already bundled. No manual JAR upload is required.
  • The syntax is a specific implementation of the generic JDBC connector, using 'connector' = 'postgres'.

Source Table Syntax

SQL
CREATE TABLE orders_src (
  id BIGINT,
  order_time TIMESTAMP(3),
  amount DECIMAL(10,2)
) WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
  'table-name' = 'orders',
  'username' = 'flink',
  'password' = 'flink',
  -- optional parallel read
  'scan.partition.column' = 'id',
  'scan.partition.num' = '4',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '1000000'
);
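
To make the parallel-read options more concrete, the sketch below shows the kind of range queries a partitioned scan could issue for the settings above. It assumes the connector behaves like the open-source Flink JDBC connector, which divides the interval between the lower and upper bound into scan.partition.num roughly equal ranges and reads each range with a BETWEEN predicate. The exact SQL generated by the bundled connector is not documented on this page and may differ.

```sql
-- Illustration only: approximate per-partition queries for
-- lower-bound = 1, upper-bound = 1000000, num = 4 (a stride of 250,000 ids).
SELECT id, order_time, amount FROM orders WHERE id BETWEEN 1 AND 250000;
SELECT id, order_time, amount FROM orders WHERE id BETWEEN 250001 AND 500000;
SELECT id, order_time, amount FROM orders WHERE id BETWEEN 500001 AND 750000;
SELECT id, order_time, amount FROM orders WHERE id BETWEEN 750001 AND 1000000;
```

Under that assumption, rows whose id falls outside the configured bounds would not be read, so it is safest to choose bounds that cover the full range of the partition column.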

Sink (Result) Table Syntax

SQL
CREATE TABLE orders_sink (
  id BIGINT,
  order_time TIMESTAMP(3),
  amount DECIMAL(10,2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
  'table-name' = 'orders_enriched',
  'username' = 'flink',
  'password' = 'flink',
  -- tune buffering for higher throughput
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s'
);
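
A sink table only receives data once a job writes to it. As a minimal sketch, assuming both orders_src and orders_sink from the examples above are registered in the same session, the following statement copies the bounded source into the sink:

```sql
-- Copy rows from the bounded source into the sink.
-- Because orders_sink declares a primary key, rows with the same id
-- are expected to be upserted rather than duplicated.
INSERT INTO orders_sink
SELECT id, order_time, amount
FROM orders_src;
```

Since the source is bounded, this job should finish once all rows have been read and flushed.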

Dimension (Lookup) Table Syntax

SQL
CREATE TABLE currency_rates_dim (
  currency_code STRING,
  rate DECIMAL(10,4)
) WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
  'table-name' = 'currency_rates',
  'username' = 'flink',
  'password' = 'flink',
  -- enable caching for low-latency lookups
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl' = '1h'
);
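
A dimension table is typically used in a lookup join against a stream that carries a processing-time attribute. The sketch below is one way this could look. The orders_stream table and its columns are hypothetical, and the datagen connector only stands in for a real stream such as Kafka (its random strings will not match real currency codes).

```sql
-- Hypothetical probe-side stream with a processing-time attribute.
CREATE TEMPORARY TABLE orders_stream (
  order_id BIGINT,
  currency STRING,
  amount DOUBLE,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Look up the rate for each incoming order at processing time.
-- Repeated keys can be served from the lookup cache configured above.
SELECT o.order_id, o.amount, r.rate
FROM orders_stream AS o
JOIN currency_rates_dim FOR SYSTEM_TIME AS OF o.proc_time AS r
  ON o.currency = r.currency_code;
```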

WITH Options

Parameter | Scope | Required | Default | Purpose / notes
url | All | Yes | (none) | Standard PostgreSQL JDBC URL.
table-name | All | Yes | (none) | Name of the table or pattern.
username | All | No | (none) | Username used for authentication.
password | All | No | (none) | Companion to username.
scan.partition.column | Source | No | (none) | Column to split data for parallel reads; NUMERIC/DATE/TIMESTAMP only.
scan.partition.num | Source | No | (none) | Number of partitions created.
scan.partition.lower-bound | Source | No | (none) | Lower bound value for first partition.
scan.partition.upper-bound | Source | No | (none) | Upper bound value for last partition.
scan.fetch-size | Source | No | 0 | Rows fetched per read; 0 = driver default.
scan.auto-commit | Source | No | true | Enable/disable auto-commit while reading.
sink.buffer-flush.max-rows | Sink | No | 100 | Records buffered before flush; 0 disables buffering.
sink.buffer-flush.interval | Sink | No | 1s | Max interval between flushes; 0 disables buffering.
sink.max-retries | Sink | No | 3 | Retries if write fails.
lookup.cache.max-rows | Dimension | No | (none) | Max cache size for dimension rows.
lookup.cache.ttl | Dimension | No | (none) | Time-to-live of cached rows.
lookup.cache.caching-missing-key | Dimension | No | true | Whether to cache misses (empty results).
lookup.max-retries | Dimension | No | 3 | Retries if lookup fails.
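
As an illustration of the scan options that the earlier examples do not use, the sketch below reads a large table in batches. The table name orders_src_tuned and the fetch size are arbitrary choices for this example. Disabling auto-commit is included because the PostgreSQL JDBC driver only fetches rows in cursor-sized batches when auto-commit is off.

```sql
CREATE TABLE orders_src_tuned (
  id BIGINT,
  order_time TIMESTAMP(3),
  amount DECIMAL(10,2)
) WITH (
  'connector' = 'postgres',
  'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
  'table-name' = 'orders',
  'username' = 'flink',
  'password' = 'flink',
  -- fetch rows in batches of 1000 instead of the driver default
  'scan.fetch-size' = '1000',
  -- allow the driver to stream results with a cursor
  'scan.auto-commit' = 'false'
);
```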

Limitations

  • Runtime version support: Only Ververica Runtime VERA 1.0.3 or later bundles the open-source JDBC/Postgres connector.
  • Bounded source semantics: A Postgres source table is finite. The task finishes after all rows are read. For continuous streams, use Postgres-CDC.
  • PostgreSQL version: PostgreSQL 9.5 or later is required for sinks, as the connector uses ON CONFLICT for upserts.
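
For readers unfamiliar with ON CONFLICT, the statement below illustrates the general shape of a PostgreSQL upsert keyed on id. It is plain PostgreSQL, not Flink SQL. The values are made up, and the exact statement the connector generates is an assumption that may differ in detail. PostgreSQL also requires a unique constraint or primary key on id in the target table for this form to work.

```sql
-- PostgreSQL-side illustration: insert a row, or update it if the id already exists.
INSERT INTO orders_enriched (id, order_time, amount)
VALUES (42, TIMESTAMP '2024-01-01 12:00:00', 19.99)
ON CONFLICT (id)
DO UPDATE SET order_time = EXCLUDED.order_time,
              amount     = EXCLUDED.amount;
```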

Postgres Catalog

A Postgres catalog registers an entire database, making every schema and table addressable in Flink SQL without requiring individual CREATE TABLE statements.

Overview

  • Zero-DDL onboarding: Query existing tables immediately.
  • Namespace isolation: Use fully qualified names (pg.<schema>.<table>) or set context with USE.
  • Packaged driver: Ververica bundles the Postgres JDBC driver. No extra JAR is needed.

Limitations

Limitation | Detail | Work-around
Immutable parameters | After creation, you can't edit the connection properties. Drop and recreate the catalog instead. | Plan stable credentials or use environment variables.
Supported Postgres versions | Verified with PostgreSQL 9.5+ (required for upsert semantics). | Upgrade the database if it is older.
DDL propagation | DDL executed in Flink is not forwarded to Postgres. | Maintain tables in the database.
CDC streaming | Catalog reads are bounded snapshots. | For real-time CDC, use the postgres-cdc connector.

Create Catalog Options

You can create a catalog using SQL or on the console.

  • To create with SQL:
SQL
CREATE CATALOG pg_catalog WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://pg-host:5432',
  'default-database' = 'ecommerce',
  'username' = 'flink',
  'password' = 'flink'
);
  • On the console: Go to Catalogs › Create Catalog, then follow the creation and configuration procedure (hostname, port, default database, username, password).

Parameter Reference

Parameter | Required | Description
type | Yes | Must be postgres.
base-url | Yes | JDBC URL including host and port.
default-database | Yes | Database assumed when none is qualified.
username, password | No | Credentials; omit when trust/SSO applies.

Database Allow/Deny Lists

Control which PostgreSQL databases are visible through a catalog using an allowlist or denylist. This is useful on hosted services (e.g., AWS RDS) that expose system databases.

  • database.whitelist: Semicolon-separated list of databases to expose (e.g., 'ecommerce;sales_analytics').
  • database.blacklist: Semicolon-separated list of databases to hide (e.g., 'rdsadmin;template0;template1').

Rules

  • Specify either a whitelist or a blacklist, not both. If both are present the catalog creation fails with a validation error.
  • If neither option is set, the catalog behaves like the default: PostgreSQL template databases (template0 and template1) are excluded from discovery.
  • Multiple databases are separated by semicolons (e.g., 'db1;db2;db3').
  • Matching is by database name. Use lowercase letters unless your database names are quoted identifiers.

Examples

  • Whitelist only (expose exactly these databases):
SQL
CREATE CATALOG pg_whitelist WITH (
  'type'               = 'postgres',
  'base-url'           = 'jdbc:postgresql://pg-host:5432',
  'default-database'   = 'ecommerce',
  'username'           = 'flink',
  'password'           = 'flink',
  'database.whitelist' = 'ecommerce;sales_analytics'
);
  • Blacklist only (hide certain databases, show all others):
SQL
CREATE CATALOG pg_blacklist WITH (
  'type'               = 'postgres',
  'base-url'           = 'jdbc:postgresql://pg-host:5432',
  'default-database'   = 'postgres',
  'username'           = 'flink',
  'password'           = 'flink',
  'database.blacklist' = 'template0;template1'
);

AWS RDS example

AWS RDS exposes a reserved database named rdsadmin that should not be accessed by clients. You can safely run Flink against RDS by blacklisting it (and, for cleanliness, the templates):

SQL
CREATE CATALOG rds_pg WITH (
  'type'               = 'postgres',
  'base-url'           = 'jdbc:postgresql://<rds-endpoint>:5432',
  'default-database'   = 'postgres',
  'username'           = '<user>',
  'password'           = '<password>',
  'database.blacklist' = 'rdsadmin;template0;template1'
);

Using a Postgres Catalog

SQL
-- Set the current catalog context
USE CATALOG pg_catalog;

-- Browse catalog objects
SHOW DATABASES;
SHOW TABLES FROM public;

-- Run a bounded read from a table
SELECT * FROM public.orders LIMIT 10;

-- Write a continuous stream to a table
INSERT INTO public.enriched_orders
SELECT * FROM kafka_src;

-- Perform a dimension table lookup
SELECT o.*, r.rate
FROM kafka_orders o
JOIN pg_catalog.public.currency_rates FOR SYSTEM_TIME AS OF o.proctime AS r
ON o.currency = r.currency_code;

CTAS / DBAS Helpers

You can also set a catalog as the current context and then use the Postgres connector shown earlier to copy a single table (CTAS) or an entire database (DBAS):

SQL
-- Set the context
USE CATALOG pg_catalog;

-- Snapshot a single table
CREATE TABLE public.orders_copy
WITH ('connector'='postgres')
AS TABLE pg_catalog.public.orders;

-- Clone the entire DB
CREATE DATABASE sales_copy
WITH ('connector'='postgres')
AS DATABASE pg_catalog.ecommerce INCLUDING ALL TABLES;

Manage the Catalog

To view your database metadata, go to the Catalogs section in the Console. From there, select a catalog to browse its databases and tables.

If you want to delete a catalog, use the command DROP CATALOG pg_catalog. You can also delete it using the Console. Deployments will keep running, but they will fail to resolve catalog tables on restart after deletion.
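
Because connection properties cannot be edited after creation (see the catalog limitations), changing credentials means dropping and recreating the catalog. A minimal sketch, where <new-password> is a placeholder and the remaining values are taken from the earlier example:

```sql
-- If pg_catalog is the current catalog, switch to a different one first;
-- open-source Flink refuses to drop the catalog that is currently in use.
DROP CATALOG pg_catalog;

-- Recreate the catalog under the same name with the updated credentials.
CREATE CATALOG pg_catalog WITH (
  'type' = 'postgres',
  'base-url' = 'jdbc:postgresql://pg-host:5432',
  'default-database' = 'ecommerce',
  'username' = 'flink',
  'password' = '<new-password>'
);
```

Keeping the same catalog name means existing queries that reference pg_catalog should resolve again after the catalog is recreated.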

Postgres-CDC Connector

Use this connector to stream row-level change events (INSERT, UPDATE, DELETE) from PostgreSQL with exactly-once delivery semantics.

Overview

  • Real-time: Streams an initial snapshot followed by all subsequent changes from the Write-Ahead Log (WAL).
  • Exactly-once: Guarantees no data loss or duplication, even across failures.
  • Rich metadata: Exposes database, schema, table, and operation timestamp columns.

Syntax

SQL
CREATE TABLE shipments_cdc (
  shipment_id INT,
  order_id    INT,
  origin      STRING,
  destination STRING,
  is_arrived  BOOLEAN
) WITH (
  'connector'        = 'postgres-cdc',
  'hostname'         = 'pg-host',
  'port'             = '5432',
  'username'         = 'flink',
  'password'         = 'flink',
  'database-name'    = 'ecommerce',
  'schema-name'      = 'public',
  'table-name'       = 'shipments',
  'slot.name'        = 'flink',
  'debezium.plugin.name' = 'pgoutput'
);
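
The metadata columns mentioned in the overview can be declared alongside the physical columns. The sketch below assumes the metadata keys of the open-source Flink CDC Postgres connector (database_name, schema_name, table_name, op_ts); this page does not list the keys, so verify them against your runtime version. The table name shipments_cdc_meta and the slot name flink_meta are hypothetical. A separate slot name is used because a replication slot should not be shared between jobs.

```sql
CREATE TABLE shipments_cdc_meta (
  -- metadata keys are assumed from the open-source Flink CDC connector
  db_name     STRING METADATA FROM 'database_name' VIRTUAL,
  schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
  table_name  STRING METADATA FROM 'table_name' VIRTUAL,
  op_ts       TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  shipment_id INT,
  destination STRING,
  is_arrived  BOOLEAN
) WITH (
  'connector'        = 'postgres-cdc',
  'hostname'         = 'pg-host',
  'port'             = '5432',
  'username'         = 'flink',
  'password'         = 'flink',
  'database-name'    = 'ecommerce',
  'schema-name'      = 'public',
  'table-name'       = 'shipments',
  'slot.name'        = 'flink_meta',
  'debezium.plugin.name' = 'pgoutput'
);

-- Continuously maintained count of shipments still in transit, per destination.
SELECT destination, COUNT(*) AS in_transit
FROM shipments_cdc_meta
WHERE is_arrived = FALSE
GROUP BY destination;
```

Because the source is a changelog, the counts are updated as rows are inserted, updated, or deleted in PostgreSQL.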

Limitations and Operational Tips

  • Source-only: Cannot be used as a sink or lookup table.
  • Snapshot duration and checkpoints: The initial snapshot scans the entire table at once and cannot emit a checkpoint barrier until the scan is complete. Consequently, Flink’s global checkpoint stays pending for the entire snapshot duration. If that duration exceeds execution.checkpointing.timeout, the checkpoint is marked as failed, and with Flink’s default tolerance a single failed checkpoint triggers a job restart. To let the snapshot finish before Flink gives up, increase the checkpoint interval and/or the number of tolerable failed checkpoints. The following Flink deployment parameters are a reasonable starting point for tolerating a long-running snapshot:
Parameter | Typical starting value | What it controls
execution.checkpointing.interval | 10 min | Time between checkpoint attempts. A longer interval reduces scheduling pressure during snapshots.
execution.checkpointing.tolerable-failed-checkpoints | 100 | Number of consecutive checkpoint failures allowed before the job fails. Combined with the interval, this yields the maximum tolerated snapshot duration.
restart-strategy | fixed-delay | The restart policy applied if the job fails. fixed-delay gives a predictable retry cadence.
restart-strategy.fixed-delay.attempts | 2147483647 | Effectively unlimited retries, ensuring the deployment keeps restarting until the snapshot completes.
  • Logical replication: Requires logical replication to be enabled and a suitable decoding plugin (pgoutput, wal2json, etc.).
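
The logical replication requirement is configured on the PostgreSQL side. The statements below are a sketch of how this is commonly checked and enabled on a self-managed server; the role name flink is an assumption taken from the examples above. Managed services such as AWS RDS typically expose the WAL level through their own configuration mechanism instead of ALTER SYSTEM.

```sql
-- Run in PostgreSQL as a superuser. Names are illustrative assumptions.

-- 1. Check the current WAL level; logical decoding needs 'logical'.
SHOW wal_level;

-- 2. Enable logical decoding (takes effect only after a server restart).
ALTER SYSTEM SET wal_level = 'logical';

-- 3. Allow the connector's role to open replication connections.
ALTER ROLE flink WITH REPLICATION;

-- 4. After the job has started, confirm that its slot exists and is active.
SELECT slot_name, plugin, active
FROM pg_replication_slots;
```

An inactive slot left behind by a stopped job keeps retaining WAL, so it is worth dropping slots that are no longer needed.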

BYOC Deployment Example Usage

This section demonstrates how to use the Postgres Connector and Catalog in the context of a BYOC deployment. It provides examples for the following tasks:

  • Setting up a test Postgres instance with the required tables.
  • Creating and inserting data into a Flink SQL table from an existing Postgres table.
  • Creating a catalog from a Postgres instance, including how to use allowlists/denylists.
  • Using the created catalog to move data between tables.

You will see examples for two common scenarios: a self-hosted Postgres instance (deployed in AWS EC2) and a managed Postgres instance (AWS RDS).

Setting Up a Test Postgres Instance

This section assumes you have already provisioned a Postgres instance and configured its network and security access so that you can connect to it using psql. After you connect to the Postgres instance, you can set up the required test tables with the following commands:

SQL
CREATE TABLE financial_transactions_out (
    name TEXT,
    amount INTEGER
);

CREATE TABLE financial_transactions (
    name TEXT,
    amount INTEGER
);

Updating a Table in the SQL Editor

You can now go to the SQL Editor in your Managed Service deployment to create a Flink table from the financial_transactions table and insert new data.

For a self-hosted Postgres instance on an EC2 machine, you can supply the public IP in the url parameter:

SQL
CREATE TEMPORARY TABLE FlinkTable1 (name STRING, amount INT)
WITH (
    'connector' = 'postgres',
    'url' = 'jdbc:postgresql://52.59.207.216:5432/postgres',
    'table-name' = 'financial_transactions',
    'username' = 'postgres',
    'password' = '1234'
);

INSERT INTO FlinkTable1
VALUES ('ttt',1);

For an instance hosted in AWS RDS, you need to change the url to use the RDS endpoint:

SQL
CREATE TEMPORARY TABLE FlinkTable1 (name STRING, amount INT)
WITH (
    'connector' = 'postgres',
    'url' = 'jdbc:postgresql://pg-conn-test.cpozd9x7eku2.eu-central-1.rds.amazonaws.com:5432/postgres',
    'table-name' = 'financial_transactions',
    'username' = 'postgres',
    'password' = '1234'
);

INSERT INTO FlinkTable1 VALUES ('ttt',1);

After you deploy and run these jobs from the SQL editor, you can use psql to connect to the Postgres instance and verify that the update was successful:

TEXT
postgres=# select * from financial_transactions;
 name | amount
------+--------
 ttt  |      1
(1 row)

Creating a Catalog

Next, you can create a catalog to make all tables from the Postgres instance available in the SQL Editor. You can run the following statements for either the self-hosted or RDS-managed instance.

The examples below show how to use the database allowlist/denylist feature for Postgres catalogs. Use a denylist to exclude unwanted databases (such as template0), or an allowlist to expose only the databases you need, so that the catalog does not try to connect to databases that do not accept connections.

To blacklist the default template databases on a self-hosted EC2 instance:

SQL
CREATE CATALOG FlinkCatalog1 WITH (
  'type'               = 'postgres',
  'username'           = 'postgres',
  'password'           = '1234',
  'base-url'           = 'jdbc:postgresql://52.59.207.216:5432',
  'default-database'   = 'postgres',
  'database.blacklist' = 'template0;template1'
);

To whitelist only specific databases on an RDS-managed instance:

SQL
CREATE CATALOG FlinkCatalog1 WITH (
  'type' = 'postgres',
  'username' = 'postgres',
  'password' = '1234',
  'base-url' = 'jdbc:postgresql://pg-conn-test.cpozd9x7eku2.eu-central-1.rds.amazonaws.com:5432',
  'default-database' = 'postgres',
  'database.whitelist' = 'postgres;template1'
);

Using the Created Catalog

After the catalog is successfully created and appears in the Catalogs panel, you can check the available tables and use the catalog to move data between tables.

SQL
USE CATALOG FlinkCatalog1;

INSERT INTO `financial_transactions_out`
SELECT * FROM `financial_transactions`;

After you deploy and run the Flink SQL job, you can verify from the Postgres instance that the data has been successfully moved:

TEXT
postgres=# select * from financial_transactions_out;
 name | amount
------+--------
 ttt  |      1
(1 row)