Oracle catalog
The Oracle Flink Catalog allows you to manage Oracle schemas and tables directly from Flink SQL. This enables seamless discovery and management of Oracle database objects as Flink tables.
Limitations
In addition to the Oracle Connector limitations, the Oracle Catalog does not support the following:
Limitation | Notes |
---|---|
Schema changes | Limited support for evolving schemas (e.g., adding columns). |
Schema modifications | DDL operations that alter a table (e.g., adding columns) are not supported; dropped or renamed columns are not detected. |
Functions | Function management is not supported. |
Partitions | Partition management is not supported. |
Simultaneous use of catalog and connector tables | Not supported. The catalog does not implement table creation, so attempting to create a table while both the catalog and the connector are in use fails. |
Catalog Options
The table below lists the main options for configuring an Oracle Flink Catalog. See the Flink JDBC Catalog documentation for full details.
Option | Required | Description |
---|---|---|
type | Yes | Set to 'oracle' for Oracle catalog. |
base-url | Yes | The base JDBC URL, e.g., jdbc:oracle:thin:@//host:port/. |
default-database | Yes | The default Oracle service to use, e.g., ORCL. |
username | Yes | Oracle DB username. |
password | Yes | Password for the username. |
driver | No | JDBC driver class name (defaults to Oracle driver). |
property-versions | No | Property version for compatibility. |
Example: Creating an Oracle Flink Catalog in Flink SQL
CREATE CATALOG my_oracle_catalog WITH (
'type' = 'oracle',
'base-url' = 'jdbc:oracle:thin:@//oracle.example.com:1521/',
'default-database' = 'ORCL',
'username' = 'flink_user',
'password' = 'flink_pw'
);
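As a hedged variant of the statement above, the optional driver option from the options table could be set explicitly. This is only a sketch: oracle.jdbc.OracleDriver is the usual Oracle JDBC driver class, while the catalog name my_oracle_catalog_explicit_driver is a hypothetical placeholder. Whether you need to set the option at all depends on your environment, since the table states it defaults to the Oracle driver.

```sql
-- Sketch only: same catalog definition, with the optional 'driver' option set.
-- The catalog name is a hypothetical placeholder; host and credentials are the
-- sample values used in the example above.
CREATE CATALOG my_oracle_catalog_explicit_driver WITH (
  'type' = 'oracle',
  'base-url' = 'jdbc:oracle:thin:@//oracle.example.com:1521/',
  'default-database' = 'ORCL',
  'username' = 'flink_user',
  'password' = 'flink_pw',
  -- Assumption: the standard Oracle JDBC driver class is available on the classpath.
  'driver' = 'oracle.jdbc.OracleDriver'
);
```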
After creating the catalog, you can reference Oracle tables as Flink tables, but first you need to USE the catalog:
USE CATALOG my_oracle_catalog;
SELECT * FROM HR.ORDERS;
Ensure that the Flink job has access to the Oracle JDBC driver. You might need to include it in the classpath or load it through plugin directories.
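To confirm that the catalog was registered and to see what it exposes, the standard Flink SQL SHOW statements can be used. The following is a minimal sketch that assumes the catalog name my_oracle_catalog from the example above; the output depends on the privileges of the Oracle user configured in the catalog.

```sql
-- List all catalogs registered in the current session.
SHOW CATALOGS;

-- Switch to the Oracle catalog created earlier.
USE CATALOG my_oracle_catalog;

-- List the databases exposed by the catalog.
SHOW DATABASES;

-- List the tables visible in the current database.
SHOW TABLES;
```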
Flink Catalog & Oracle DB Concepts Mapping
The following table summarizes how Oracle DB concepts map to Flink catalog concepts:
Flink Catalog Concept | Oracle DB Equivalent (Catalog) |
---|---|
Catalog name | N/A |
Schema name | User name |
Database name | Service name |
Table name | Table name |
Table Name Mapping
In Oracle, the schema name is synonymous with the username. Each user owns a schema with the same name. When the Oracle Flink Catalog imports tables, it also imports the schema name, which corresponds to the Oracle username.
Therefore, when referencing a table from the catalog, you must use the fully qualified name in the format <USER>.<TABLE>, with both parts in uppercase.
SELECT * FROM HR.ORDERS;
When importing tables using the Oracle Flink Catalog, the catalog retrieves table metadata directly from Oracle. Oracle stores unquoted identifiers in uppercase by default, so all table names are imported as uppercase. As a result, when referencing catalog tables in Flink SQL queries, you must use uppercase table names. Failing to do so may result in "table not found" errors. Similarly, catalog names in Flink are case-sensitive. If you create a catalog with an uppercase or mixed-case name, you must reference it using the exact same case.
Always use uppercase table names and reference catalog names with the exact case used during creation when querying Oracle catalog tables in Flink SQL.
Example: Catalog Creation and Usage
-- Create a catalog with an uppercase name
CREATE CATALOG ORACLE_CAT WITH (
'type' = 'oracle',
...
);
-- Use the catalog (case-sensitive)
USE CATALOG ORACLE_CAT;
-- Query a table (both schema and table names in uppercase)
SELECT * FROM HR.ORDERS;
Example Workflow
Step 1: Setup
Set up the Oracle database with two tables, created and populated by the following statements. You can use any SQL console; this example uses the IntelliJ Oracle console.
CREATE TABLE financial_transaction_in (
name VARCHAR(255),
amount DECIMAL(10,2)
);
CREATE TABLE financial_transaction_out (
name VARCHAR(255),
amount DECIMAL(10,2)
);
INSERT INTO financial_transaction_in (name, amount) VALUES ('Samuele', 1.00);
Table financial_transaction_in contains one row with a value for each of its two columns. Table financial_transaction_out, on the other hand, is empty.
In Oracle, it is critical to log in as the correct user, because users can only manipulate the tables for which they have the required privileges. This requirement also applies to the catalog and the connector.
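To check which user you are logged in as and which tables that user can see, a few Oracle data dictionary queries can help. This is a sketch to run in the Oracle console, not in Flink; it assumes the two tables were created as shown above.

```sql
-- Oracle SQL, run in the Oracle console (not in Flink).

-- Show the user of the current session.
SELECT USER FROM dual;

-- List the tables owned by the current user.
SELECT table_name FROM user_tables ORDER BY table_name;

-- Show the owner of the example tables among all tables the user can access.
SELECT owner, table_name
FROM all_tables
WHERE table_name LIKE 'FINANCIAL_TRANSACTION%';
```

The owner returned by the last query is the schema name to use when referencing the tables through the catalog.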
Step 2: Connector Case
To test the connector, create two Flink tables, each linked to one of the two Oracle tables (i.e., FlinkTable1 to financial_transaction_in and FlinkTable2 to financial_transaction_out).
First, insert values into financial_transaction_in.
CREATE TEMPORARY TABLE FlinkTable1 (name STRING, amount INT)
WITH (
'connector' = 'oracle',
'url' = 'jdbc:oracle:thin:@*****:1521:ORCL',
'table-name' = 'FINANCIAL_TRANSACTION_IN',
'username' = 'cc_test',
'password' = '*****',
'scan.auto-commit' = 'true'
);
CREATE TEMPORARY TABLE FlinkTable2 (name STRING, amount INT)
WITH (
'connector' = 'oracle',
'url' = 'jdbc:oracle:thin:@*****:1521:ORCL',
'table-name' = 'FINANCIAL_TRANSACTION_OUT',
'username' = 'cc_test',
'password' = '*****',
'scan.auto-commit' = 'true'
);
INSERT INTO FlinkTable1
VALUES ('Marco',2),('John',3)
;
Run the SQL using the Deploy command, then start the job on the Deployment page (accessible from the top-right corner of the SQL console).
After the job finishes, two new entries appear in the FINANCIAL_TRANSACTION_IN table in the Oracle console.
Oracle internally converts all unquoted table and column names to uppercase. To avoid case-related problems, we use uppercase names in Flink for both the catalog and the connector.
Second, run the following query, which tests whether the records are correctly read from the in table and written to the out table.
The following SQL repeats the table definitions because they are TEMPORARY tables, which exist only within a single SQL deployment.
CREATE TEMPORARY TABLE FlinkTable1 (name STRING, amount INT)
WITH (
'connector' = 'oracle',
'url' = 'jdbc:oracle:thin:@*****:1521:ORCL',
'table-name' = 'FINANCIAL_TRANSACTION_IN',
'username' = '*****',
'password' = '*****',
'scan.auto-commit' = 'true'
);
CREATE TEMPORARY TABLE FlinkTable2 (name STRING, amount INT)
WITH (
'connector' = 'oracle',
'url' = 'jdbc:oracle:thin:@*****:1521:ORCL',
'table-name' = 'FINANCIAL_TRANSACTION_OUT',
'username' = '*****',
'password' = '*****',
'scan.auto-commit' = 'true'
);
INSERT INTO FlinkTable2
SELECT * FROM FlinkTable1
;
After the job has been deployed and has finished, all records from financial_transaction_in, i.e., ('Samuele',1),('Marco',2),('John',3), should be present in financial_transaction_out.
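To verify the result on the Oracle side and then return the tables to their state from step 1, something like the following could be run in the Oracle console. This is a sketch that assumes the only rows added during the connector case are the 'Marco' and 'John' rows inserted above and the rows copied into financial_transaction_out.

```sql
-- Oracle SQL, run in the Oracle console (not in Flink).

-- Inspect the rows written by the Flink job.
SELECT name, amount FROM financial_transaction_out ORDER BY name;

-- Reset: empty the out table again.
DELETE FROM financial_transaction_out;

-- Reset: remove the rows inserted through the connector from the in table.
DELETE FROM financial_transaction_in WHERE name IN ('Marco', 'John');

-- Make the reset visible to other sessions, including Flink.
COMMIT;
```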
Step 3: Catalog Case
This step follows the same approach as the connector case, but uses the catalog. Start from the same setup described in step 1 (revert all changes made in the connector case) and execute CREATE CATALOG.
CREATE CATALOG my_oracle_catalog
WITH (
'type' = 'oracle'
,'default-database' = 'ORCL'
,'base-url' = 'jdbc:oracle:thin:@*****:1521'
,'username' = 'cc_test'
,'password' = '*****'
)
;
This statement can't be executed on Ververica Cloud using Deploy. Instead, select the text, right-click it, and select Run.
After the statement is executed, you should see a new catalog in the related panel in Ververica Cloud.
The catalog should show all the tables you have access to.
If you are an admin (with access to all system tables), many tables may appear in the catalog. Use the search bar to find the tables you need.
After the catalog is created, either insert records into the tables as in the connector case (INSERT INTO VALUES), or run an INSERT INTO … SELECT FROM query like the following.
Remember to USE the CATALOG before executing the query, and run it with the Deploy workflow.
USE CATALOG my_oracle_catalog;
INSERT INTO `<USER>.FINANCIAL_TRANSACTION_OUT`
SELECT * FROM `<USER>.FINANCIAL_TRANSACTION_IN`;
In catalogs, tables are referenced by their owning schema/user. Each reference should also be wrapped in backticks (`) so that the schema-qualified name is not misinterpreted as part of the table object path.
As a result, you should see ('Samuele',1) in financial_transaction_out.