Fluss (Private Preview)
This feature is currently in Private Preview and is available by invitation only. It is not ready for general commercial release and might contain bugs, errors, or defects. It is provided solely for testing, research, and evaluation purposes.
For more information or to request access, see the blog post "Introducing Apache Fluss on Ververica’s Unified Streaming Data Platform".
Prerequisites and Limitations
During the Private Preview phase, be aware of the following configurations and limitations:
- Non-Production Use: This environment is not meant for production workloads or mission-critical data.
- Dedicated Workspace: We strongly advise creating a dedicated workspace for your Fluss preview to avoid impacting the stability of your existing projects.
- Storage Limit: The preview cluster provides a limited overall disk size (e.g., 300GB).
- Replication: The cluster typically sets a replication factor of 3 for high availability. Note that tables will consume disk space proportional to bucket_number * 6GB.
- Security:
  - No ACLs: The provided credentials grant superuser access. Fine-grained Access Control Lists (ACLs) are not enabled.
  - No TLS: External connections to the Fluss cluster are not protected via TLS. Do not transmit sensitive data over external connections.
- Scanning Limitations: When connecting externally, you can only scan from the latest offset. Reading from the earliest offset or performing a full snapshot scan is not supported externally.
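To make the storage limits above concrete, the following Java sketch estimates a table's disk footprint before you create it. It treats the quoted 6GB per bucket and the 300GB example limit as planning figures, and it assumes that each partition of a partitioned table gets its own set of buckets; that assumption, and the class and method names, are illustrative and not part of the Fluss API.

```java
public class FlussDiskEstimate {
    // Figures quoted in the limitations above; treat both as examples, not guarantees.
    static final long GB_PER_BUCKET = 6;
    static final long CLUSTER_LIMIT_GB = 300;

    /**
     * Estimated footprint in GB for a table.
     * Assumption: every partition holds bucketNum buckets of its own.
     * Pass partitions = 1 for a non-partitioned table.
     */
    public static long estimateGb(int bucketNum, int partitions) {
        if (bucketNum <= 0 || partitions <= 0) {
            throw new IllegalArgumentException("bucketNum and partitions must be positive");
        }
        return (long) bucketNum * partitions * GB_PER_BUCKET;
    }

    /** True if the estimate stays within the example cluster limit. */
    public static boolean fits(long estimatedGb) {
        return estimatedGb <= CLUSTER_LIMIT_GB;
    }

    /** Largest total bucket count that stays within the example limit. */
    public static long maxBuckets() {
        return CLUSTER_LIMIT_GB / GB_PER_BUCKET;
    }

    public static void main(String[] args) {
        // A table with 'bucket.num' = '2' and seven partition values.
        long gb = estimateGb(2, 7);
        System.out.println("Estimated footprint: " + gb + " GB, fits: " + fits(gb));
        System.out.println("Bucket budget across all tables: " + maxBuckets());
    }
}
```

Under these assumptions, a table with 2 buckets and 7 partitions is budgeted at 84GB, and the whole preview cluster has room for roughly 50 buckets in total, so small bucket counts are advisable.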
Connection Details
After your workspace is enabled for the preview, you will receive specific connection details. You will need these to configure your Flink jobs and external applications.
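Because the preview credentials grant superuser access, it may be worth keeping them out of source code. The sketch below is one possible way to assemble the client options used elsewhere on this page from environment variables. The variable names (FLUSS_BOOTSTRAP_SERVERS, FLUSS_USERNAME, FLUSS_PASSWORD) and the helper class are hypothetical; only the option keys come from this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlussClientOptions {

    /** Builds the SASL/PLAIN client options from the given environment map. */
    public static Map<String, String> fromEnv(Map<String, String> env) {
        Map<String, String> options = new LinkedHashMap<>();
        // Internal or External URI, depending on where the client runs.
        options.put("bootstrap.servers", require(env, "FLUSS_BOOTSTRAP_SERVERS"));
        options.put("client.security.protocol", "SASL");
        options.put("client.security.sasl.mechanism", "PLAIN");
        options.put("client.security.sasl.username", require(env, "FLUSS_USERNAME"));
        options.put("client.security.sasl.password", require(env, "FLUSS_PASSWORD"));
        return options;
    }

    // Fails fast with a clear message when a variable is missing or empty.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> options = fromEnv(System.getenv());
        // Print the options with the password masked.
        options.forEach((key, value) ->
            System.out.println(key + " = " + (key.endsWith("password") ? "****" : value)));
    }
}
```

The resulting map can be copied into a Fluss `Configuration` with `setString`, or used to fill in the `WITH` clause of a catalog definition.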
Quickstart
This guide demonstrates how to create a Fluss catalog, generate test data, and query a primary key table.
1. Create a Fluss Catalog
Run the following SQL in your Ververica Cloud SQL Editor to register the catalog using the Internal URI:
CREATE CATALOG fluss WITH (
  'type' = 'fluss',
  'bootstrap.servers' = '<YOUR_INTERNAL_FLUSS_URI>', -- e.g., coordinator-server-0.fluss-vvc...:9127
  'client.security.protocol' = 'SASL',
  'client.security.sasl.mechanism' = 'PLAIN',
  'client.security.sasl.username' = '<YOUR_FLUSS_USERNAME>',
  'client.security.sasl.password' = '<YOUR_FLUSS_PASSWORD>'
);
2. Create a Database and Table
Create a database and a partitioned table with a primary key:
CREATE DATABASE fluss.quickstart;

CREATE TABLE IF NOT EXISTS fluss.quickstart.pk_table (
  c1 BIGINT,
  c2 INT,
  c3 STRING,
  c4 BIGINT,
  c5 INT,
  c6 STRING,
  PRIMARY KEY (c1, c2, c3) NOT ENFORCED
) PARTITIONED BY (c3) WITH (
  'bucket.num' = '2'
);
3. Generate Data
Deploy a temporary data generation job using the faker connector to populate the table:
CREATE TEMPORARY TABLE fake WITH (
  'connector' = 'faker',
  'rows-per-second' = '1000',
  'fields.c1.expression' = '#{number.numberBetween ''0'',''1000000''}',
  'fields.c2.expression' = '#{number.numberBetween ''0'',''1000''}',
  -- Generates a partition key based on a date pattern
  'fields.c3.expression' = '202601#{regexify ''(01|02|03|04|05|06|07)''}',
  'fields.c4.expression' = '#{number.numberBetween ''0'',''1000000''}',
  'fields.c5.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.c6.expression' = '#{superhero.name}'
) LIKE fluss.quickstart.pk_table (EXCLUDING ALL);

INSERT INTO fluss.quickstart.pk_table
SELECT * FROM fake;
4. Query the Data
After the job is running, you can query the table directly:
SELECT * FROM fluss.quickstart.pk_table;
For more advanced examples, refer to the official Fluss documentation.
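Because pk_table declares PRIMARY KEY (c1, c2, c3), rows written with the same key are expected to replace one another instead of accumulating. The following in-memory Java sketch models that last-write-wins behavior so you can reason about what the query returns. It does not use the Fluss client, the class and method names are hypothetical, and it assumes the default upsert behavior of a primary key table.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrimaryKeyUpsertModel {
    // Key: (c1, c2, c3). Value: the full row (c1..c6) most recently written for that key.
    private final Map<List<Object>, Object[]> rows = new LinkedHashMap<>();

    /** Inserts the row, or overwrites the existing row that has the same primary key. */
    public void upsert(long c1, int c2, String c3, long c4, int c5, String c6) {
        rows.put(Arrays.<Object>asList(c1, c2, c3), new Object[] {c1, c2, c3, c4, c5, c6});
    }

    /** Returns the current row for the key, or null if the key was never written. */
    public Object[] get(long c1, int c2, String c3) {
        return rows.get(Arrays.<Object>asList(c1, c2, c3));
    }

    /** Number of distinct primary keys currently stored. */
    public int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        PrimaryKeyUpsertModel table = new PrimaryKeyUpsertModel();
        table.upsert(1L, 2, "20260101", 10L, 1, "first write");
        table.upsert(1L, 2, "20260101", 20L, 2, "second write"); // same key: replaces the row
        table.upsert(1L, 2, "20260102", 30L, 3, "other partition"); // different c3: new row
        System.out.println("Distinct keys: " + table.size());
        System.out.println("Current row: " + Arrays.toString(table.get(1L, 2, "20260101")));
    }
}
```

With the faker job running, the same effect means the row count of pk_table grows with the number of distinct (c1, c2, c3) combinations generated, not with the total number of rows written.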
External Connectivity
You can connect to Fluss from external applications (such as a local Java application) using the External URI.
Remember to strictly use the latest offset when scanning from an external connection.
SQL Example (External)
SELECT * FROM fluss.quickstart.pk_table
/*+ OPTIONS('scan.startup.mode'='latest') */;
Java SDK Example
The following Java snippet demonstrates how to configure the client and scan for records using the LogScanner.
import java.time.Duration;
import java.util.Collections;

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.admin.ListOffsetsResult;
import org.apache.fluss.client.admin.OffsetSpec;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;

public class FlussExternalScanExample {
    public static void main(String[] args) throws Exception {
        // 1. Configure Connection
        Configuration conf = new Configuration();
        conf.setString("bootstrap.servers", "<YOUR_EXTERNAL_FLUSS_URI>"); // e.g. ...ververica.cloud:9128
        conf.setString("client.security.protocol", "SASL");
        conf.setString("client.security.sasl.mechanism", "PLAIN");
        conf.setString("client.security.sasl.username", "<YOUR_FLUSS_USERNAME>");
        conf.setString("client.security.sasl.password", "<YOUR_FLUSS_PASSWORD>");

        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            // 2. Access the Table and look up the latest offset of bucket 0
            TablePath tablePath = TablePath.of("quickstart", "pk_table");
            Table table = conn.getTable(tablePath);
            Admin admin = conn.getAdmin();
            ListOffsetsResult listOffsetsResult =
                admin.listOffsets(tablePath, Collections.singleton(0), new OffsetSpec.LatestSpec());
            long latest = listOffsetsResult.bucketResult(0).get();

            // 3. Scan from Latest Offset
            try (LogScanner logScanner = table.newScan().createLogScanner()) {
                logScanner.subscribe(0, latest); // Subscribe to bucket 0 at the latest offset

                // Poll until the process is stopped
                while (true) {
                    System.out.println("Polling for records...");
                    ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));

                    for (TableBucket bucket : scanRecords.buckets()) {
                        for (ScanRecord record : scanRecords.records(bucket)) {
                            InternalRow row = record.getRow();
                            // Process your row here
                            System.out.println("Read row: " + row);
                        }
                    }
                }
            }
        }
    }
}
Related Topics
Continue with Fluss:
- Fluss Overview — how Fluss fits into Ververica's ecosystem.
- Apache Fluss connector — read from and write to Fluss in Flink SQL.
- Manage Fluss catalog — register Fluss databases and tables.