Apache Fluss

Apache Fluss is a lightweight, high-performance log storage and streaming system within the Apache Software Foundation. It is a cloud-native alternative to traditional event streaming platforms that aims to simplify data infrastructure while maintaining strong integration with stream processing engines like Apache Flink. Fluss provides durable, ordered, and low-latency log storage to serve as the backbone for real-time analytics and event-driven applications.

Prerequisites

To use the Fluss connector, you need access to a Fluss Cluster, which includes:

  • Coordinator server(s)
  • Tablet server(s)
  • ZooKeeper

Fluss DataStream Connector

The Fluss DataStream Connector integrates Fluss directly with Apache Flink’s DataStream API, enabling you to build low-level, fine-grained streaming applications. The connector provides programmatic access to Fluss tables for producing and consuming events.

note

The connector is used for DataStream jobs, while the catalog is used for SQL jobs.
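
For contrast, a SQL job typically registers the Fluss catalog rather than wiring sources and sinks by hand. Below is a minimal sketch using Flink's TableEnvironment; the catalog name fluss_catalog and the address localhost:9123 are placeholder values, and the WITH options should be verified against the catalog documentation for your version.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlussCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register the Fluss catalog; 'bootstrap.servers' points at a
        // coordinator server (localhost:9123 is a placeholder).
        tEnv.executeSql(
                "CREATE CATALOG fluss_catalog WITH ("
                        + " 'type' = 'fluss',"
                        + " 'bootstrap.servers' = 'localhost:9123'"
                        + ")");
        tEnv.executeSql("USE CATALOG fluss_catalog");
    }
}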

Prerequisites

The target tables must already exist in Fluss, and you must know their exact names.

Supported Modes

Mode   | Supported
------ | ---------
Source | Yes
Sink   | Yes

Limitations

Limitation          | Notes
------------------- | -------------------------------------------------------
Streaming Mode Only | The DataStream connector does not support bounded jobs.

Example

This program defines a simple Flink streaming job that uses Fluss for both input and output. First, it collects parameters from the command line, such as the Fluss cluster address, database, and table names. With those in place, it sets up the Flink execution environment.

The job then creates a Fluss source that connects to the given database and table, reads the id, name, and amount fields, and starts consuming data from the earliest available records. It also defines a Fluss sink to write results back into another Fluss table.

Between the source and sink, the job applies a simple transformation: it takes each incoming row and converts the name field to uppercase. After building a new row with the updated name, the job wires these pieces together and launches the execution.

// Flink imports
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

// Fluss connector imports -- the package prefix depends on the connector
// version; the classes used below are FlussSource, FlussSink,
// OffsetsInitializer, RowDataDeserializationSchema,
// RowDataSerializationSchema, and the Fluss Configuration class.

public class FlussDataStreamJob {

    public static void main(String[] args) throws Exception {
        // Read connection details from the command line.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String bootstrapServers = parameterTool.get("bootstrap-servers");
        String database = parameterTool.get("database");
        String inputTable = parameterTool.get("input-table");
        String outputTable = parameterTool.get("output-table");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();

        // Create a FlussSource using the builder pattern.
        FlussSource<RowData> flussSource =
                FlussSource.<RowData>builder()
                        .setBootstrapServers(bootstrapServers)
                        .setDatabase(database)
                        .setTable(inputTable)
                        .setProjectedFields("id", "name", "amount")
                        .setFlussConfig(configuration)
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setScanPartitionDiscoveryIntervalMs(1000L)
                        .setDeserializationSchema(new RowDataDeserializationSchema())
                        .build();

        // Create a FlussSink that writes results to the output table.
        FlussSink<RowData> flussSink =
                FlussSink.<RowData>builder()
                        .setBootstrapServers(bootstrapServers)
                        .setDatabase(database)
                        .setTable(outputTable)
                        .setSerializationSchema(new RowDataSerializationSchema(false, true))
                        .build();

        // Wire source -> uppercase transformation -> sink.
        env.fromSource(flussSource, WatermarkStrategy.noWatermarks(), "FlussSource")
                .map(
                        (MapFunction<RowData, RowData>)
                                rowData -> {
                                    // Assuming the second column is a string
                                    String original = rowData.getString(1).toString();
                                    String uppercased = original.toUpperCase();
                                    GenericRowData newRowData = new GenericRowData(3);
                                    newRowData.setField(0, rowData.getInt(0));
                                    newRowData.setField(1, StringData.fromString(uppercased));
                                    newRowData.setField(2, rowData.getInt(2));
                                    return newRowData;
                                })
                .sinkTo(flussSink);

        env.execute("Fluss DataStream Job");
    }
}
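
When submitting the job, pass the connection details as --key value program arguments, which ParameterTool parses, for example (all values hypothetical): --bootstrap-servers localhost:9123 --database my_db --input-table orders --output-table orders_upper.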

Properties

The properties below can be set via methods on the builder or passed in a Configuration object; a short sketch of the Configuration approach follows the table.

Property          | Method (if present) | Injection                           | Scope  | Required | Description
----------------- | ------------------- | ----------------------------------- | ------ | -------- | -----------
bootstrap.servers | setBootstrapServers | Java method or Configuration object | Both   | Yes      | Provide the address of a coordinator or tablet server.
database          | setDatabase         | Method only                         | Both   | Yes      | The database where the table is located.
table             | setTable            | Method only                         | Both   | Yes      | The table you want to read from or write to.
client.id         | n/a                 | Configuration only                  | Both   | No       | An optional client ID to trace requests.
scan.startup.mode | setStartingOffsets  | Method or Configuration             | Source | No       | Specifies the starting point for data consumption.
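
For properties without a builder method, such as client.id, a rough sketch of the Configuration route looks like the following. This assumes the Fluss Configuration class exposes a Flink-style setString method; the values are hypothetical.

// Hypothetical values; client.id has no builder method, so it must go
// through the Configuration object handed to setFlussConfig(...).
Configuration configuration = new Configuration();
configuration.setString("client.id", "uppercase-job");    // trace requests from this client
configuration.setString("scan.startup.mode", "earliest"); // source-only property
// Then pass it to the builder, as in the example above:
// FlussSource.<RowData>builder().setFlussConfig(configuration)...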

For more information

For a comprehensive list of properties, refer to the official Apache Fluss documentation. The examples on that page use SQL, but the same property keys can be set on the Configuration object from the example above.

(De)Serialization Schemas

You use the FlussSerializationSchema and FlussDeserializationSchema interfaces to convert your data objects to and from Fluss's internal row format. You must provide a deserialization schema when reading from a Fluss source and a serialization schema when writing to a Fluss sink. Fluss provides built-in implementations for RowData and JsonString. You can also create a custom implementation by implementing the appropriate interface.
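
As an illustration of the custom route, the sketch below deserializes Fluss rows into a plain POJO. It assumes FlussDeserializationSchema exposes deserialize and getProducedType methods over Fluss's LogRecord and InternalRow types, mirroring the built-in RowData implementation; the Order class and its field layout are hypothetical, so verify the exact signatures against the javadoc of your connector version.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.logical.RowType;
// Fluss classes (package prefix depends on the connector version):
// FlussDeserializationSchema, LogRecord, InternalRow

// Hypothetical POJO matching a table with columns (id INT, name STRING, amount INT).
public class Order {
    public int id;
    public String name;
    public int amount;
}

// A minimal sketch of a custom deserializer; the method names and the
// LogRecord/InternalRow types are assumptions based on the built-in schemas.
public class OrderDeserializationSchema implements FlussDeserializationSchema<Order> {

    @Override
    public Order deserialize(LogRecord record) throws Exception {
        InternalRow row = record.getRow();
        Order order = new Order();
        order.id = row.getInt(0);
        order.name = row.getString(1).toString();
        order.amount = row.getInt(2);
        return order;
    }

    @Override
    public TypeInformation<Order> getProducedType(RowType rowSchema) {
        return TypeInformation.of(Order.class);
    }
}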