Apache Fluss
Apache Fluss is a lightweight, high-performance log storage and streaming system within the Apache Software Foundation. It is a cloud-native alternative to traditional event streaming platforms, aiming to simplify data infrastructure while integrating tightly with stream processing engines such as Apache Flink. Fluss provides durable, ordered, low-latency log storage that serves as the backbone for real-time analytics and event-driven applications.
Prerequisites
To use the Fluss connector, you need access to a Fluss cluster, which includes:
- Coordinator server(s)
- Tablet server(s)
- ZooKeeper
Fluss DataStream Connector
The Fluss DataStream Connector integrates Fluss directly with Apache Flink’s DataStream API, enabling you to build low-level, fine-grained streaming applications. The connector provides programmatic access to Fluss tables for producing and consuming events.
The connector is used for DataStream jobs, while the catalog is used for SQL jobs.
Prerequisites
The target tables must already exist in Fluss, and you must know their exact names.
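As noted above, SQL jobs go through the Fluss catalog, and one way to satisfy this prerequisite is to create the table via the catalog from a small Flink Table API program. The following is a minimal sketch, not a definitive recipe: the catalog name, the `localhost:9123` address, and the `my_db.orders` schema are placeholders, chosen to match the `id`, `name`, and `amount` fields used in the example later in this section.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateFlussTable {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register a Fluss catalog pointing at the cluster (placeholder address).
        tEnv.executeSql(
                "CREATE CATALOG fluss_catalog WITH ("
                        + " 'type' = 'fluss',"
                        + " 'bootstrap.servers' = 'localhost:9123'"
                        + ")");
        tEnv.executeSql("USE CATALOG fluss_catalog");

        // Create the database and table the DataStream job will read from.
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS my_db");
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS my_db.orders ("
                        + " id INT,"
                        + " name STRING,"
                        + " amount INT"
                        + ")");
    }
}
```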
Supported Modes
| Mode | Supported |
|---|---|
| Source | Yes |
| Sink | Yes |
Limitations
| Limitation | Notes |
|---|---|
| Streaming Mode Only | The DataStream connector does not support bounded jobs. |
Example
This program defines a simple Flink streaming job that uses Fluss for both input and output. First, it collects parameters from the command line, such as the Fluss cluster address, database, and table names. With those in place, it sets up the Flink execution environment.
The job then creates a Fluss source that connects to the given database and table, reads the id, name, and amount fields, and starts consuming data from the earliest available records. It also defines a Fluss sink to write results back into another Fluss table.
Between the source and sink, the job applies a simple transformation: it takes each incoming row and converts the name field to uppercase. After building a new row with the updated name, the job wires these pieces together and launches the execution.
```java
// Flink imports. The Fluss classes used below (FlussSource, FlussSink,
// Configuration, OffsetsInitializer, RowDataDeserializationSchema, and
// RowDataSerializationSchema) come from the Fluss Flink connector
// dependency; their package names depend on the connector version.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class FlussDataStreamJob {

    public static void main(String[] args) throws Exception {
        // Read the cluster address and table names from the command line.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String bootstrapServers = parameterTool.get("bootstrap-servers");
        String database = parameterTool.get("database");
        String inputTable = parameterTool.get("input-table");
        String outputTable = parameterTool.get("output-table");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Additional Fluss client properties can be set on this object.
        Configuration configuration = new Configuration();

        // Create a FlussSource using the builder pattern.
        FlussSource<RowData> flussSource =
                FlussSource.<RowData>builder()
                        .setBootstrapServers(bootstrapServers)
                        .setDatabase(database)
                        .setTable(inputTable)
                        .setProjectedFields("id", "name", "amount")
                        .setFlussConfig(configuration)
                        // Start consuming from the earliest available records.
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        // Check for newly created partitions once per second.
                        .setScanPartitionDiscoveryIntervalMs(1000L)
                        .setDeserializationSchema(new RowDataDeserializationSchema())
                        .build();

        // Create a FlussSink that writes results back into another table.
        FlussSink<RowData> flussSink =
                FlussSink.<RowData>builder()
                        .setBootstrapServers(bootstrapServers)
                        .setDatabase(database)
                        .setTable(outputTable)
                        .setSerializationSchema(new RowDataSerializationSchema(false, true))
                        .build();

        env.fromSource(flussSource, WatermarkStrategy.noWatermarks(), "FlussSource")
                .map(
                        (MapFunction<RowData, RowData>)
                                rowData -> {
                                    // The second column (index 1) holds the name string.
                                    String original = rowData.getString(1).toString();
                                    String uppercased = original.toUpperCase();
                                    // Rebuild the row with the uppercased name.
                                    GenericRowData newRowData = new GenericRowData(3);
                                    newRowData.setField(0, rowData.getInt(0));
                                    newRowData.setField(1, StringData.fromString(uppercased));
                                    newRowData.setField(2, rowData.getInt(2));
                                    return newRowData;
                                })
                .sinkTo(flussSink);

        env.execute("Fluss DataStream Job");
    }
}
```
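When submitting the job, pass the connection details as arguments parsed by ParameterTool, for example `--bootstrap-servers localhost:9123 --database my_db --input-table orders --output-table orders_upper` (the host, port, and names here are placeholders for your own cluster and schema).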
Properties
The properties below can be set via methods on the builder or passed in a Configuration object, as shown in the sketch after the table.

| Property | Builder method | How to set | Scope | Required | Description |
|---|---|---|---|---|---|
| bootstrap.servers | setBootstrapServers | Method or Configuration | Both | Yes | The address of a coordinator or tablet server. |
| database | setDatabase | Method only | Both | Yes | The database where the table is located. |
| table | setTable | Method only | Both | Yes | The table to read from or write to. |
| client.id | n/a | Configuration only | Both | No | An optional client ID used to trace requests. |
| scan.startup.mode | setStartingOffsets | Method or Configuration | Source | No | The starting point for data consumption. |
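For instance, a property without a builder method, such as client.id, can be supplied through the Configuration object passed to setFlussConfig. A minimal sketch, assuming the Fluss Configuration class exposes a Flink-style setString method (the client ID, address, and table names are placeholders):

```java
// Inject client.id through the Configuration object, since the builder has
// no dedicated method for it (the setString setter is assumed here).
Configuration configuration = new Configuration();
configuration.setString("client.id", "order-enrichment-job");

FlussSource<RowData> source =
        FlussSource.<RowData>builder()
                .setBootstrapServers("localhost:9123") // placeholder address
                .setDatabase("my_db")
                .setTable("orders")
                .setFlussConfig(configuration) // carries client.id to the client
                .setDeserializationSchema(new RowDataDeserializationSchema())
                .build();
```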
For more information
For a comprehensive list of properties, refer to the official Apache Fluss documentation. The examples on that page use SQL, but the same property keys can be set on the Configuration object shown above.
(De)Serialization Schemas
You use the FlussSerializationSchema and FlussDeserializationSchema interfaces to convert your data objects to and from Fluss's internal row format. You must provide a deserialization schema when reading from a Fluss source and a serialization schema when writing to a Fluss sink.
Fluss provides built-in implementations for RowData and JsonString. You can also create a custom implementation by implementing the appropriate interface.
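For example, a custom schema could map Fluss rows to a plain Java object. The sketch below is hypothetical: the Order class is invented for illustration, and the FlussDeserializationSchema method names and the LogRecord/InternalRow row accessors are assumptions that may differ across connector versions.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical POJO produced by the schema.
class Order {
    public final int id;
    public final String name;
    public final int amount;

    public Order(int id, String name, int amount) {
        this.id = id;
        this.name = name;
        this.amount = amount;
    }
}

// Hypothetical custom deserializer; the interface methods and the Fluss row
// types (LogRecord, InternalRow, RowType) are assumed and may differ by
// connector version.
public class OrderDeserializationSchema implements FlussDeserializationSchema<Order> {

    @Override
    public Order deserialize(LogRecord record) throws Exception {
        InternalRow row = record.getRow();
        // Field indexes assume the (id, name, amount) schema used above.
        return new Order(row.getInt(0), row.getString(1).toString(), row.getInt(2));
    }

    @Override
    public TypeInformation<Order> getProducedType(RowType rowSchema) {
        return TypeInformation.of(Order.class);
    }
}
```

You would then pass new OrderDeserializationSchema() to setDeserializationSchema on the source builder in place of the built-in RowDataDeserializationSchema.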