Fluss (Private Preview)

Private Preview Feature

This feature is currently in Private Preview and is available by invitation only. It is not ready for general commercial release and might contain bugs, errors, or defects. It is provided solely for testing, research, and evaluation purposes.

For more information or to request access, please visit the Introducing Apache Fluss on Ververica’s Unified Streaming Data Platform blog post.

Prerequisites and Limitations

During the Private Preview phase, be aware of the following configurations and limitations:

  • Non-Production Use: This environment is not meant for production workloads or mission-critical data.
  • Dedicated Workspace: We strongly advise creating a dedicated workspace for your Fluss preview to avoid impacting the stability of your existing projects.
  • Storage Limit: The preview cluster provides a limited overall disk size (e.g., 300 GB).
  • Replication: The cluster typically sets a replication factor of 3 for high availability. Note that each table consumes disk space proportional to bucket_number * 6 GB; for example, a table created with 'bucket.num' = '2' reserves roughly 12 GB.
  • Security:
    • No ACLs: The provided credentials grant superuser access. Fine-grained Access Control Lists (ACLs) are not enabled.
    • No TLS: External connections to the Fluss cluster are not protected via TLS. Do not transmit sensitive data over external connections.
  • Scanning Limitations: When connecting externally, you can only scan from the latest offset. Reading from the earliest offset or performing a full snapshot scan is not supported externally.

Connection Details

After your workspace is enabled for the preview, you will receive specific connection details. You will need these to configure your Flink jobs and external applications.

  • Internal Fluss URI: Used for VERA jobs running inside Ververica Cloud. Use this for maximum network speed and reduced latency.
  • External Fluss URI: Used to connect to the Fluss cluster from applications outside Ververica Cloud (e.g., local development).
  • Username / Password: Authentication credentials required for both internal and external connections.

Quickstart

This guide demonstrates how to create a Fluss catalog, generate test data, and query a primary key table.

1. Create a Fluss Catalog

Run the following SQL in your Ververica Cloud SQL Editor to register the catalog using the Internal URI:

CREATE CATALOG fluss WITH (
  'type' = 'fluss',
  'bootstrap.servers' = '<YOUR_INTERNAL_FLUSS_URI>', -- e.g., coordinator-server-0.fluss-vvc...:9127
  'client.security.protocol' = 'SASL',
  'client.security.sasl.mechanism' = 'PLAIN',
  'client.security.sasl.username' = '<YOUR_FLUSS_USERNAME>',
  'client.security.sasl.password' = '<YOUR_FLUSS_PASSWORD>'
);
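
To verify the registration, switch to the new catalog and list its databases (standard Flink SQL statements):

USE CATALOG fluss;
SHOW DATABASES;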

2. Create a Database and Table

Create a database and a partitioned table with a primary key:

CREATE DATABASE fluss.quickstart;

CREATE TABLE IF NOT EXISTS fluss.quickstart.pk_table (
  c1 BIGINT,
  c2 INT,
  c3 STRING,
  c4 BIGINT,
  c5 INT,
  c6 STRING,
  PRIMARY KEY (c1, c2, c3) NOT ENFORCED
) PARTITIONED BY (c3) WITH (
  'bucket.num' = '2'
);
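
To double-check the schema and the declared primary key, you can describe the table with standard Flink SQL:

DESC fluss.quickstart.pk_table;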

3. Generate Data

Deploy a temporary data generation job using the faker connector to populate the table:

CREATE TEMPORARY TABLE fake WITH (
  'connector' = 'faker',
  'rows-per-second' = '1000',
  'fields.c1.expression' = '#{number.numberBetween ''0'',''1000000''}',
  'fields.c2.expression' = '#{number.numberBetween ''0'',''1000''}',
  -- Generates a partition key based on a date pattern (20260101 to 20260107)
  'fields.c3.expression' = '202601#{regexify ''(01|02|03|04|05|06|07)''}',
  'fields.c4.expression' = '#{number.numberBetween ''0'',''1000000''}',
  'fields.c5.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.c6.expression' = '#{superhero.name}'
) LIKE fluss.quickstart.pk_table (EXCLUDING ALL);

INSERT INTO fluss.quickstart.pk_table
SELECT * FROM fake;
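
Because the c3 expression only produces the values 20260101 through 20260107, the job writes to at most seven partitions. As a quick check, assuming your Flink version supports the SHOW PARTITIONS statement for Fluss tables, you can list them:

SHOW PARTITIONS fluss.quickstart.pk_table;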

4. Query the Data

After the job is running, you can query the table directly:

SELECT * FROM fluss.quickstart.pk_table;
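
Because the table declares a primary key on (c1, c2, c3), you can also run a point query that pins all key columns; a sketch with illustrative key values (assuming such a row exists):

SELECT * FROM fluss.quickstart.pk_table
WHERE c1 = 42 AND c2 = 7 AND c3 = '20260102';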

For more advanced examples, refer to the official Fluss documentation.

External Connectivity

You can connect to Fluss from external applications (such as a local Java application) using the External URI.

Note

Remember to use the latest offset when scanning from an external connection; earlier offsets and snapshot scans are not supported externally.

SQL Example (External)
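
When running Flink SQL outside Ververica Cloud, first register the catalog as in the Quickstart, but point bootstrap.servers at the External Fluss URI:

CREATE CATALOG fluss WITH (
  'type' = 'fluss',
  'bootstrap.servers' = '<YOUR_EXTERNAL_FLUSS_URI>',
  'client.security.protocol' = 'SASL',
  'client.security.sasl.mechanism' = 'PLAIN',
  'client.security.sasl.username' = '<YOUR_FLUSS_USERNAME>',
  'client.security.sasl.password' = '<YOUR_FLUSS_PASSWORD>'
);

Then scan from the latest offset: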

SELECT * FROM fluss.quickstart.pk_table 
/*+ OPTIONS('scan.startup.mode'='latest') */;

Java SDK Example

The following Java program demonstrates how to configure the client, look up the latest offset via the Admin API, and scan for records using the LogScanner.

import java.time.Duration;
import java.util.Collections;

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.admin.ListOffsetsResult;
import org.apache.fluss.client.admin.OffsetSpec;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;

public class FlussExternalScan {

    public static void main(String[] args) throws Exception {
        // 1. Configure the connection using the External Fluss URI and SASL credentials
        Configuration conf = new Configuration();
        conf.setString("bootstrap.servers", "<YOUR_EXTERNAL_FLUSS_URI>"); // e.g. ...ververica.cloud:9128
        conf.setString("client.security.protocol", "SASL");
        conf.setString("client.security.sasl.mechanism", "PLAIN");
        conf.setString("client.security.sasl.username", "<YOUR_FLUSS_USERNAME>");
        conf.setString("client.security.sasl.password", "<YOUR_FLUSS_PASSWORD>");

        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            // 2. Access the table and look up the latest offset for bucket 0
            TablePath tablePath = TablePath.of("quickstart", "pk_table");
            Table table = conn.getTable(tablePath);
            Admin admin = conn.getAdmin();
            ListOffsetsResult listOffsetsResult =
                    admin.listOffsets(tablePath, Collections.singleton(0), new OffsetSpec.LatestSpec());
            long latest = listOffsetsResult.bucketResult(0).get();

            // 3. Scan from the latest offset (the only mode supported externally)
            try (LogScanner logScanner = table.newScan().createLogScanner()) {
                logScanner.subscribe(0, latest); // Subscribe to bucket 0 at the latest offset

                // Poll indefinitely; stop the process to end the scan
                while (true) {
                    System.out.println("Polling for records...");
                    ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));

                    for (TableBucket bucket : scanRecords.buckets()) {
                        for (ScanRecord record : scanRecords.records(bucket)) {
                            InternalRow row = record.getRow();
                            // Process your row here
                            System.out.println("Read row: " + row);
                        }
                    }
                }
            }
        }
    }
}