PostgreSQL/JDBC
This topic describes how to use the Java Database Connectivity (JDBC) connector.
Background Information
The JDBC connector is provided by Apache Flink and can be used to read data from and write data to common databases, such as MySQL, PostgreSQL, and Oracle. The following table describes the capabilities supported by the JDBC connector.
Item | Description |
---|---|
Table type | Source table, dimension table, and result table |
Running mode | Streaming mode and batch mode |
Data format | N/A |
Metric | N/A |
Data update or deletion in a result table | Supported |
Prerequisites
The database and table to which you want to connect are created.
Limits
- Only Realtime Compute for Apache Flink that uses VERA 1.0.3 or later supports the JDBC connector.
- A JDBC source table is a bounded source. After the JDBC source connector reads all data from a table in an upstream database and writes the data to a source table, the task for the JDBC source table is complete. If you want to capture real-time change data, use a Change Data Capture (CDC) connector. For more information, see Create a MySQL CDC source table and Create a PostgreSQL CDC source table.
- If you use the JDBC sink connector to connect to a PostgreSQL database, make sure that the database version is PostgreSQL 9.5 or later. PostgreSQL uses the ON CONFLICT syntax to insert or update data when a primary key is specified in a DDL statement. The ON CONFLICT syntax is supported only in PostgreSQL 9.5 or later.
- Fully managed Flink supports only the open source JDBC connector that does not include a JDBC driver for a specific database.
note
When you use the JDBC connector, you must manually upload the JAR package of the driver of the destination database as a dependency file. For more information, see SQL Editor window. The following table describes the JDBC drivers supported by fully managed Flink.
Driver | Group ID | Artifact ID |
---|---|---|
MySQL | mysql | mysql-connector-java |
Oracle | com.oracle.database.jdbc | ojdbc8 |
PostgreSQL | org.postgresql | postgresql |
If you use a JDBC driver that is not included in the table, you must test the validity and availability of the JDBC driver.
Syntax
CREATE TABLE jdbc_table (
`id` BIGINT,
`name` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
Parameters in the WITH Clause
Common Parameters
Parameter | Description | Data Type | Required | Default Value | Remarks |
---|---|---|---|---|---|
connector | The type of the table. | STRING | Yes | No default value | Set the value to jdbc. |
url | The URL of the database. | STRING | Yes | No default value | N/A. |
table-name | The name of the JDBC table. | STRING | Yes | No default value | N/A. |
username | The name of the JDBC user. | STRING | No | No default value | If you configure one of the username and password parameters, you must also configure the other parameter. |
password | The password of the JDBC user. | STRING | No | No default value |
Parameters Only for Source Tables
Parameter | Description | Data Type | Required | Default Value | Remarks |
---|---|---|---|---|---|
scan.partition.column | The name of the column that is used to partition the input data. | STRING | No | No default value | The values in the column must be of the NUMERIC, DATE, or TIMESTAMP type. For more information about partitioned scan, see Partitioned Scan. |
scan.partition.num | The number of partitions. | INTEGER | No | No default value | N/A. |
scan.partition.lower-bound | The smallest value of the first partition. | INTEGER | No | No default value | N/A. |
scan.partition.upper-bound | The largest value of the last partition. | INTEGER | No | No default value | N/A. |
scan.fetch-size | The number of rows of data that are obtained from the database each time data is read from a source table. | INTEGER | No | 0 | If you set this parameter to 0, this parameter is ignored. |
scan.auto-commit | Specifies whether to enable auto-commit. | BOOLEAN | No | true | N/A. |
Parameters Only for Result Tables
Parameter | Description | Data Type | Required | Default Value | Remarks |
---|---|---|---|---|---|
sink.buffer-flush.max-rows | The maximum number of data records that can be cached before the flush operation is performed. | INTEGER | No | 100 | If you set this parameter to 0, data records are not cached before the flush operation is performed. |
sink.buffer-flush.interval | The interval at which data is flushed. If data records are cached for a period that exceeds the duration specified by this parameter, the flush operation is performed in an asynchronous thread. | DURATION | No | 1s | If you set this parameter to 0, data records are not cached before the flush operation is performed. See note below. |
sink.max-retries | The maximum number of retries that are allowed when data fails to be written to the database. | INTEGER | No | 3 | N/A. |
If you want to process cached flush events in asynchronous mode, you can set the sink.buffer-flush.max-rows
parameter to 0 and configure the sink.buffer-flush.interval
parameter based on your business requirements.
Parameters Only for Dimension Tables
Parameter | Description | Data Type | Required | Default Value | Remarks |
---|---|---|---|---|---|
lookup.cache.max-rows | The maximum number of rows of data that can be cached. If the number of rows of data in the cache exceeds the value of this parameter, the earliest row of data expires and is replaced by a new row of data. | INTEGER | No | No default value | By default, caching for dimension tables is disabled. You can configure the lookup.cache.max-rows and lookup.cache.ttl parameters to enable caching for dimension tables. If caching for dimension tables is enabled, the LRU cache policy is used. |
lookup.cache.ttl | The maximum time to live (TTL) of each row of data in the cache. If the time period for which a row of data is cached exceeds the value of this parameter, the row of data expires. | DURATION | No | No default value | |
lookup.cache.caching-missing-key | Specifies whether to cache empty query results. | BOOLEAN | No | true | Valid values: true - Empty query results are cached. This is the default value. false - Empty query results are not cached. |
lookup.max-retries | The maximum number of retries when the database fails to be queried. | INTEGER | No | 3 | N/A. |
Data Type Mappings
Data type of MySQL | Data type of Oracle | Data type of PostgreSQL | Data type of Flink SQL |
---|---|---|---|
TINYINT | N/A | N/A | TINYINT |
| N/A |
| SMALLINT |
| N/A |
| INT |
| N/A |
| BIGINT |
BIGINT UNSIGNED | N/A | N/A | DECIMAL(20, 0) |
BIGINT | N/A | BIGINT | BIGINT |
FLOAT | BINARY_FLOAT |
| FLOAT |
| BINARY_DOUBLE |
| DOUBLE |
|
|
| DECIMAL(p, s) |
| N/A | BOOLEANcan | BOOLEAN |
DATE | DATE | DATE | DATE |
TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
|
|
| STRING |
|
| BYTEA | BYTES |
N/A | N/A | ARRAY | ARRAY |
Sample Code
Sample Code for a Source Table
CREATE TEMPORARY TABLE jdbc_source (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
CREATE TEMPORARY TABLE blackhole_sink(
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
Sample Code for a Result Table
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE jdbc_sink (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
INSERT INTO jdbc_sink
SELECT * FROM datagen_source;
Sample Code for a Dimension Table
CREATE TEMPORARY TABLE datagen_source(
`id` INT,
`data` BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE jdbc_dim (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
CREATE TEMPORARY TABLE blackhole_sink(
`id` INT,
`data` BIGINT,
`name` VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT T.`id`,T.`data`, H.`name`
FROM datagen_source AS T
JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;