

Fluss Catalog

The Fluss Catalog provides a unified metadata layer that makes it easy to manage and discover data stored in Fluss from within Apache Flink. It acts as a bridge between Fluss storage and SQL-based processing by exposing topics and schemas as relational objects like databases and tables. Through the catalog, you can register and query Fluss streams using familiar SQL semantics without worrying about underlying connection details.

note

The catalog is used for Flink SQL jobs. For DataStream jobs, you use the connector directly.

Concepts Mapping

Fluss organizes its metadata hierarchically around a database–table abstraction. This design maps directly onto Apache Flink’s Catalog API, allowing you to query Fluss streams as if they were standard database tables.

Flink            | Fluss
Catalog          | n/a
Catalog Database | Database
Table            | Table

Feature Support

For Flink's Table API, Fluss supports the following features:

Feature               | Flink Support | Notes
SQL CREATE CATALOG    | Yes           |
SQL CREATE DATABASE   | Yes           |
SQL DROP DATABASE     | Yes           |
SQL CREATE TABLE      | Yes           |
SQL CREATE TABLE LIKE | Yes           |
SQL DROP TABLE        | Yes           |
SQL SHOW PARTITIONS   | Yes           |
SQL ADD PARTITION     | Yes           |
SQL DROP PARTITION    | Yes           |
SQL SELECT            | Yes           | Supports both streaming and batch mode.
SQL LIMIT             | Yes           | Only for Log Tables.
SQL INSERT INTO       | Yes           | Supports both streaming and batch mode.
SQL DELETE FROM       | Yes           | Batch mode only.
SQL UPDATE            | Yes           | Batch mode only.
SQL LOOKUP JOIN       | Yes           |

DDL (Data Definition Language)

You can manage Fluss tables through the Fluss Catalog using Flink SQL. You must create a catalog first and then reference tables using their fully qualified names (e.g., <catalog>.<database>.<table>) or by setting the current catalog with a USE CATALOG statement.
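
For example, assuming a catalog named fluss_catalog and a hypothetical table my_db.my_table, the two addressing styles look like this:

-- Reference the table by its fully qualified name
SELECT * FROM fluss_catalog.my_db.my_table;

-- Or set the current catalog once and use shorter names
USE CATALOG fluss_catalog;
SELECT * FROM my_db.my_table;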

Create a Catalog

The following statement registers a Fluss catalog in Flink:

CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'fluss-server-1:9123'
);

Once created, switch to it so that subsequent statements run against it:

USE CATALOG fluss_catalog;

Catalog Options

Option                       | Required | Default   | Description
type                         | required | (none)    | Catalog type, must be fluss.
bootstrap.servers            | required | (none)    | Comma-separated list of Fluss servers.
default-database             | optional | fluss     | The default database to use when you switch to this catalog.
client.security.protocol     | optional | PLAINTEXT | The security protocol to communicate with brokers (PLAINTEXT or SASL).
client.security.{protocol}.* | optional | (none)    | Client-side configuration properties for a specific authentication protocol, e.g., client.security.sasl.jaas.config.
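
For example, a catalog for a SASL-secured cluster could be declared as in the sketch below; the server address and the JAAS value are placeholders, not a working configuration:

CREATE CATALOG secured_fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'fluss-server-1:9123',
'default-database' = 'fluss',
'client.security.protocol' = 'SASL',
'client.security.sasl.jaas.config' = '<your-jaas-configuration>'
);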

Databases

By default, Fluss Catalog uses the fluss database. You can create a separate database to avoid creating tables under the default.

CREATE DATABASE my_db;
USE my_db;

To delete a database, which also drops all tables within it:

-- Switch to a different database first
USE fluss;

-- Drop the desired database
DROP DATABASE my_db;

Tables

The following examples assume you have set the current catalog with USE CATALOG <catalog_name>.

  • Primary Key Table:

The following SQL statement will create a Primary Key Table with a primary key consisting of shop_id and user_id.

CREATE TABLE my_pk_table (
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
PRIMARY KEY (shop_id, user_id) NOT ENFORCED
) WITH (
'bucket.num' = '4'
);
  • Log Table

The following SQL statement creates a Log Table by omitting the PRIMARY KEY clause.

CREATE TABLE my_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) WITH (
'bucket.num' = '8'
);
  • Partitioned Tables

Fluss supports partitioned tables, including dynamic partition creation. The partition field must be of type STRING and must be part of the primary key for Primary Key Tables.

The following SQL statement creates a Partitioned Primary Key Table in Fluss. In this example, the partitioned field (dt) is a subset of the primary key (dt, shop_id, user_id).

CREATE TABLE my_part_pk_table (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt);

The following SQL statement creates a Partitioned Log Table in Fluss.

CREATE TABLE my_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING
) PARTITIONED BY (dt);

Fluss partitioned tables support dynamic partition creation, which means you can write data into a partition without pre-creating it. Use the INSERT INTO statement to write into a partitioned table, and Fluss automatically creates the partition if it does not exist. See Dynamic Partitioning for more details. You can still use the ADD PARTITION statement to add partitions manually if needed.
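
As a minimal sketch, inserting a row with a new dt value into the my_part_pk_table defined above creates the corresponding partition automatically:

-- The dt = '2025-03-06' partition is created on the fly if it does not exist
INSERT INTO my_part_pk_table VALUES ('2025-03-06', 1001, 42);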

Fluss supports Multi-Field Partitioning. The following SQL statement creates a Multi-Field Partitioned Log Table in Fluss:

CREATE TABLE my_multi_fields_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING,
nation STRING
) PARTITIONED BY (dt, nation);

Fluss supports creating Auto Partitioned (Primary Key/Log) Tables. The following SQL statement creates an Auto Partitioned Primary Key Table in Fluss.

CREATE TABLE my_auto_part_pk_table (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket.num' = '4',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'day'
);

The following SQL statement creates an Auto Partitioned Log Table in Fluss.

CREATE TABLE my_auto_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING
) PARTITIONED BY (dt) WITH (
'bucket.num' = '8',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'hour'
);

For more details about Auto Partitioned (Primary Key/Log) Table, refer to Auto Partitioning.

To create a table with the same schema, partitioning, and table properties as another table, use CREATE TABLE LIKE.

-- Create a temporary table
CREATE TEMPORARY TABLE datagen (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);
-- Create a Fluss table that derives the metadata from the temporary table excluding options
CREATE TABLE my_table LIKE datagen (EXCLUDING OPTIONS);
  • Drop tables

To drop a table and remove all of its data from the Fluss cluster, run:

DROP TABLE my_table;
  • Manually add partitions

Fluss supports manually adding partitions to an existing partitioned table through the Fluss Catalog. If the specified partition does not exist, Fluss creates it; if it already exists, Fluss either ignores the request (when IF NOT EXISTS is specified) or throws an exception.

To add partitions, run:

-- Add a partition to a single field partitioned table
ALTER TABLE my_part_pk_table ADD PARTITION (dt = '2025-03-05');

-- Add a partition to a multi-field partitioned table
ALTER TABLE my_multi_fields_part_log_table ADD PARTITION (dt = '2025-03-05', nation = 'US');

To show all the partitions of a partitioned table, run:

SHOW PARTITIONS my_part_pk_table;

For multi-field partitioned tables, you can use the SHOW PARTITIONS command with either partial or full partition field conditions to list matching partitions.

-- Show partitions using a partial partition filter
SHOW PARTITIONS my_multi_fields_part_log_table PARTITION (dt = '2025-03-05');

-- Show partitions using a full partition filter
SHOW PARTITIONS my_multi_fields_part_log_table PARTITION (dt = '2025-03-05', nation = 'US');

Fluss also supports manually dropping partitions from an existing partitioned table through the Fluss Catalog. If the specified partition does not exist, Fluss either ignores the request (when IF EXISTS is specified) or throws an exception.

To drop partitions, run:

-- Drop a partition from a single field partitioned table
ALTER TABLE my_part_pk_table DROP PARTITION (dt = '2025-03-05');

-- Drop a partition from a multi-field partitioned table
ALTER TABLE my_multi_fields_part_log_table DROP PARTITION (dt = '2025-03-05', nation = 'US');

Reads

Fluss supports both streaming and batch reads. You can switch the execution mode for your current session:

-- Execute Flink jobs in streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- Execute Flink jobs in batch mode
SET 'execution.runtime-mode' = 'batch';

By default, a streaming read first produces the latest snapshot of the table and then continues to read the latest changes. You can use SQL hints to modify this behavior, for example, to read only the latest changes and skip the initial snapshot.

-- Read only new changes, skipping the initial snapshot
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- Read from a specific timestamp
SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '2023-12-09 23:09:12') */;

Writes

You can write data to Fluss tables using INSERT INTO. This supports both streaming and batch modes. INSERT INTO on a primary key table results in an upsert operation, while on a log table it results in an append.

-- Append data from a source table to a log table
INSERT INTO log_table
SELECT * FROM source_table;

-- Upsert data into a primary key table (full or partial update)
INSERT INTO pk_table
SELECT * FROM source_table;

-- Partially update only the num_orders column
INSERT INTO pk_table (shop_id, user_id, num_orders)
SELECT shop_id, user_id, num_orders FROM source_table;

You can also delete or update data in primary-key tables in batch mode only.

-- Delete rows based on the primary key
SET 'execution.runtime-mode' = 'batch';
DELETE FROM pk_table WHERE shop_id = 10000 AND user_id = 123456;

-- Update rows based on the primary key
SET 'execution.runtime-mode' = 'batch';
UPDATE pk_table SET total_amount = 2 WHERE shop_id = 10000 AND user_id = 123456;

Lookups

Lookup Joins

You can use a primary key table as a dimension table for a lookup join.

  • The join condition must include all primary keys of the dimension table.
  • The lookup join runs in asynchronous mode by default for higher throughput.
  • You can change to synchronous mode by setting the SQL Hint 'lookup.async' = 'false'.

Example

First, create the necessary source and dimension tables.

CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
`o_orderkey` INT NOT NULL,
`o_custkey` INT NOT NULL,
`o_orderstatus` CHAR(1) NOT NULL,
`o_totalprice` DECIMAL(15, 2) NOT NULL,
`o_orderdate` DATE NOT NULL,
`o_orderpriority` CHAR(15) NOT NULL,
`o_clerk` CHAR(15) NOT NULL,
`o_shippriority` INT NOT NULL,
`o_comment` STRING NOT NULL,
`o_dt` STRING NOT NULL,
PRIMARY KEY (o_orderkey) NOT ENFORCED
);
CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
PRIMARY KEY (c_custkey) NOT ENFORCED
);

Next, perform the lookup join to populate a sink table.

CREATE TEMPORARY TABLE lookup_join_sink (
order_key INT NOT NULL,
order_totalprice DECIMAL(15, 2) NOT NULL,
customer_name STRING NOT NULL,
customer_address STRING NOT NULL
) WITH ('connector' = 'blackhole');
-- Look up join in default asynchronous mode
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;
-- Look up join in synchronous mode using a hint
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer` /*+ OPTIONS('lookup.async' = 'false') */
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;

Partitioned Table Example

To join with a partitioned dimension table, you must specify all primary keys (including the partition key) in the join condition.

CREATE TABLE `fluss_catalog`.`my_db`.`customer_partitioned` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
`dt` STRING NOT NULL,
PRIMARY KEY (`c_custkey`, `dt`) NOT ENFORCED
)
PARTITIONED BY (`dt`)
WITH (
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'year'
);
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM (SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer_partitioned` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey` AND `o`.`o_dt` = `c`.`dt`;

Prefix Lookups

You can also perform a lookup join where the join condition is a prefix subset of the primary keys of the dimension table.

  • The bucket.key property of the dimension table must be set as the join key.

  • This join is also asynchronous by default.

Example

First, create the tables, ensuring bucket.key is defined on the dimension table.

CREATE TABLE `fluss_catalog`.`my_db`.`orders_with_dt` (
`o_orderkey` INT NOT NULL,
`o_custkey` INT NOT NULL,
`o_orderstatus` CHAR(1) NOT NULL,
`o_totalprice` DECIMAL(15, 2) NOT NULL,
`o_orderdate` DATE NOT NULL,
`o_orderpriority` CHAR(15) NOT NULL,
`o_clerk` CHAR(15) NOT NULL,
`o_shippriority` INT NOT NULL,
`o_comment` STRING NOT NULL,
`o_dt` STRING NOT NULL,
PRIMARY KEY (o_orderkey) NOT ENFORCED
);
-- Primary keys are (c_custkey, c_nationkey)
-- Bucket key is (c_custkey)
CREATE TABLE `fluss_catalog`.`my_db`.`customer_with_bucket_key` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
PRIMARY KEY (`c_custkey`, `c_nationkey`) NOT ENFORCED
) WITH (
'bucket.key' = 'c_custkey'
);

Next, perform the prefix lookup join.

CREATE TEMPORARY TABLE prefix_lookup_join_sink (
order_key INT NOT NULL,
order_totalprice DECIMAL(15, 2) NOT NULL,
customer_name STRING NOT NULL,
customer_address STRING NOT NULL
) WITH ('connector' = 'blackhole');
-- Prefix lookup join where the join key is a prefix
-- of the dimension table's primary keys.
INSERT INTO prefix_lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders_with_dt`.*, proctime() AS ptime FROM `orders_with_dt`) AS `o`
LEFT JOIN `customer_with_bucket_key` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;

Partitioned Table Example

For a prefix lookup with a partitioned table, the join key must be a prefix subset of the primary keys (excluding the partition key) plus the partition key itself.

CREATE TABLE `fluss_catalog`.`my_db`.`customer_partitioned_with_bucket_key` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
`dt` STRING NOT NULL,
PRIMARY KEY (`c_custkey`, `c_nationkey`, `dt`) NOT ENFORCED
)
PARTITIONED BY (`dt`)
WITH (
'bucket.key' = 'c_custkey',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'year'
);
INSERT INTO prefix_lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders_with_dt`.*, proctime() AS ptime FROM `orders_with_dt`) AS `o`
LEFT JOIN `customer_partitioned_with_bucket_key` FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey` AND `o`.`o_dt` = `c`.`dt`;

Type Mapping

When you read from or write to Fluss, data types are automatically converted between the two systems. The following tables define the specific mappings for data moving from Fluss to Flink and from Flink to Fluss, and highlight any Flink types that are not supported for writing to Fluss.

Fluss to Flink:

Fluss         | Flink
BOOLEAN       | BOOLEAN
TINYINT       | TINYINT
SMALLINT      | SMALLINT
INT           | INT
BIGINT        | BIGINT
FLOAT         | FLOAT
DOUBLE        | DOUBLE
CHAR          | CHAR
STRING        | STRING
DECIMAL       | DECIMAL
DATE          | DATE
TIME          | TIME
TIMESTAMP     | TIMESTAMP
TIMESTAMP_LTZ | TIMESTAMP_LTZ
BYTES         | BYTES
Flink to Fluss:

Flink         | Fluss
BOOLEAN       | BOOLEAN
TINYINT       | TINYINT
SMALLINT      | SMALLINT
INT           | INT
BIGINT        | BIGINT
FLOAT         | FLOAT
DOUBLE        | DOUBLE
CHAR          | CHAR
STRING        | STRING
DECIMAL       | DECIMAL
DATE          | DATE
TIME          | TIME
TIMESTAMP     | TIMESTAMP
TIMESTAMP_LTZ | TIMESTAMP_LTZ
BYTES         | BYTES
VARCHAR       | Not supported; use STRING instead.
VARBINARY     | Not supported; use BYTES instead.
INTERVAL      | Not supported
ARRAY         | Not supported
MAP           | Not supported
MULTISET      | Not supported
ROW           | Not supported
RAW           | Not supported
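
If your pipeline carries one of the unsupported types, declare the Fluss columns with a supported type and convert on write. The sketch below assumes a hypothetical source table varchar_source with VARCHAR and VARBINARY columns:

-- Declare the Fluss table with STRING and BYTES instead of VARCHAR and VARBINARY
CREATE TABLE fluss_sink (
id INT,
name STRING,
payload BYTES
);

-- Cast the unsupported types before writing
INSERT INTO fluss_sink
SELECT id, CAST(name AS STRING), CAST(payload AS BYTES)
FROM varchar_source;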

Optimizations

Column Pruning

Column pruning minimizes I/O by reading only the columns used in a query and ignoring unused ones at the storage layer.

note
  • Column pruning is only available when the table uses the Arrow log format ('table.log.format' = 'arrow'), which is enabled by default.
  • Reading log data from remote storage currently does not support column pruning.
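
As a sketch, the log format can also be set explicitly when creating a table (the table name below is hypothetical); since Arrow is already the default, this is only needed when a different format has been configured elsewhere:

CREATE TABLE pruned_log_table (
id INT,
payload STRING
) WITH (
'table.log.format' = 'arrow'
);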

Example

First, create a table:

CREATE TABLE `log_table` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` STRING NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` STRING NOT NULL,
`c_comment` STRING NOT NULL
);

Next, query a single column from the table:

SELECT `c_name` FROM `log_table`;

To verify that column pruning is working, you can use EXPLAIN. The output confirms that only the c_name column is being read from storage.

EXPLAIN SELECT `c_name` FROM `log_table`;

-- Optimized Execution Plan
TableSourceScan(table=[[fluss_catalog, fluss, log_table, project=[c_name]]], fields=[c_name])

Partition Pruning

Partition pruning reduces the number of partitions scanned during a query by filtering based on partition keys in the WHERE clause. This is especially useful for streaming queries on tables with many partitions and supports dynamically pruning newly created partitions.

note

Currently, only equality conditions (e.g., c_nationkey = 'US') are supported for partition pruning. Operators such as <, >, OR, and IN are not yet supported.

Example

First, create a partitioned table:

CREATE TABLE `log_partitioned_table` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` STRING NOT NULL,
`c_phone` STRING NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` STRING NOT NULL,
`c_comment` STRING NOT NULL,
`dt` STRING NOT NULL
) PARTITIONED BY (`c_nationkey`,`dt`);

Next, query the table with a filter on a partition key. The source will only scan partitions where c_nationkey is US.

SELECT * FROM `log_partitioned_table` WHERE `c_nationkey` = 'US';

To verify, you can use EXPLAIN. The output confirms that the filter is being pushed down to the source scan.

EXPLAIN SELECT * FROM `log_partitioned_table` WHERE `c_nationkey` = 'US';

-- Optimized Execution Plan
TableSourceScan(table=[[fluss_catalog, fluss, log_partitioned_table, filter=[=(c_nationkey, 'US')]]], ...)

Ververica Workflow

This section provides a complete workflow for using the Fluss Catalog within a Ververica deployment. The example covers setting up an external Fluss instance, connecting to it from the SQL Editor, and then creating and managing tables and data.

Prerequisites

Before you begin, ensure you have the following:

  • An active Ververica deployment.
  • An AWS account with permissions to create and manage EC2 instances and security groups.
  • The Apache Fluss package downloaded and ready to be configured.

Step 1: Configure and Start Fluss on an EC2 Instance

To connect your deployment to Fluss, you first need a running Fluss cluster that is accessible from the internet. This example uses a simple local deployment on a single AWS EC2 instance.

  1. Modify Configuration Files: To allow external connections, you need to modify the listeners in local-cluster.sh and server.yaml. Replace <DNS_name_ec2> with your EC2 instance's public DNS name.

    • ./bin/local-cluster.sh

      # Start zookeeper
      "$FLUSS_BIN_DIR"/fluss-daemon.sh start zookeeper "${FLUSS_CONF_DIR}"/zookeeper.properties

      # Start single Coordinator Server on this machine
      "$FLUSS_BIN_DIR"/coordinator-server.sh start

      # Start single Tablet Server on this machine.
      # Set bind.listeners as config option to avoid port binding conflict with coordinator server
      "${FLUSS_BIN_DIR}"/tablet-server.sh start \
      -Dbind.listeners=FLUSS://0.0.0.0:9126,EXTERNAL://0.0.0.0:9125 \
      -Dadvertised.listeners=EXTERNAL://<DNS_name_ec2>:9125 \
      -Dinternal.listener.name=FLUSS
    • ./conf/server.yaml

      #================================================
      # Listeners
      #================================================

      # The format is '{listener_name}://{host}:{port}'.
      # Multiple addresses can be specified, separated by commas.
      bind.listeners: FLUSS://0.0.0.0:9123,EXTERNAL://0.0.0.0:9124

      # Address advertised to external clients.
      advertised.listeners: EXTERNAL://<DNS_name_ec2>:9124

      # The name of the listener used for internal communication.
      internal.listener.name: FLUSS
  2. Configure AWS Security Group: In the AWS console, ensure the security group for your EC2 instance allows incoming traffic on the ports you configured (e.g., 9124 and 9125 in this example).

    Warning: Opening ports to 0.0.0.0/0 makes your instance publicly accessible. This setup is for quick testing only. For long-term use, you should implement more restrictive security configurations.

  3. Start the Cluster: Run the script to start your Fluss cluster.

    ./bin/local-cluster.sh start

Step 2: Create the Fluss Catalog and Tables

Now, from the SQL Editor in your deployment, you can connect to the Fluss instance.

  1. Create the Catalog: Enter the following statement in the SQL Editor, replacing <ec2_IP> with your instance's public IP address. To execute it, highlight the code, right-click the selection, and click Run.

    CREATE CATALOG fluss_catalog WITH (
    'type' = 'fluss',
    'bootstrap.servers' = '<ec2_IP>:9124'
    );
  2. Create Tables: Set the catalog and create the tables you will use for the workflow.

    USE CATALOG fluss_catalog;

    CREATE TABLE outputTable (id INT, name STRING, amount INT);
    CREATE TABLE inputTable (id INT, name STRING, amount INT);

    You can monitor the creation of the catalog and tables in the Catalogs panel on the left side of the Console.

Step 3: Insert and Migrate Data

With the catalog and tables in place, you can now run Flink SQL jobs to interact with Fluss.

  1. Insert Initial Data: Deploy the following SQL to populate the inputTable. This is a bounded query, and the job will finish after the values are inserted.

    USE CATALOG fluss_catalog;

    INSERT INTO inputTable VALUES (1,'sam',1), (2,'ric',2);
  2. Migrate Data Between Tables: After the initial data is inserted, deploy the following query to create a streaming job that reads from inputTable and writes to outputTable.

    USE CATALOG fluss_catalog;

    INSERT INTO outputTable SELECT * FROM inputTable;
    note

    By default, this is an unbounded streaming job and will run continuously. To run it as a bounded batch job that finishes after migrating the data, add the following statement before the query: SET 'execution.runtime-mode' = 'batch';

Step 4: Verify the Results

After the jobs have run, you can verify that the data was successfully written to the tables.

The most direct way to check the contents of a table from the SQL Editor is to run a SELECT query in batch mode.

SET 'execution.runtime-mode' = 'batch';

SELECT * FROM outputTable;

Alternatively, you can verify the data programmatically using external tools, such as the Fluss Java APIs. The following method is an example of how you could build a simple Java-based reader to poll a Fluss table for records.

private List<String> readFromFlussTable(
        String testOutputTable, int counter, long timeoutSeconds) {
    // Create a Connection to the Fluss cluster
    Configuration conf = new Configuration();
    conf.setString("bootstrap.servers", "<DNS_ec2_name>:9124");
    try (Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TablePath.of("fluss", testOutputTable));
            LogScanner logScanner = table.newScan().createLogScanner()) {
        // Subscribe to every bucket of the table from the beginning of its log
        int numBuckets = table.getTableInfo().getNumBuckets();
        for (int i = 0; i < numBuckets; i++) {
            logScanner.subscribeFromBeginning(i);
        }
        long scanned = 0;
        long startTime = System.currentTimeMillis();
        long endTime = startTime;
        List<String> rowsList = new ArrayList<>();
        // Poll until the expected number of records is read or the timeout expires
        while (scanned < counter
                && timeoutSeconds > Duration.ofMillis(endTime - startTime).getSeconds()) {
            endTime = System.currentTimeMillis();
            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
            for (TableBucket bucket : scanRecords.buckets()) {
                for (ScanRecord record : scanRecords.records(bucket)) {
                    InternalRow row = record.getRow();
                    // Format the row as "id,name,amount"
                    String rowString =
                            String.format(
                                    "%s,%s,%s", row.getInt(0), row.getString(1), row.getInt(2));
                    rowsList.add(rowString);
                }
            }
            scanned += scanRecords.count();
        }
        return rowsList;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
