Oracle
Background
The Oracle connector allows you to read from and write to Oracle databases in streaming and batch modes using Ververica Platform. The connector is a direct extension of the JDBC connector and shares some of its capabilities and configuration options. The following table describes the connector's capabilities.
| Item | Description |
|---|---|
| Table type | Source table, dimension table, and result table |
| Running mode | Streaming mode and batch mode |
| Data format | N/A |
| Metric | N/A |
| Data update or deletion in a result table | Supported |
Prerequisites
- Access to an active Oracle database instance running at a known network address (host and port).
- The required tables must already exist in the target Oracle database.
- Valid credentials: a username and password with sufficient privileges to read from or write to the tables.
- Knowledge of the Oracle service name associated with your database instance (needed for the JDBC URL and catalog configuration).
Supported Versions
- Oracle 19c, 21c and 23c
- Oracle XE for local testing
Oracle JDBC URL Format
Database names map to Oracle service names. As a consequence, the JDBC URL must include the service name after the slash (/).
In particular, both the catalog and the connector assume the following URL format, with a double slash (//) after the @ and a single slash (/) before the service name.
jdbc:oracle:thin:@//<host>:<port>/<service_name>
- host: The hostname or IP address of your Oracle server.
- port: The port number (default is 1521).
- service_name: The Oracle service name (e.g., ORCL).
In some Oracle versions, the double slash after @ is optional but using it is recommended for compatibility. Always ensure your URL matches your Oracle server configuration and uses the correct service name.
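For illustration, a complete URL for a hypothetical host oracle.example.com, the default port, and the ORCL service name would look like this:
jdbc:oracle:thin:@//oracle.example.com:1521/ORCL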
Table Management
The target tables must already exist in the Oracle database, and you must know their exact names. Flink does not create or modify external Oracle tables; it only registers a local table definition that maps to the existing external table, for both sources and sinks.
| Mode | Supported |
|---|---|
| Source | Yes |
| Sink | Yes |
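Because the connector never creates or alters Oracle tables, it can be useful to confirm that the target table already exists and to note its exact name before registering a Flink table. A minimal check run directly against Oracle might look like the following; the HR schema name is only an example.
-- Run against Oracle (e.g., in SQL*Plus or SQL Developer) to list tables in the HR schema
SELECT owner, table_name
FROM all_tables
WHERE owner = 'HR';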
Limitations
| Limitation | Notes |
|---|---|
| Bounded source only | The JDBC source table is a bounded source: it reads all data once and then completes. For real-time change data capture, use a CDC connector; for one-off reads, run the job in batch mode (see the sketch after this table). |
| No built-in CDC support | Real-time change data capture is not supported. Use a dedicated CDC connector for this purpose. |
| Driver compatibility | Only tested JDBC drivers are officially supported. If you use a different driver, you must validate its compatibility yourself. |
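To illustrate the bounded-source behavior, the following minimal sketch runs a one-off snapshot read as a batch job in the Flink SQL client. The oracle_orders table it queries is the source table defined in the example that follows.
-- Switch the Flink SQL client to batch execution for a one-off read
SET 'execution.runtime-mode' = 'batch';

-- Reads the current contents of the Oracle table once, then the job finishes
SELECT * FROM oracle_orders;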
Example: Defining Source and Sink Tables
You have two tables in your Oracle database:
- HR.ORDERS (source table)
- HR.ORDERS_ARCHIVE (sink table)
- Define the source table in Flink.
CREATE TABLE oracle_orders (
id INT,
order_date TIMESTAMP,
amount DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oracle',
'url' = 'jdbc:oracle:thin:@//oracle.example.com:1521/ORCL',
'username' = 'flink_user',
'password' = 'flink_pw',
'table-name' = 'HR.ORDERS'
);
- Define the sink table in Flink.
CREATE TABLE oracle_orders_archive (
id INT,
order_date TIMESTAMP,
amount DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oracle',
'url' = 'jdbc:oracle:thin:@//oracle.example.com:1521/ORCL',
'username' = 'flink_user',
'password' = 'flink_pw',
'table-name' = 'HR.ORDERS_ARCHIVE'
);
- Insert data from source to sink.
INSERT INTO oracle_orders_archive
SELECT * FROM oracle_orders;
- Both HR.ORDERS and HR.ORDERS_ARCHIVE must already exist in the Oracle database.
- Flink's CREATE TABLE statement only registers a local table definition that maps to the external Oracle table; it does not create or alter the actual Oracle table.
- Ensure the schema (column names and types) in Flink matches the schema of the existing Oracle tables.
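For reference, the Oracle-side definition of HR.ORDERS that the example above maps to might look like the following sketch. The Oracle column types shown here (NUMBER precision in particular) are assumptions for illustration; always align the Flink schema with your actual Oracle tables rather than the other way around.
-- Oracle-side DDL (must already exist; shown only to illustrate the schema mapping)
CREATE TABLE HR.ORDERS (
  id NUMBER(10) PRIMARY KEY,   -- maps to Flink INT
  order_date TIMESTAMP,        -- maps to Flink TIMESTAMP
  amount NUMBER(10, 2)         -- maps to Flink DECIMAL(10, 2)
);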
Properties
The table below summarizes the supported connector options, which are typically set in the Flink SQL WITH clause.
| Property | Required | Default | Description |
|---|---|---|---|
| connector | Yes | (none) | Must be set to 'oracle'. |
| url | Yes | (none) | The JDBC URL to connect to Oracle DB. |
| username | Yes | (none) | Oracle DB username with appropriate read/write privileges. |
| password | Yes | (none) | Password for the username. |
| table-name | Yes | (none) | Table to read from or write to. |
| scan.startup.mode | Optional | initial | Startup mode for reading: initial, latest-offset, or timestamp. |
| driver | Optional | Oracle drivers | The class name of the JDBC driver to use. |
| connection.max-retry-timeout | Optional | 60s | Maximum retry timeout for connection attempts. |
| lookup.cache.max-rows | Optional | 5000 | Maximum number of rows to cache for lookup joins. |
| lookup.cache.ttl | Optional | 10 min | Time-to-live for cached lookup rows. |
| sink.buffer-flush.max-rows | Optional | 100 | Maximum number of rows to buffer before flushing to the database. |
| sink.buffer-flush.interval | Optional | 1s | Interval for flushing buffered rows to the database. |
| sink.max-retries | Optional | 3 | Maximum number of retries for failed writes. |
| scan.auto-commit | Optional | true | Sets the auto-commit flag on the Oracle driver, which determines whether each statement is committed in a transaction automatically. |
- Some options are inherited from the JDBC connector. Refer to the JDBC connector documentation for a full list of supported properties and advanced configuration.
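As an example of the lookup-related options, the sketch below registers an Oracle table as a dimension table with a bounded cache and enriches a stream through a lookup join. The HR.CUSTOMERS table, the orders_stream table, and its processing-time column proc_time are assumptions for illustration; the connection options follow the property table above.
CREATE TABLE oracle_customers (
  customer_id INT,
  customer_name STRING,
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'oracle',
  'url' = 'jdbc:oracle:thin:@//oracle.example.com:1521/ORCL',
  'username' = 'flink_user',
  'password' = 'flink_pw',
  'table-name' = 'HR.CUSTOMERS',
  'lookup.cache.max-rows' = '5000',
  'lookup.cache.ttl' = '10min'
);

-- Lookup join: each row of the stream is enriched with the latest customer name
SELECT o.id, o.amount, c.customer_name
FROM orders_stream AS o
JOIN oracle_customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;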