Manage Fluss catalog
Fluss Catalog
The Fluss Catalog provides a unified metadata layer that makes it easy to manage and discover data stored in Fluss from within Apache Flink. It acts as a bridge between Fluss storage and SQL-based processing by exposing topics and schemas as relational objects like databases and tables. Through the catalog, you can register and query Fluss streams using familiar SQL semantics without worrying about underlying connection details.
The catalog is used for Flink SQL jobs. For DataStream jobs, you use the connector directly.
Concepts Mapping
The Fluss hierarchical system is organized around a database–table abstraction. This design maps directly onto Apache Flink’s Catalog API, allowing you to query Fluss streams as if they were standard database tables.
Flink | Fluss |
---|---|
Catalog | n/a |
Catalog Database | Database |
Table | Table |
Feature Support
For Flink's Table API, Fluss supports the following features:
Feature | Flink Support | Notes |
---|---|---|
SQL CREATE CATALOG | Yes | |
SQL CREATE DATABASE | Yes | |
SQL DROP DATABASE | Yes | |
SQL CREATE TABLE | Yes | |
SQL CREATE TABLE LIKE | Yes | |
SQL DROP TABLE | Yes | |
SQL SHOW PARTITIONS | Yes | |
SQL ADD PARTITION | Yes | |
SQL DROP PARTITION | Yes | |
SQL SELECT | Yes | Supports both streaming and batch mode. |
SQL LIMIT | Yes | Only for Log Table. |
SQL INSERT INTO | Yes | Supports both streaming and batch mode. |
SQL DELETE FROM | Yes | Batch mode only. |
SQL UPDATE | Yes | Batch mode only. |
SQL LOOKUP JOIN | Yes | |
DDL (Data Definition Language)
You can manage Fluss tables through the Fluss Catalog using Flink SQL. You must create a catalog first and then reference tables by their fully qualified names (e.g., `<catalog>.<database>.<table>`) or by setting the current catalog with a `USE CATALOG` statement.
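For example, assuming the fluss_catalog catalog created in the next section and the default fluss database, the following two queries are equivalent (a minimal sketch; the table name is illustrative):
-- Fully qualified table name
SELECT * FROM fluss_catalog.fluss.my_log_table;
-- Same query after switching the current catalog
USE CATALOG fluss_catalog;
SELECT * FROM my_log_table;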
Create a Catalog
Create the catalog with the following statement:
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'fluss-server-1:9123'
);
Then switch to it:
USE CATALOG fluss_catalog;
Catalog Options
Option | Required | Default | Description |
---|---|---|---|
type | required | (none) | Catalog type, must be fluss. |
bootstrap.servers | required | (none) | Comma-separated list of Fluss servers. |
default-database | optional | fluss | The default database to use when you switch to this catalog. |
client.security.protocol | optional | PLAINTEXT | The security protocol to communicate with brokers (PLAINTEXT or SASL). |
client.security.{protocol}.* | optional | (none) | Client-side configuration properties for a specific authentication protocol, e.g., client.security.sasl.jaas.config. |
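For example, a catalog that sets the optional properties explicitly might look like the following (a sketch; the server addresses and database name are placeholders):
CREATE CATALOG my_fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'fluss-server-1:9123,fluss-server-2:9123',
'default-database' = 'my_db',
'client.security.protocol' = 'PLAINTEXT'
);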
Databases
By default, the Fluss Catalog uses the `fluss` database. You can create a separate database to avoid creating tables under the default one.
CREATE DATABASE my_db;
USE my_db;
To delete a database, which also drops all tables within it:
-- Switch to a different database first
USE fluss;
-- Drop the desired database
DROP DATABASE my_db;
Tables
The following examples assume you have set the current catalog with `USE CATALOG <catalog_name>`.
- Primary Key Table
The following SQL statement creates a Primary Key Table with a primary key consisting of `shop_id` and `user_id`.
CREATE TABLE my_pk_table (
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
PRIMARY KEY (shop_id, user_id) NOT ENFORCED
) WITH (
'bucket.num' = '4'
);
- Log Table
The following SQL statement creates a Log Table by omitting the primary key clause.
CREATE TABLE my_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) WITH (
'bucket.num' = '8'
);
- Partitioned Tables
Fluss supports partitioned tables, including dynamic partition creation. The partition field must be of type `STRING` and, for Primary Key Tables, must be part of the primary key.
The following SQL statement creates a Partitioned Primary Key Table in Fluss. In this example, the partition field (`dt`) is a subset of the primary key (`dt`, `shop_id`, `user_id`).
CREATE TABLE my_part_pk_table (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt);
The following SQL statement creates a Partitioned Log Table in Fluss.
CREATE TABLE my_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING
) PARTITIONED BY (dt);
Fluss partitioned tables support dynamic partition creation, which means you can write data into a partition without pre-creating it. You can use the `INSERT INTO` statement to write data into a partitioned table, and Fluss will automatically create the partition if it does not exist. See Dynamic Partitioning for more details. You can still use the `ALTER TABLE ... ADD PARTITION` statement to manually add partitions, if needed.
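For example, writing a row with a new `dt` value into the `my_part_pk_table` defined above creates the corresponding partition on the fly (a minimal sketch; the values are illustrative):
-- The partition dt='2025-03-06' is created automatically if it does not exist
INSERT INTO my_part_pk_table VALUES ('2025-03-06', 1234, 5678);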
Fluss supports Multi-Field Partitioning. The following SQL statement creates a Multi-Field Partitioned Log Table in Fluss:
CREATE TABLE my_multi_fields_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING,
nation STRING
) PARTITIONED BY (dt, nation);
Fluss supports creating Auto Partitioned (Primary Key/Log) Tables. The following SQL statement creates an Auto Partitioned Primary Key Table in Fluss.
CREATE TABLE my_auto_part_pk_table (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket.num' = '4',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'day'
);
The following SQL statement creates an Auto Partitioned Log Table in Fluss.
CREATE TABLE my_auto_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING
) PARTITIONED BY (dt) WITH (
'bucket.num' = '8',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'hour'
);
For more details about Auto Partitioned (Primary Key/Log) Tables, refer to Auto Partitioning.
To create a table with the same schema, partitioning, and table properties as another table, use `CREATE TABLE LIKE`.
-- Create a temporary table
CREATE TEMPORARY TABLE datagen (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);
-- Create a Fluss table that derives the metadata from the temporary table excluding options
CREATE TABLE my_table LIKE datagen (EXCLUDING OPTIONS);
- Delete tables
To drop a table and remove all of its data from the Fluss cluster, run:
DROP TABLE my_table;
- Manually add partitions
Fluss supports manually adding partitions to an existing partitioned table through the Fluss Catalog. If the specified partition does not exist, Fluss will create the partition. If the specified partition already exists, Fluss will ignore the request or throw an exception.
To add partitions, run:
-- Add a partition to a single field partitioned table
ALTER TABLE my_part_pk_table ADD PARTITION (dt = '2025-03-05');
-- Add a partition to a multi-field partitioned table
ALTER TABLE my_multi_fields_part_log_table ADD PARTITION (dt = '2025-03-05', nation = 'US');
To show all the partitions of a partitioned table, run:
SHOW PARTITIONS my_part_pk_table;
For multi-field partitioned tables, you can use the `SHOW PARTITIONS` command with either partial or full partition field conditions to list matching partitions.
-- Show partitions using a partial partition filter
SHOW PARTITIONS my_multi_fields_part_log_table PARTITION (dt = '2025-03-05');
-- Show partitions using a full partition filter
SHOW PARTITIONS my_multi_fields_part_log_table PARTITION (dt = '2025-03-05', nation = 'US');
Fluss also supports manually dropping partitions from an existing partitioned table through the Fluss Catalog. If the specified partition does not exist, Fluss will ignore the request or throw an exception.
To drop partitions, run:
-- Drop a partition from a single field partitioned table
ALTER TABLE my_part_pk_table DROP PARTITION (dt = '2025-03-05');
-- Drop a partition from a multi-field partitioned table
ALTER TABLE my_multi_fields_part_log_table DROP PARTITION (dt = '2025-03-05', nation = 'US');
Reads
Fluss supports both streaming and batch reads. You can switch the execution mode for your current session:
-- Execute Flink jobs in streaming mode
SET 'execution.runtime-mode' = 'streaming';
-- Execute Flink jobs in batch mode
SET 'execution.runtime-mode' = 'batch';
By default, a streaming read produces the latest snapshot of the table on first startup and then continues to read the latest changes. You can also use SQL hints to modify this behavior, for example, to read only the latest changes without a snapshot.
-- Read only new changes, skipping the initial snapshot
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;
-- Read from a specific timestamp
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '2023-12-09 23:09:12') */;
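In batch mode, you can also combine a read with `LIMIT` for a quick, bounded look at a Log Table (a minimal sketch; as noted in the feature table, `LIMIT` is only supported for Log Tables):
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM my_log_table LIMIT 10;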
Writes
You can write data to Fluss tables using `INSERT INTO`, which supports both streaming and batch modes. An `INSERT INTO` on a primary key table results in an upsert operation, while on a log table it results in an append.
-- Append data from a source table to a log table
INSERT INTO log_table
SELECT * FROM source_table;
-- Upsert data into a primary key table (full or partial update)
INSERT INTO pk_table
SELECT * FROM source_table;
-- Partially update only the num_orders column
INSERT INTO pk_table (shop_id, user_id, num_orders)
SELECT shop_id, user_id, num_orders FROM source_table;
You can also delete or update data in primary-key tables in batch mode only.
-- Delete rows based on the primary key
SET 'execution.runtime-mode' = 'batch';
DELETE FROM pk_table WHERE shop_id = 10000 AND user_id = 123456;
-- Update rows based on the primary key
SET 'execution.runtime-mode' = 'batch';
UPDATE pk_table SET total_amount = 2 WHERE shop_id = 10000 AND user_id = 123456;
Lookups
Lookup Joins
You can use a primary key table as a dimension table for a lookup join.
- The join condition must include all primary keys of the dimension table.
- The lookup join runs in asynchronous mode by default for higher throughput.
- You can change to synchronous mode by setting the SQL hint `'lookup.async' = 'false'`.
Example
First, create the necessary source and dimension tables.
CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
`o_orderkey` INT NOT NULL,
`o_custkey` INT NOT NULL,
`o_orderstatus` CHAR(1) NOT NULL,
`o_totalprice` DECIMAL(15, 2) NOT NULL,
`o_orderdate` DATE NOT NULL,
`o_orderpriority` CHAR(15) NOT NULL,
`o_clerk` CHAR(15) NOT NULL,
`o_shippriority` INT NOT NULL,
`o_comment` STRING NOT NULL,
`o_dt` STRING NOT NULL,
PRIMARY KEY (o_orderkey) NOT ENFORCED
);
CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
PRIMARY KEY (c_custkey) NOT ENFORCED
);
Next, perform the lookup join to populate a sink table.
CREATE TEMPORARY TABLE lookup_join_sink (
order_key INT NOT NULL,
order_totalprice DECIMAL(15, 2) NOT NULL,
customer_name STRING NOT NULL,
customer_address STRING NOT NULL
) WITH ('connector' = 'blackhole');
-- Look up join in default asynchronous mode
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;
-- Look up join in synchronous mode using a hint
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer` /*+ OPTIONS('lookup.async' = 'false') */
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;
Partitioned Table Example
To join with a partitioned dimension table, you must specify all primary keys (including the partition key) in the join condition.
CREATE TABLE `fluss_catalog`.`my_db`.`customer_partitioned` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
`dt` STRING NOT NULL,
PRIMARY KEY (`c_custkey`, `dt`) NOT ENFORCED
)
PARTITIONED BY (`dt`)
WITH (
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'year'
);
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM (SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer_partitioned` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey` AND `o`.`o_dt` = `c`.`dt`;
Prefix Lookups
You can also perform a lookup join where the join condition is a prefix subset of the primary keys of the dimension table.
- The `bucket.key` property of the dimension table must be set as the join key.
- This join is also asynchronous by default.
Example
First, create the tables, ensuring `bucket.key` is defined on the dimension table.
CREATE TABLE `fluss_catalog`.`my_db`.`orders_with_dt` (
`o_orderkey` INT NOT NULL,
`o_custkey` INT NOT NULL,
`o_orderstatus` CHAR(1) NOT NULL,
`o_totalprice` DECIMAL(15, 2) NOT NULL,
`o_orderdate` DATE NOT NULL,
`o_orderpriority` CHAR(15) NOT NULL,
`o_clerk` CHAR(15) NOT NULL,
`o_shippriority` INT NOT NULL,
`o_comment` STRING NOT NULL,
`o_dt` STRING NOT NULL,
PRIMARY KEY (o_orderkey) NOT ENFORCED
);
-- Primary keys are (c_custkey, c_nationkey)
-- Bucket key is (c_custkey)
CREATE TABLE `fluss_catalog`.`my_db`.`customer_with_bucket_key` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
PRIMARY KEY (`c_custkey`, `c_nationkey`) NOT ENFORCED
) WITH (
'bucket.key' = 'c_custkey'
);
Next, perform the prefix lookup join.
CREATE TEMPORARY TABLE prefix_lookup_join_sink (
order_key INT NOT NULL,
order_totalprice DECIMAL(15, 2) NOT NULL,
customer_name STRING NOT NULL,
customer_address STRING NOT NULL
) WITH ('connector' = 'blackhole');
-- Prefix lookup join where the join key is a prefix
-- of the dimension table's primary keys.
INSERT INTO prefix_lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders_with_dt`.*, proctime() AS ptime FROM `orders_with_dt`) AS `o`
LEFT JOIN `customer_with_bucket_key` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;
Partitioned Table Example
For a prefix lookup with a partitioned table, the join key must be a prefix subset of the primary keys (excluding the partition key) plus the partition key itself.
CREATE TABLE `fluss_catalog`.`my_db`.`customer_partitioned_with_bucket_key` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
`dt` STRING NOT NULL,
PRIMARY KEY (`c_custkey`, `c_nationkey`, `dt`) NOT ENFORCED
)
PARTITIONED BY (`dt`)
WITH (
'bucket.key' = 'c_custkey',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'year'
);
INSERT INTO prefix_lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders_with_dt`.*, proctime() AS ptime FROM `orders_with_dt`) AS `o`
LEFT JOIN `customer_partitioned_with_bucket_key` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey` AND `o`.`o_dt` = `c`.`dt`;
Type Mapping
When you read from or write to Fluss, data types are automatically converted between the two systems. The following tables define the specific mappings for data moving from Fluss to Flink and from Flink to Fluss, and highlight any Flink types that are not supported for writing to Fluss.
Fluss to Flink
Fluss | Flink |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
CHAR | CHAR |
STRING | STRING |
DECIMAL | DECIMAL |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
TIMESTAMP_LTZ | TIMESTAMP_LTZ |
BYTES | BYTES |
Flink to Fluss
Flink | Fluss |
---|---|
BOOLEAN | BOOLEAN |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
CHAR | CHAR |
STRING | STRING |
DECIMAL | DECIMAL |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
TIMESTAMP_LTZ | TIMESTAMP_LTZ |
BYTES | BYTES |
VARCHAR | Not supported, suggest STRING . |
VARBINARY | Not supported, suggest BYTES . |
INTERVAL | Not supported |
ARRAY | Not supported |
MAP | Not supported |
MULTISET | Not supported |
ROW | Not supported |
RAW | Not supported |
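Because `VARCHAR` and `VARBINARY` are not supported on the write path, declare such columns as `STRING` and `BYTES` when defining Fluss tables (a minimal sketch; the table and column names are illustrative):
CREATE TABLE my_typed_table (
id BIGINT,
name STRING,   -- use STRING instead of VARCHAR(n)
payload BYTES  -- use BYTES instead of VARBINARY(n)
);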
Optimizations
Column Pruning
Column pruning minimizes I/O by reading only the columns used in a query and ignoring unused ones at the storage layer.
- Column pruning is only available when the table uses the Arrow log format (`'table.log.format' = 'arrow'`), which is enabled by default.
- Reading log data from remote storage currently does not support column pruning.
Example
First, create a table:
CREATE TABLE `log_table` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` STRING NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` STRING NOT NULL,
`c_comment` STRING NOT NULL
);
Next, query a single column from the table:
SELECT `c_name` FROM `log_table`;
To verify that column pruning is working, you can use `EXPLAIN`. The output confirms that only the `c_name` column is being read from storage.
EXPLAIN SELECT `c_name` FROM `log_table`;
-- Optimized Execution Plan
TableSourceScan(table=[[fluss_catalog, fluss, log_table, project=[c_name]]], fields=[c_name])
Partition Pruning
Partition pruning reduces the number of partitions scanned during a query by filtering on partition keys in the `WHERE` clause. This is especially useful for streaming queries on tables with many partitions, and it also dynamically prunes newly created partitions.
Currently, only equality conditions (e.g., `c_nationkey = 'US'`) are supported for partition pruning. Operators such as `<`, `>`, `OR`, and `IN` are not yet supported.
Example
First, create a partitioned table:
CREATE TABLE `log_partitioned_table` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` STRING NOT NULL,
`c_phone` STRING NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` STRING NOT NULL,
`c_comment` STRING NOT NULL,
`dt` STRING NOT NULL
) PARTITIONED BY (`c_nationkey`,`dt`);
Next, query the table with a filter on a partition key. The source will only scan partitions where `c_nationkey` is `'US'`.
SELECT * FROM `log_partitioned_table` WHERE `c_nationkey` = 'US';
To verify, you can use EXPLAIN. The output confirms that the filter is being pushed down to the source scan.
EXPLAIN SELECT * FROM `log_partitioned_table` WHERE `c_nationkey` = 'US';
-- Optimized Execution Plan
TableSourceScan(table=[[fluss_catalog, fluss, log_partitioned_table, filter=[=(c_nationkey, 'US')]]], ...)
Ververica Workflow
This section provides a complete workflow for using the Fluss Catalog within a Ververica deployment. The example covers setting up an external Fluss instance, connecting to it from the SQL Editor, and then creating and managing tables and data.
Prerequisites
Before you begin, ensure you have the following:
- An active Ververica deployment.
- An AWS account with permissions to create and manage EC2 instances and security groups.
- The Apache Fluss package downloaded and ready to be configured.
Step 1: Configure and Start Fluss on an EC2 Instance
To connect your deployment to Fluss, you first need a running Fluss cluster that is accessible from the internet. This example uses a simple local deployment on a single AWS EC2 instance.
- Modify Configuration Files: To allow external connections, you need to modify the listeners in `local-cluster.sh` and `server.yaml`. Replace `<DNS_name_ec2>` with your EC2 instance's public DNS name.
./bin/local-cluster.sh:
# Start zookeeper
"$FLUSS_BIN_DIR"/fluss-daemon.sh start zookeeper "${FLUSS_CONF_DIR}"/zookeeper.properties
# Start single Coordinator Server on this machine
"$FLUSS_BIN_DIR"/coordinator-server.sh start
# Start single Tablet Server on this machine.
# Set bind.listeners as config option to avoid port binding conflict with coordinator server
"${FLUSS_BIN_DIR}"/tablet-server.sh start \
-Dbind.listeners=FLUSS://0.0.0.0:9126,EXTERNAL://0.0.0.0:9125 \
-Dadvertised.listeners=EXTERNAL://<DNS_name_ec2>:9125 \
-Dinternal.listener.name=FLUSS
./conf/server.yaml:
#================================================
# Listeners
#================================================
# The format is '{listener_name}://{host}:{port}'.
# Multiple addresses can be specified, separated by commas.
bind.listeners: FLUSS://0.0.0.0:9123,EXTERNAL://0.0.0.0:9124
# Address advertised to external clients.
advertised.listeners: EXTERNAL://<DNS_name_ec2>:9124
# The name of the listener used for internal communication.
internal.listener.name: FLUSS
- Configure AWS Security Group: In the AWS console, ensure the security group for your EC2 instance allows incoming traffic on the ports you configured (e.g., `9124` and `9125` in this example).
Warning: Opening ports to `0.0.0.0/0` makes your instance publicly accessible. This setup is for quick testing only. For long-term use, you should implement more restrictive security configurations.
- Start the Cluster: Run the script to start your Fluss cluster.
./bin/local-cluster.sh start
Step 2: Create the Fluss Catalog and Tables
Now, from the SQL Editor in your deployment, you can connect to the Fluss instance.
- Create the Catalog: Enter the following statement in the SQL Editor, replacing `<ec2_IP>` with your instance's public IP address. To execute it, highlight the code, right-click the selection, and click Run.
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = '<ec2_IP>:9124'
);
- Create Tables: Set the catalog and create the tables you will use for the workflow.
USE CATALOG fluss_catalog;
CREATE TABLE outputTable (id INT, name STRING, amount INT);
CREATE TABLE inputTable (id INT, name STRING, amount INT);
You can monitor the creation of the catalog and tables in the Catalogs panel on the left side of the Console.
Step 3: Insert and Migrate Data
With the catalog and tables in place, you can now run Flink SQL jobs to interact with Fluss.
- Insert Initial Data: Deploy the following SQL to populate the `inputTable`. This is a bounded query, and the job will finish after the values are inserted.
USE CATALOG fluss_catalog;
INSERT INTO inputTable VALUES (1,'sam',1), (2,'ric',2);
- Migrate Data Between Tables: After the initial data is inserted, deploy the following query to create a streaming job that reads from `inputTable` and writes to `outputTable`.
INSERT INTO outputTable SELECT * FROM inputTable;noteBy default, this is an unbounded streaming job and will run continuously. To run it as a bounded batch job that finishes after migrating the data, add the following statement before the query:
SET 'execution.runtime-mode' = 'batch';
Step 4: Verify the Results
After the jobs have run, you can verify that the data was successfully written to the tables.
The most direct way to check the contents of a table from the SQL Editor is to run a `SELECT` query in batch mode.
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM outputTable;
Alternatively, you can verify the data programmatically using external tools, such as the Fluss Java APIs. The following method is an example of how you could build a simple Java-based reader to poll a Fluss table for records.
private List<String> readFromFlussTable(
String testOutputTable, int counter, long timeoutSeconds) {
// creating Connection object to connect with Fluss cluster
Configuration conf = new Configuration();
conf.setString("bootstrap.servers", "<DNS_ec2_name>:9124");
try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TablePath.of("fluss", testOutputTable));
LogScanner logScanner = table.newScan().createLogScanner()) {
int numBuckets = table.getTableInfo().getNumBuckets();
for (int i = 0; i < numBuckets; i++) {
logScanner.subscribeFromBeginning(i);
}
long scanned = 0;
long startTime = System.currentTimeMillis();
long endTime = startTime;
List<String> rowsList = new ArrayList<>();
while (scanned < counter
&& timeoutSeconds > Duration.ofMillis(endTime - startTime).getSeconds()) {
endTime = System.currentTimeMillis();
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
for (TableBucket bucket : scanRecords.buckets()) {
for (ScanRecord record : scanRecords.records(bucket)) {
InternalRow row = record.getRow();
// Process the row
String rowString =
String.format(
"%s,%s,%s", row.getInt(0), row.getString(1), row.getInt(2));
rowsList.add(rowString);
}
}
scanned += scanRecords.count();
}
return rowsList;
} catch (Exception e) {
throw new RuntimeException(e);
}
}