Apache Fluss
New to Fluss? The docs now have a dedicated section — start with the Fluss Overview for architecture, key features, and a quickstart.
Apache Fluss is a lightweight, high-performance log storage and streaming system within the Apache Software Foundation. It is a cloud-native alternative to traditional event streaming platforms that aims to simplify data infrastructure while maintaining strong integration with stream processing engines like Apache Flink. Fluss provides durable, ordered, and low-latency log storage to serve as the backbone for real-time analytics and event-driven applications.
Prerequisites
To use the Fluss connector, you need access to a Fluss cluster, which includes:
- Coordinator Server(s)
- Tablet Server(s)
- ZooKeeper
Fluss DataStream Connector
The Fluss DataStream Connector integrates Fluss directly with Apache Flink’s DataStream API, enabling you to build low-level, fine-grained streaming applications. The connector provides programmatic access to Fluss tables for reading and writing records.
The connector is used for DataStream jobs, while the catalog is used for SQL jobs.
Prerequisites
The target tables must already exist in Fluss, and you must know their exact names.
Supported Modes
Limitations
Example
This program defines a simple Flink streaming job that uses Fluss for both input and output. First, it collects parameters from the command line, such as the Fluss cluster address, database, and table names. With those in place, it sets up the Flink execution environment.
The job then creates a Fluss source that connects to the given database and table, reads the id, name, and amount fields, and starts consuming data from the earliest available records. It also defines a Fluss sink to write results back into another Fluss table.
Between the source and sink, the job applies a simple transformation: it takes each incoming row and converts the name field to uppercase. After building a new row with the updated name, the job wires these pieces together and launches the execution.
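The transformation step described above is plain per-row logic, so it can be checked without a cluster. The following is a minimal sketch of that step only, using an Object[] of {id, name, amount} as a hypothetical stand-in for Flink's RowData; the class and method names are illustrative and are not part of the connector.

```java
import java.util.Arrays;
import java.util.Locale;

// Illustrative stand-in for the job's map step; not part of the Fluss connector.
class UppercaseNameSketch {

    // Takes a row laid out as {id, name, amount} and returns a copy
    // whose name field (index 1) is uppercased. Null names pass through.
    static Object[] uppercaseName(Object[] row) {
        Object[] out = Arrays.copyOf(row, row.length);
        if (out[1] != null) {
            // Locale.ROOT keeps the result independent of the JVM's default locale.
            out[1] = out[1].toString().toUpperCase(Locale.ROOT);
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] input = {1, "alice", 42};
        Object[] output = uppercaseName(input);
        System.out.println(Arrays.toString(output)); // prints [1, ALICE, 42]
    }
}
```

Copying the row rather than mutating it mirrors the job, which builds a new row for each incoming record instead of modifying the input.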
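// Package names below assume a recent Apache Fluss release (org.apache.fluss)
// and Flink 1.x; adjust them to match the connector and Flink versions you use.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.sink.FlussSink;
import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema;
import org.apache.fluss.flink.source.FlussSource;
import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;

public class FlussDataStreamJob {
    public static void main(String[] args) throws Exception {
        // Read connection details and table names from the command line
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String bootstrapServers = parameterTool.get("bootstrap-servers");
        String database = parameterTool.get("database");
        String inputTable = parameterTool.get("input-table");
        String outputTable = parameterTool.get("output-table");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();

        // Create a FlussSource using the builder pattern
        FlussSource<RowData> flussSource =
                FlussSource.<RowData>builder()
                        .setBootstrapServers(bootstrapServers)
                        .setDatabase(database)
                        .setTable(inputTable)
                        .setProjectedFields("id", "name", "amount")
                        .setFlussConfig(configuration)
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setScanPartitionDiscoveryIntervalMs(1000L)
                        .setDeserializationSchema(new RowDataDeserializationSchema())
                        .build();

        // Create a FlussSink that writes the transformed rows to the output table
        FlussSink<RowData> flussSink =
                FlussSink.<RowData>builder()
                        .setBootstrapServers(bootstrapServers)
                        .setDatabase(database)
                        .setTable(outputTable)
                        .setSerializationSchema(new RowDataSerializationSchema(false, true))
                        .build();

        env.fromSource(flussSource, WatermarkStrategy.noWatermarks(), "FlussSource")
                .map(
                        (MapFunction<RowData, RowData>)
                                rowData -> {
                                    // Assumes the second column (name) is a non-null string
                                    String original = rowData.getString(1).toString();
                                    String uppercased = original.toUpperCase();
                                    // Assumes id and amount are INT columns
                                    GenericRowData newRowData = new GenericRowData(3);
                                    newRowData.setField(0, rowData.getInt(0));
                                    newRowData.setField(1, StringData.fromString(uppercased));
                                    newRowData.setField(2, rowData.getInt(2));
                                    return newRowData;
                                })
                .sinkTo(flussSink);

        env.execute("Fluss DataStream Job");
    }
}
Properties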
The properties below can be set via methods on the builder or passed in a Configuration object.
For more information
For a comprehensive list of properties, refer to the official Apache Fluss documentation. The properties on that page use SQL in the examples, but they can be included in the Configuration object from the example above.
(De)Serialization Schemas
You use the FlussSerializationSchema and FlussDeserializationSchema interfaces to convert your data objects to and from Fluss's internal row format. You must provide a deserialization schema when reading from a Fluss source and a serialization schema when writing to a Fluss sink. Fluss provides built-in implementations for RowData and JsonString. You can also create a custom implementation by implementing the appropriate interface.
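To illustrate the idea behind a custom schema pair, here is a minimal, self-contained sketch of a round trip between a domain object and a row. The RowSerializer and RowDeserializer interfaces, the Order class, and the Object[] row layout are all hypothetical stand-ins defined for this example; the real FlussSerializationSchema and FlussDeserializationSchema interfaces have their own method signatures, which you should take from the Fluss documentation.

```java
import java.util.Objects;

// Illustrative only: these types mimic the role of the connector's schema
// interfaces but are NOT the real Fluss interfaces.
class RowSchemaSketch {

    // Hypothetical stand-in for a serialization schema (object -> row).
    interface RowSerializer<T> {
        Object[] toRow(T value);
    }

    // Hypothetical stand-in for a deserialization schema (row -> object).
    interface RowDeserializer<T> {
        T fromRow(Object[] row);
    }

    // Example domain object matching the id, name, amount columns.
    static final class Order {
        final int id;
        final String name;
        final int amount;

        Order(int id, String name, int amount) {
            this.id = id;
            this.name = name;
            this.amount = amount;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Order)) {
                return false;
            }
            Order other = (Order) o;
            return id == other.id && amount == other.amount && Objects.equals(name, other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, name, amount);
        }
    }

    // One class implements both directions so the column order is defined once.
    static final class OrderRowSchema implements RowSerializer<Order>, RowDeserializer<Order> {
        @Override
        public Object[] toRow(Order order) {
            return new Object[] {order.id, order.name, order.amount};
        }

        @Override
        public Order fromRow(Object[] row) {
            return new Order((Integer) row[0], (String) row[1], (Integer) row[2]);
        }
    }

    public static void main(String[] args) {
        OrderRowSchema schema = new OrderRowSchema();
        Order original = new Order(7, "bob", 30);
        Order restored = schema.fromRow(schema.toRow(original));
        System.out.println(original.equals(restored)); // prints true
    }
}
```

A round-trip check like the one in main is a cheap way to confirm that the serializer and deserializer agree on column order and types before running a job against a cluster.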