PostgreSQL/JDBC

This topic describes how to use the Java Database Connectivity (JDBC) connector.

Background information​

The JDBC connector is provided by Apache Flink and can be used to read data from and write data to common databases, such as MySQL, PostgreSQL, and Oracle. The following table describes the capabilities supported by the JDBC connector.

| Item | Description |
| --- | --- |
| Table type | Source table, dimension table, and result table |
| Running mode | Streaming mode and batch mode |
| Data format | N/A |
| Metric | N/A |
| API type | SQL API |
| Data update or deletion in a result table | Supported |

Prerequisites​

The database and table to which you want to connect are created.

Limits​

  • Only Realtime Compute for Apache Flink that uses VERA 1.0.3 or later supports the JDBC connector.
  • A JDBC source table is a bounded source. After the JDBC source connector has read all data from the table in the upstream database, the task for the JDBC source table finishes. If you want to capture real-time change data, use a Change Data Capture (CDC) connector instead. For more information, see Create a MySQL CDC source table and Create a PostgreSQL CDC source table.
  • If you use the JDBC sink connector to connect to a PostgreSQL database, make sure that the database version is PostgreSQL 9.5 or later. PostgreSQL uses the ON CONFLICT syntax to insert or update data when a primary key is specified in a DDL statement. The ON CONFLICT syntax is supported only in PostgreSQL 9.5 or later.
  • Fully managed Flink supports only the open source JDBC connector that does not include a JDBC driver for a specific database.
    Note: When you use the JDBC connector, you must manually upload the JAR package of the driver of the destination database as a dependency file. For more information, see SQL Editor window. The following table describes the JDBC drivers supported by fully managed Flink.

| Driver | Group ID | Artifact ID |
| --- | --- | --- |
| MySQL | mysql | mysql-connector-java |
| Oracle | com.oracle.database.jdbc | ojdbc8 |
| PostgreSQL | org.postgresql | postgresql |

Note: If you use a JDBC driver that is not included in the table, you must test the validity and availability of the JDBC driver.

Syntax​

CREATE TABLE jdbc_table (
`id` BIGINT,
`name` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);

Parameters in the WITH clause​

Common parameters​

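| Parameter | Description | Data Type | Required | Default Value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the table. | STRING | Yes | No default value | Set the value to jdbc. |
| url | The URL of the database. | STRING | Yes | No default value | N/A. |
| table-name | The name of the JDBC table. | STRING | Yes | No default value | N/A. |
| username | The name of the JDBC user. | STRING | No | No default value | If you configure one of the username and password parameters, you must also configure the other parameter. |
| password | The password of the JDBC user. | STRING | No | No default value | If you configure one of the username and password parameters, you must also configure the other parameter. |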
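
As an illustration of the common parameters, the following sketch declares a table backed by a PostgreSQL database. The host name, port, database name, table name, and columns are hypothetical placeholders. The URL uses the standard PostgreSQL JDBC form jdbc:postgresql://<host>:<port>/<database>.

```sql
-- Hypothetical example: pg.example.internal, shop, and orders are placeholders,
-- not values taken from this topic.
CREATE TEMPORARY TABLE pg_orders (
  `order_id` BIGINT,
  `customer` VARCHAR,
  PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://pg.example.internal:5432/shop',
  'table-name' = 'orders',
  -- username and password must be configured together.
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);
```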

Parameters only for source tables​

| Parameter | Description | Data Type | Required | Default Value | Remarks |
| --- | --- | --- | --- | --- | --- |
| scan.partition.column | The name of the column that is used to partition the input data. | STRING | No | No default value | The values in the column must be of the NUMERIC, DATE, or TIMESTAMP type. For more information about partitioned scan, see Partitioned Scan. |
| scan.partition.num | The number of partitions. | INTEGER | No | No default value | N/A. |
| scan.partition.lower-bound | The smallest value of the first partition. | INTEGER | No | No default value | N/A. |
| scan.partition.upper-bound | The largest value of the last partition. | INTEGER | No | No default value | N/A. |
| scan.fetch-size | The number of rows of data that are obtained from the database each time data is read from a source table. | INTEGER | No | 0 | If you set this parameter to 0, this parameter is ignored. |
| scan.auto-commit | Specifies whether to enable auto-commit. | BOOLEAN | No | true | N/A. |
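
The source-only parameters can be combined to read a large table in parallel. The following is a minimal sketch, assuming a numeric `id` column whose values fall roughly between 1 and 1000000; the bounds, the partition count, and the fetch size are illustrative values, not recommendations. In the open source connector, the four scan.partition.* parameters are expected to be configured together.

```sql
-- Sketch of a partitioned scan: the id range [1, 1000000] is split into 4 partitions
-- that can be read in parallel. All numeric values here are assumptions.
CREATE TEMPORARY TABLE jdbc_partitioned_source (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'scan.partition.column' = 'id',
  'scan.partition.num' = '4',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '1000000',
  -- Fetch 1000 rows per round trip instead of relying on the driver default.
  'scan.fetch-size' = '1000'
);
```

With the PostgreSQL driver, a fetch size typically takes effect only when auto-commit is disabled, so you may also need to set scan.auto-commit to false in that case.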

Parameters only for result tables​

| Parameter | Description | Data Type | Required | Default Value | Remarks |
| --- | --- | --- | --- | --- | --- |
| sink.buffer-flush.max-rows | The maximum number of data records that can be cached before the flush operation is performed. | INTEGER | No | 100 | If you set this parameter to 0, data records are not cached before the flush operation is performed. |
| sink.buffer-flush.interval | The interval at which data is flushed. If data records are cached for a period that exceeds the duration specified by this parameter, the flush operation is performed in an asynchronous thread. | DURATION | No | 1s | If you set this parameter to 0, data records are not cached before the flush operation is performed. See the note that follows this table. |
| sink.max-retries | The maximum number of retries that are allowed when data fails to be written to the database. | INTEGER | No | 3 | N/A. |

Note: If you want to process cached flush events in asynchronous mode, you can set the sink.buffer-flush.max-rows parameter to 0 and configure the sink.buffer-flush.interval parameter based on your business requirements.
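
The configuration described in the note can be sketched as follows. The table name jdbc_async_sink and the 2s interval are assumptions chosen for illustration; pick an interval that matches your latency and throughput requirements.

```sql
-- Sketch: flushing is driven only by the time interval because the
-- row-count threshold is set to 0.
CREATE TEMPORARY TABLE jdbc_async_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'sink.buffer-flush.max-rows' = '0',
  -- Illustrative interval: cached records are flushed every 2 seconds.
  'sink.buffer-flush.interval' = '2s',
  -- Same as the default value; shown only to make the retry behavior explicit.
  'sink.max-retries' = '3'
);
```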

Parameters only for dimension tables​

| Parameter | Description | Data Type | Required | Default Value | Remarks |
| --- | --- | --- | --- | --- | --- |
| lookup.cache.max-rows | The maximum number of rows of data that can be cached. If the number of rows of data in the cache exceeds the value of this parameter, the earliest row of data expires and is replaced by a new row of data. | INTEGER | No | No default value | By default, caching for dimension tables is disabled. You can configure the lookup.cache.max-rows and lookup.cache.ttl parameters to enable caching for dimension tables. If caching for dimension tables is enabled, the LRU cache policy is used. |
| lookup.cache.ttl | The maximum time to live (TTL) of each row of data in the cache. If the time period for which a row of data is cached exceeds the value of this parameter, the row of data expires. | DURATION | No | No default value | See the remarks for lookup.cache.max-rows. |
| lookup.cache.caching-missing-key | Specifies whether to cache empty query results. | BOOLEAN | No | true | Valid values: true (default): empty query results are cached. false: empty query results are not cached. |
| lookup.max-retries | The maximum number of retries when the database fails to be queried. | INTEGER | No | 3 | N/A. |
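
To show how the cache parameters fit together, the following sketch enables the LRU cache for a dimension table. The table name jdbc_cached_dim, the limit of 10000 rows, and the 60s TTL are assumptions for illustration only.

```sql
-- Sketch: caching is enabled by configuring both lookup.cache.max-rows
-- and lookup.cache.ttl. The values are illustrative.
CREATE TEMPORARY TABLE jdbc_cached_dim (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- Keep at most 10000 rows; the earliest rows are evicted first.
  'lookup.cache.max-rows' = '10000',
  -- A cached row expires 60 seconds after it is loaded.
  'lookup.cache.ttl' = '60s',
  -- Do not cache empty results, so newly inserted keys become visible sooner.
  'lookup.cache.caching-missing-key' = 'false'
);
```

A longer TTL reduces the number of queries sent to the database but increases the time during which a join can return stale dimension data.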

Data type mappings​

| Data type of MySQL | Data type of Oracle | Data type of PostgreSQL | Data type of Flink SQL |
| --- | --- | --- | --- |
| TINYINT | N/A | N/A | TINYINT |
| SMALLINT, TINYINT UNSIGNED | N/A | SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
| INT, MEDIUMINT, SMALLINT UNSIGNED | N/A | INTEGER, SERIAL | INT |
| BIGINT, INT UNSIGNED | N/A | BIGINT, BIGSERIAL | BIGINT |
| BIGINT UNSIGNED | N/A | N/A | DECIMAL(20, 0) |
| BIGINT | N/A | BIGINT | BIGINT |
| FLOAT | BINARY_FLOAT | REAL, FLOAT4 | FLOAT |
| DOUBLE, DOUBLE PRECISION | BINARY_DOUBLE | FLOAT8, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | SMALLINT, FLOAT(s), DOUBLE PRECISION, REAL, NUMBER(p, s) | NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN, TINYINT(1) | N/A | BOOLEAN | BOOLEAN |
| DATE | DATE | DATE | DATE |
| TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| CHAR(n), VARCHAR(n), TEXT | CHAR(n), VARCHAR(n), CLOB | CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
| BINARY, VARBINARY, BLOB | RAW(s), BLOB | BYTEA | BYTES |
| N/A | N/A | ARRAY | ARRAY |
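
The mappings above can be applied as in the following sketch, which pairs a hypothetical PostgreSQL table with a Flink SQL declaration whose column types follow the PostgreSQL column of the table. The products table and all of its columns are invented for illustration.

```sql
-- Run in PostgreSQL (hypothetical table):
-- CREATE TABLE products (
--   id          BIGSERIAL PRIMARY KEY,
--   stock       SMALLINT,
--   price       NUMERIC(10, 2),
--   weight      REAL,
--   active      BOOLEAN,
--   released    DATE,
--   description TEXT,
--   thumbnail   BYTEA
-- );

-- Run in Flink SQL: each column uses the mapped Flink SQL type.
CREATE TEMPORARY TABLE products_jdbc (
  `id` BIGINT,             -- BIGSERIAL      -> BIGINT
  `stock` SMALLINT,        -- SMALLINT       -> SMALLINT
  `price` DECIMAL(10, 2),  -- NUMERIC(10, 2) -> DECIMAL(10, 2)
  `weight` FLOAT,          -- REAL           -> FLOAT
  `active` BOOLEAN,        -- BOOLEAN        -> BOOLEAN
  `released` DATE,         -- DATE           -> DATE
  `description` STRING,    -- TEXT           -> STRING
  `thumbnail` BYTES        -- BYTEA          -> BYTES
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = 'products',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);
```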

Sample code​

Sample code for a source table​

CREATE TEMPORARY TABLE jdbc_source (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);

CREATE TEMPORARY TABLE blackhole_sink(
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;

Sample code for a result table​

CREATE TEMPORARY TABLE datagen_source (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'datagen'
);

CREATE TEMPORARY TABLE jdbc_sink (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);

INSERT INTO jdbc_sink
SELECT * FROM datagen_source;
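
The limit on PostgreSQL 9.5 or later applies when the result table declares a primary key, because the sink then writes in upsert mode. The following sketch shows such a result table; the table name jdbc_upsert_sink and the choice of `name` as the key are assumptions. The commented statement only illustrates the general shape of an ON CONFLICT upsert, and the statement that the connector actually generates may differ in detail.

```sql
-- Sketch: a primary key in the DDL makes the JDBC sink insert or update rows by key.
CREATE TEMPORARY TABLE jdbc_upsert_sink (
  `name` VARCHAR,
  `age` INT,
  PRIMARY KEY (`name`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);

INSERT INTO jdbc_upsert_sink
SELECT * FROM datagen_source;

-- Illustrative shape of the PostgreSQL statement for one row:
-- INSERT INTO <yourTable> (name, age) VALUES (?, ?)
-- ON CONFLICT (name) DO UPDATE SET age = EXCLUDED.age;
```

For ON CONFLICT to work, the PostgreSQL table needs a primary key or unique constraint on the same column or columns that the Flink DDL declares as the primary key.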

Sample code for a dimension table​

CREATE TEMPORARY TABLE datagen_source(
`id` INT,
`data` BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);

CREATE TEMPORARY TABLE jdbc_dim (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);

CREATE TEMPORARY TABLE blackhole_sink(
`id` INT,
`data` BIGINT,
`name` VARCHAR
) WITH (
'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.`id`,T.`data`, H.`name`
FROM datagen_source AS T
JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;