PostgreSQL Connectivity
You can connect your Flink SQL jobs to PostgreSQL in one of three ways, each designed for a distinct technical requirement.
| Use case example | Recommended option |
|---|---|
| Simple read/write, bounded or unbounded | Postgres connector |
| Explorer‑style access to many tables with minimal DDL | Postgres catalog |
| Stream change events (CDC) | Postgres CDC connector |
Prerequisites
- Access to an active Postgres database instance running at a known network address (host and port).
- The required tables must already exist in the target Postgres database.
- Valid credentials: a username and password with sufficient privileges to read from or write to the tables.
Postgres Connector
Use this connector when you need a bounded snapshot of a PostgreSQL table (source) or want to continuously write streaming results into PostgreSQL (sink). You can also use it for dimension table lookups.
Overview
- Supports source, sink, and dimension (lookup) tables.
- The required Postgres JDBC driver is already bundled. No manual JAR upload is required.
- The syntax is a specific implementation of the generic JDBC connector, using 'connector' = 'postgres'.
Source Table Syntax
CREATE TABLE orders_src (
id BIGINT,
order_time TIMESTAMP(3),
amount DECIMAL(10,2)
) WITH (
'connector' = 'postgres',
'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
'table-name' = 'orders',
'username' = 'flink',
'password' = 'flink',
-- optional parallel read
'scan.partition.column' = 'id',
'scan.partition.num' = '4',
'scan.partition.lower-bound' = '1',
'scan.partition.upper-bound' = '1000000'
);
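A source table defined this way can be queried like any other Flink table. The sketch below is a hypothetical usage example: it pairs the orders_src table above with a print sink (the order_totals_print name is an assumption, not part of the original) to run a bounded aggregation over the snapshot.

```sql
-- Hypothetical print sink for inspecting results; any sink table works here.
CREATE TABLE order_totals_print (
  order_day DATE,
  total DECIMAL(20,2)
) WITH (
  'connector' = 'print'
);

-- Bounded read: the job finishes once every row of orders has been scanned.
-- With the scan.partition.* options set, the read runs with 4 parallel splits.
INSERT INTO order_totals_print
SELECT CAST(order_time AS DATE) AS order_day,
       SUM(amount) AS total
FROM orders_src
GROUP BY CAST(order_time AS DATE);
```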
Sink (Result) Table Syntax
CREATE TABLE orders_sink (
id BIGINT,
order_time TIMESTAMP(3),
amount DECIMAL(10,2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgres',
'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
'table-name' = 'orders_enriched',
'username' = 'flink',
'password' = 'flink',
-- tune buffering for higher throughput
'sink.buffer-flush.max-rows' = '500',
'sink.buffer-flush.interval' = '2s'
);
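Because orders_sink declares a primary key, writes behave as upserts: a row with an id already present in the target table replaces the earlier version. A minimal sketch of writing into it, assuming an upstream Flink table (here called orders_src) with matching columns:

```sql
-- Upsert write: rows sharing the same id overwrite earlier versions,
-- because orders_sink declares PRIMARY KEY (id) NOT ENFORCED.
-- Rows are buffered and flushed per the sink.buffer-flush.* options above.
INSERT INTO orders_sink
SELECT id, order_time, amount
FROM orders_src;
```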
Dimension (Lookup) Table Syntax
CREATE TABLE currency_rates_dim (
currency_code STRING,
rate DECIMAL(10,4)
) WITH (
'connector' = 'postgres',
'url' = 'jdbc:postgresql://pg-host:5432/ecommerce',
'table-name' = 'currency_rates',
'username' = 'flink',
'password' = 'flink',
-- enable caching for low-latency lookups
'lookup.cache.max-rows' = '1000',
'lookup.cache.ttl' = '1h'
);
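A dimension table is consumed through a processing-time temporal join: each arriving row looks up the current value in PostgreSQL (or the cache). The sketch below assumes a hypothetical streaming source, orders_stream, that carries a processing-time attribute; the datagen connector is used only as a stand-in.

```sql
-- Hypothetical streaming source with a processing-time attribute.
CREATE TABLE orders_stream (
  id BIGINT,
  currency_code STRING,
  amount DECIMAL(10,2),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Each incoming order looks up the rate as of its processing time.
-- With the lookup.cache.* options above, repeated lookups for the same
-- currency_code are served from memory for up to 1h.
SELECT o.id,
       o.amount * r.rate AS amount_usd
FROM orders_stream AS o
JOIN currency_rates_dim FOR SYSTEM_TIME AS OF o.proc_time AS r
  ON o.currency_code = r.currency_code;
```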
WITH Options
| Parameter | Scope | Required | Default | Purpose / notes |
|---|---|---|---|---|
| url | All | ✔︎ | – | Standard PostgreSQL JDBC URL. |
| table-name | All | ✔︎ | – | Name of the table or pattern. |
| username | All | ✖︎ | – | Username used for authentication. |
| password | All | ✖︎ | – | Companion to username. |
| scan.partition.column | Source | ✖︎ | – | Column to split data for parallel reads; NUMERIC/DATE/TIMESTAMP only. |
| scan.partition.num | Source | ✖︎ | – | Number of partitions created. |
| scan.partition.lower-bound | Source | ✖︎ | – | Lower bound value for the first partition. |
| scan.partition.upper-bound | Source | ✖︎ | – | Upper bound value for the last partition. |
| scan.fetch-size | Source | ✖︎ | 0 | Rows fetched per read; 0 = driver default. |
| scan.auto-commit | Source | ✖︎ | true | Enable/disable auto-commit while reading. |
| sink.buffer-flush.max-rows | Sink | ✖︎ | 100 | Records buffered before flush; 0 disables buffering. |
| sink.buffer-flush.interval | Sink | ✖︎ | 1s | Max interval between flushes; 0 disables buffering. |
| sink.max-retries | Sink | ✖︎ | 3 | Retries if a write fails. |
| lookup.cache.max-rows | Dimension | ✖︎ | – | Max cache size for dimension rows. |
| lookup.cache.ttl | Dimension | ✖︎ | – | Time-to-live of cached rows. |
| lookup.cache.caching-missing-key | Dimension | ✖︎ | true | Whether to cache misses (empty results). |
| lookup.max-retries | Dimension | ✖︎ | 3 | Retries if a lookup fails. |
Limitations
- Runtime version support: Only Ververica Runtime VERA 1.0.3 or later bundles the open-source JDBC/Postgres connector.
- Bounded source semantics: A Postgres source table is finite; the task finishes after all rows are read. For continuous streams, use the Postgres CDC connector.
- PostgreSQL version: PostgreSQL 9.5 or later is required for sinks, because the connector uses ON CONFLICT for upserts.