Self-Managed (not available for BYOC)

Running Lakehouse (Iceberg) Jobs against Fluss

This document provides configuration and deployment instructions for engineers running lakehouse (Iceberg) Flink jobs against Fluss, specifically focusing on the tiering service and union reads on lake-tiered tables. It applies to Apache Flink workflows using the Fluss Flink connector across all submission runners, including the Flink CLI, the Apache Flink Kubernetes Operator, standalone clusters, and Ververica Platform 3.

Overview

You can submit an Apache Flink job to Fluss using several co-equal runners. None of these runners are privileged over the others:

  • The Flink CLI or SQL client
  • The Apache Flink Kubernetes Operator (FlinkDeployment)
  • A session or application cluster
  • Ververica Platform

For more information on using Ververica Platform, see Running Jobs on Ververica Platform.

Every runner requires a configured client connection. This configuration is shared across all clients and is documented in Reading and Writing Fluss. A job that only reads or writes fresh data resident in Fluss does not require any additional configuration.

This guide covers the additional setup that lake-touching jobs require, and explains how each runner submits those jobs.

Lake-Touching Jobs

Two types of Flink jobs require Iceberg lake plugins on their classpath, alongside the Fluss client or connector and the Fluss filesystem plugin.

The Tiering Service

The tiering service is a long-running Flink streaming job that copies data from Fluss into an Iceberg warehouse (the historical layer). For the server-side configuration that makes a Fluss table tier-readable, see Configuring Lake Tiering.

Lake-Reading Jobs

Lake-reading jobs include any Flink SQL or DataStream job that reads the lake side of a tiered table. This includes the following scenarios:

  • A union read of a table.datalake.enabled table, which combines fresh rows from Fluss with historical rows from Iceberg.
  • A direct read of the $lake or $lake$snapshots system tables of a table.

Both job types share the same Flink classpath plugin set and the same per-flavor Hadoop core-site.xml file. The jobs differ only in their job binaries and in the arguments or SQL that you submit.

Why Lake-Touching Jobs Require Extra Client-Side Configuration

A Flink job that reads only fresh data from Fluss does not require any of the configuration described in this guide. This applies to tables that do not set table.datalake.enabled and to any read operation that stays entirely within the Fluss layer. For these jobs, the Fluss server performs object-storage I/O on behalf of the client. When the client must read directly, the server vends short-lived credentials through its delegation-token mechanism.

Lake-touching jobs function differently because they access historical data directly from the underlying object store. This includes Iceberg metadata and data files through Hadoop's HadoopFileIO, as well as tiered Fluss segments through the Fluss filesystem plugin.

These jobs require extra client-side configuration due to the following limitations:

  • HadoopFileIO does not participate in the delegation-token mechanism of Fluss.
  • On Security Token Service-less (STS-less) S3-compatible stores (such as NooBaa, MinIO, Ceph RGW, or ODF), the server cannot vend tokens at all.

Because of these limitations, the job must carry its own filesystem credentials.

How to Supply Credentials

The method you use to supply these credentials depends on the specific runner you use:

  • Flink-Native Runners: Use a Hadoop core-site.xml file.
  • Ververica Platform: Use the platform's IAM principal.
  • Java SDK: Use client.fs.s3.* properties in the Fluss Configuration.

For detailed instructions, see Lake-Job Credentials.

Enabling Tiering on Fluss Tables

Tiering is an opt-in feature that you configure for each individual Fluss table. To enable tiering, you must set the table.datalake.enabled property to true when you execute your CREATE TABLE statement.

Tables that you create without this option remain Fluss-only tables.

SQL
CREATE TABLE <DB>.<TABLE> (
    <COLUMNS>,
    PRIMARY KEY (<PK>) NOT ENFORCED
) WITH (
    'table.datalake.enabled'   = 'true',
    'table.datalake.freshness' = '<FRESHNESS>'  -- e.g. '30s', '3min'
);

Configuring Table Datalake Options

The table.datalake.freshness property controls how far the Iceberg lake copy can lag behind the live Fluss data.

  • Shorter freshness values reduce data latency but increase operational costs.
  • Longer freshness values increase data latency but reduce operational costs.

The default value is 3 minutes.

Running the DDL

Execute the CREATE TABLE statement from a Flink SQL client that is connected to the Fluss catalog. For more information on setting up this connection, see Reading and Writing Fluss, and then navigate to Configuring the Flink Catalog.

For a complete reference of all table.datalake.* options and details about the corresponding Iceberg table layout, see the Fluss Iceberg integration documentation.

Reading Tiered Tables

Overview

Once the tiering service is running, you can query a table.datalake.enabled Fluss table using two different methods:

  • Through the Fluss Connector (Flink): This method performs a union read that combines Fluss-resident fresh data with Iceberg-resident historical data. This is the default behavior when you query a table.datalake.enabled Fluss table from a Fluss catalog.
  • Directly Through an Iceberg-Compatible Engine: You can query the data using engines such as Flink, Spark, Trino, or StarRocks by attaching directly to the same Iceberg catalog and warehouse that the tiering service writes to.

Configuration

Union-read jobs query data through the Fluss catalog and the Fluss connector. To set up this connection, configure the catalog as shown in Reading and Writing Fluss, and then navigate to Configuring the Flink Catalog. This is the only Fluss-specific configuration that a read job requires.

Beyond the catalog configuration, the job requires the Flink-classpath plugins and filesystem credentials for the lake.

A union-read job executes two distinct object-store code paths inside the same TaskManager JVM:

  • The Iceberg reader, which communicates through Hadoop's HadoopFileIO.
  • The Fluss filesystem plugin (such as fs-s3 or fs-azure), which reads tiered Fluss segments.

You can provide the necessary credentials for both code paths by using a single, mounted core-site.xml file. For detailed setup instructions, see Lake-Job Credentials.

For information regarding SQL syntax and upstream union-read semantics, see the Fluss Iceberg integration documentation.

Managing the Tiering Service

The tiering service is a long-running Flink streaming job that copies data from Fluss into the Iceberg warehouse. It acts as the exclusive writer for the lake side of a tiered Fluss table. You can query the resulting data using union reads or through any Iceberg-compatible engine.

Obtaining the Job Binary

The tiering job ships within the lake-tiering-service client setup. You can install it by running the install command with the --output-dir option pointing to a directory that you control.

This installation process generates the fluss-flink-tiering-0.9.1-vv-2.jar file. You must reference this file as a job JAR rather than a classpath plugin. For the specific installation commands, see Installing Fluss, and then navigate to Setups with a Client Target.

Submission

The tiering job binary supports every catalog and storage combination. The only differences between environmental setups are the warehouse URL and a small number of CLI arguments. You can submit the job using the same methods you use for any other Flink job.

Apache Flink Kubernetes Operator

Deploy a FlinkDeployment custom resource. Use the path to your fluss-flink-tiering-0.9.1-vv-2.jar file for the jarURI property, and provide your environment-specific CLI arguments in the args property.

Flink CLI

Execute the following command from your terminal:

TEXT
$FLINK_HOME/bin/flink run -d /path/to/fluss-flink-tiering-0.9.1-vv-2.jar <CLI_ARGS>

Application or Session Cluster

Submit the JAR file through the standard mechanism provided by your runner.

Ververica Platform

You can submit the tiering job to Ververica Platform using either of the following methods:

  • Create a JAR Deployment directly from the web console.
  • Deploy a VvpDeployment custom resource through the Ververica Platform Kubernetes operator.

For more information, see Running Jobs on Ververica Platform.

In every deployment scenario, the JobManager and TaskManagers require specific components to execute the tiering job successfully. You must ensure that both of the following elements are present on your cluster:

  • The Flink-Classpath Plugins: For detailed setup instructions, see Flink-Classpath Plugins.
  • Filesystem Credentials: If you use a Flink-native runner, you must provide a Hadoop core-site.xml file. For configuration details, see Lake-Job Credentials. If you deploy on Ververica Platform, see Lake-Job Credentials and navigate to the section covering Ververica Platform instead.

The CLI arguments for each flavor are as follows:

Tiering on Amazon S3 (Hadoop catalog)

TEXT
--fluss.bootstrap.servers <COORDINATOR_BOOTSTRAP>
--datalake.format iceberg
--datalake.iceberg.type hadoop
--datalake.iceberg.warehouse s3a://<LAKE_BUCKET>/<WAREHOUSE_PREFIX>
# When the source Fluss cluster has SASL enabled, also pass:
--fluss.client.security.protocol SASL
--fluss.client.security.sasl.mechanism PLAIN
--fluss.client.security.sasl.username <FLUSS_USERNAME>
--fluss.client.security.sasl.password <FLUSS_PASSWORD>

Tiering on Azure Blob Storage (Hadoop catalog)

TEXT
--fluss.bootstrap.servers <COORDINATOR_BOOTSTRAP>
--datalake.format iceberg
--datalake.iceberg.type hadoop
--datalake.iceberg.warehouse abfs://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<WAREHOUSE_PATH>
# When the source Fluss cluster has SASL enabled, also pass:
--fluss.client.security.protocol SASL
--fluss.client.security.sasl.mechanism PLAIN
--fluss.client.security.sasl.username <FLUSS_USERNAME>
--fluss.client.security.sasl.password <FLUSS_PASSWORD>

Tiering on NooBaa or another S3-compatible store (Hadoop catalog)

TEXT
--fluss.bootstrap.servers <COORDINATOR_BOOTSTRAP>
--datalake.format iceberg
--datalake.iceberg.type hadoop
--datalake.iceberg.warehouse s3a://<LAKE_BUCKET>/<WAREHOUSE_PREFIX>
# When the source Fluss cluster has SASL enabled, also pass:
--fluss.client.security.protocol SASL
--fluss.client.security.sasl.mechanism PLAIN
--fluss.client.security.sasl.username <FLUSS_USERNAME>
--fluss.client.security.sasl.password <FLUSS_PASSWORD>

Tiering on Amazon S3 (REST catalog)

TEXT
--fluss.bootstrap.servers <COORDINATOR_BOOTSTRAP>
--datalake.format iceberg
--datalake.iceberg.type rest
--datalake.iceberg.uri <REST_CATALOG_URI>
--datalake.iceberg.warehouse <WAREHOUSE_NAME>
--datalake.iceberg.io-impl org.apache.iceberg.hadoop.HadoopFileIO
--datalake.iceberg.credential <CLIENT_ID>:<CLIENT_SECRET>
--datalake.iceberg.scope <OAUTH2_SCOPE>
# When the source Fluss cluster has SASL enabled, also pass:
--fluss.client.security.protocol SASL
--fluss.client.security.sasl.mechanism PLAIN
--fluss.client.security.sasl.username <FLUSS_USERNAME>
--fluss.client.security.sasl.password <FLUSS_PASSWORD>

Flink-Classpath Plugins

Both job types need the following plugins in /opt/flink/lib/:

Plugin | Purpose | Source
Lake plugin: lake-iceberg-s3 (S3 / S3-compatible / REST-on-S3) or lake-iceberg-abs (ADLS Gen2 with Hadoop catalog) | Iceberg reader/writer for the chosen storage backend | Plugin installer (install.sh client lake-iceberg-s3 / install.sh client lake-iceberg-abs); see Installing Fluss › Setups
Fluss filesystem plugin: fs-s3 (S3, NooBaa, REST/S3) or fs-azure (ABS) | Reads tiered Fluss segments back from object storage | Plugin installer (install.sh client fs-s3 / install.sh client fs-azure); see Installing Fluss › Setups

Managing Connector and Dependency Files

The Fluss Flink connector does not require a separate installation step for either job type. The connector is already bundled within the fs-* and lake-iceberg-* setups.

Additional Tiering Service Requirements

The tiering service requires the lake-tiering-service job binary. Because this file functions as an application JAR rather than a classpath plugin, you do not place it in the core library directory. For more information, see Configuring Lake Tiering.

Deploying on Ververica Platform

When you deploy jobs on Ververica Platform, you must use the same set of plugins, but you do not place them in the /opt/flink/lib/ directory. Instead, use the following workflow to manage your dependencies:

  1. Run the install.sh client <setup> --output-dir <DIR> command to generate the required JAR files.
  2. Upload the resulting JAR files to the Artifacts section of your workspace.
  3. Reference the uploaded files as Additional Dependencies in your deployment configuration.

For a detailed walkthrough of this process, see Reading and Writing Fluss, and then navigate to Running Jobs on Ververica Platform and locate the Dependencies section.

Lake-Job Credentials

The method you use to supply object-store credentials depends entirely on whether you deploy your job using a Flink-native runner or through Ververica Platform.

When you use Flink-native runners, such as the Apache Flink Kubernetes Operator, the Flink CLI, or a session or application cluster, a lake-touching job obtains its object-store credentials from a Hadoop core-site.xml file. You must place this file on the JobManager and TaskManager classpath.

A single core-site.xml file covers the credentials for both of the following components:

  • The Iceberg reader (HadoopFileIO)
  • The Fluss filesystem plugin

For the file content required for each storage backend and instructions on how to ship the file to your cluster, see Lake-Job Credentials.

Example: Running the Tiering Service with the Apache Flink Kubernetes Operator

The following example combines the building blocks described above into a single deployable manifest. In this scenario, the tiering service runs against a Fluss cluster in the fluss namespace. The deployment uses an Iceberg REST catalog to handle metadata commits and Amazon S3 as the warehouse storage. The Apache Flink Kubernetes Operator acts as the runner.

The deployment pattern consists of a core-site.xml ConfigMap, a Fluss-setup init container, and a FlinkDeployment custom resource that references both components. You can generalize this pattern to any Flink JAR job that accesses Fluss, the data lake, or both. For union-read jobs, custom DataStream jobs, or ad-hoc readers, you only need to change the job.jarURI property and the args block.

Prerequisites

Before you deploy the tiering service, you must ensure that your environment meets the following requirements:

  • Fluss Cluster: A running Fluss cluster must exist in the fluss namespace with remote storage configured on Amazon S3. For setup details, see Configuring Remote Storage, and then navigate to Amazon S3.
  • Iceberg REST Catalog: A running Iceberg REST catalog must be reachable from your Flink namespace. The catalog must have a registered warehouse and an OAuth2 client credential and scope.
  • Flink Operator: The Apache Flink Kubernetes Operator must be installed and active in your Kubernetes cluster.
  • Image Pull Secret: You must configure an image pull secret named ververica-registry for registry.ververica.cloud/platform-images/fluss:0.9.1-vv-2. The init container uses this secret to fetch Fluss client setups. For instructions, see Deploying Fluss on Kubernetes, and then navigate to Create the Image Pull Secret.
  • Kubernetes Secret: A Kubernetes Secret named fluss-tiering-secrets must exist and hold the ICEBERG_REST_CREDENTIAL key in the <CLIENT_ID>:<CLIENT_SECRET> format. Amazon S3 credentials remain inside the core-site.xml ConfigMap, so you do not need to place them in the pod environment.

Creating the Hadoop Configuration ConfigMap

You must render your core-site.xml file into a Kubernetes ConfigMap. Iceberg's HadoopFileIO component reads its Amazon S3 credentials directly from this file when it is mounted on the JobManager and TaskManager classpath.

The following example shows the minimal file configuration required for this storage flavor:

XML
<configuration>
  <property><name>fs.s3a.access.key</name><value><AWS_ACCESS_KEY_ID></value></property>
  <property><name>fs.s3a.secret.key</name><value><AWS_SECRET_ACCESS_KEY></value></property>
  <property><name>fs.s3a.region</name><value><AWS_REGION></value></property>
</configuration>

For deployments that require IAM Roles for Service Accounts (IRSA) authentication instead of static keys, or if you need to apply the WebIdentityTokenCredentialsProvider override required on Flink 1.20, see Lake-Job Credentials, and then navigate to the section covering On Amazon S3 (Hadoop catalog).

Once you determine the correct configuration for your environment, save the definition locally and execute the following command to create the ConfigMap in your cluster:

BASH
kubectl -n fluss create configmap fluss-tiering-core-site \
  --from-file=core-site.xml=/path/to/core-site.xml
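
Instead of editing the file by hand, the minimal core-site.xml shown above could be rendered from environment variables. The following is a sketch only: the function name render_core_site is hypothetical, and it assumes static keys are available in AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION. It does not XML-escape the values, so it is only suitable for values without characters such as & or <.

```shell
# Hypothetical helper (not part of the Fluss tooling): prints a minimal
# core-site.xml for static S3A keys, taken from environment variables.
render_core_site() {
  # Refuse to render a file with empty credentials.
  if [ -z "${AWS_ACCESS_KEY_ID:-}" ] || [ -z "${AWS_SECRET_ACCESS_KEY:-}" ] || [ -z "${AWS_REGION:-}" ]; then
    echo "AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION must be set" >&2
    return 1
  fi
  # Values are inserted as-is; no XML escaping is performed.
  cat <<EOF
<configuration>
  <property><name>fs.s3a.access.key</name><value>${AWS_ACCESS_KEY_ID}</value></property>
  <property><name>fs.s3a.secret.key</name><value>${AWS_SECRET_ACCESS_KEY}</value></property>
  <property><name>fs.s3a.region</name><value>${AWS_REGION}</value></property>
</configuration>
EOF
}
```

The output can be redirected to a file and passed to the kubectl create configmap command through the --from-file=core-site.xml=<file> option.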

Deploying the FlinkDeployment Custom Resource

You can now deploy your FlinkDeployment custom resource to your cluster. Before you deploy, substitute all placeholder values to match your environment.

The following manifest defines the complete deployment specification:

YAML
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: fluss-tiering
  namespace: fluss
spec:
  image: flink:1.20
  flinkVersion: v1_20
  imagePullSecrets:
    - name: ververica-registry
  flinkConfiguration:
    # Mount the core-site.xml ConfigMap onto the JM/TM classpath.
    kubernetes.hadoop.conf.config-map.name: fluss-tiering-core-site
  jobManager:
    resource: { memory: 2048m, cpu: 1 }
  taskManager:
    resource: { memory: 4096m, cpu: 2 }
  podTemplate:
    spec:
      imagePullSecrets:
        - name: ververica-registry
      initContainers:
        - name: install-fluss-setups
          image: registry.ververica.cloud/platform-images/fluss:0.9.1-vv-2
          env:
            - name: FLINK_HOME
              value: /opt/flink
          command:
            - /bin/sh
            - -c
            - |
              set -e
              # Install three client setups under $FLINK_HOME/lib/fluss-<setup>/:
              #   lake-iceberg-s3      - Iceberg reader/writer + Hadoop S3A
              #   fs-s3                - Fluss filesystem plugin for tiered segments
              #   lake-tiering-service - the tiering job JAR
              /opt/fluss/bin/setup/install.sh client lake-iceberg-s3 --flink-home /opt/flink --force -- -q
              /opt/fluss/bin/setup/install.sh client fs-s3            --flink-home /opt/flink --force -- -q
              /opt/fluss/bin/setup/install.sh client lake-tiering-service --flink-home /opt/flink --force -- -q
          volumeMounts:
            - name: flink-lib
              mountPath: /opt/flink/lib
      containers:
        - name: flink-main-container
          envFrom:
            - secretRef:
                name: fluss-tiering-secrets   # ICEBERG_REST_CREDENTIAL
          volumeMounts:
            - name: flink-lib
              mountPath: /opt/flink/lib
      volumes:
        - name: flink-lib
          emptyDir: {}
  job:
    jarURI: local:///opt/flink/lib/fluss-lake-tiering-service/fluss-flink-tiering-0.9.1-vv-2.jar
    parallelism: 2
    upgradeMode: stateless
    args:
      - --fluss.bootstrap.servers
      - coordinator-server-0.coordinator-server-hs.fluss.svc.cluster.local:9124
      - --datalake.format
      - iceberg
      - --datalake.iceberg.type
      - rest
      - --datalake.iceberg.uri
      - <REST_CATALOG_URI>
      - --datalake.iceberg.warehouse
      - <WAREHOUSE_NAME>
      - --datalake.iceberg.io-impl
      - org.apache.iceberg.hadoop.HadoopFileIO
      - --datalake.iceberg.credential
      - $(ICEBERG_REST_CREDENTIAL)
      - --datalake.iceberg.scope
      - <OAUTH2_SCOPE>

Apply the manifest:

BASH
kubectl apply -f fluss-tiering-deployment.yaml

Verifying Your Deployment

Check the deployment status and inspect the most recent job logs:

BASH
kubectl -n fluss get flinkdeployment fluss-tiering
kubectl -n fluss logs -l app=fluss-tiering -c flink-main-container --tail=100

The job logs should confirm that the service connected to Fluss, initialized the Iceberg REST catalog, and started consuming changes from each tiered Fluss table where you set table.datalake.enabled = true.

As the tiering service continues to process incoming streaming data, Iceberg snapshots accumulate under your configured warehouse path. To confirm this, list the metadata directory of a tiered table:

BASH
aws s3 ls s3://<LAKE_BUCKET>/<WAREHOUSE_PREFIX>/<DATABASE>/<TABLE>/metadata/
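
To watch snapshots accumulate over time, the listing above could be reduced to a single number. The sketch below is illustrative: the function name count_manifest_lists is hypothetical, and it assumes Iceberg's usual snap-<snapshot-id>-...avro naming for manifest-list files, which makes the count only a rough proxy for the number of committed snapshots that have not yet been expired.

```shell
# Hypothetical helper: reads an `aws s3 ls` listing of a table's metadata/
# directory on stdin and prints how many manifest-list files it contains.
# Assumes manifest lists are named snap-<snapshot-id>-...avro.
count_manifest_lists() {
  # grep -c prints 0 but exits non-zero when nothing matches; ignore that.
  grep -c 'snap-[^ ]*\.avro$' || true
}
```

Piping the aws s3 ls output into count_manifest_lists at two points in time should show a growing number while the tiering service is committing.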

Running Jobs on Ververica Platform 3 (VVP3)

The data lake workflows described in this guide run on Ververica Platform, which serves as a co-equal runner alongside native deployments. This section covers two specific implementation paths on Ververica Platform: deploying the tiering service as a JAR deployment, and executing lake reads through the Ververica Platform Kubernetes operator.

To review the foundational concepts that these workflows build upon, such as the differences between the web console and operator models, declaring dependencies as Artifacts, and registering the Fluss catalog, see Reading and Writing Fluss. From there, navigate to Running Jobs on Ververica Platform.

Tiering and JAR Deployment

The following steps configure and run the Fluss Flink tiering service as a JAR Deployment on Ververica Platform 3 (VVP3).

Create the Deployment

  1. Navigate to Deployments in your VVP3 console.
  2. Click Create Deployment and select JAR Deployment.
  3. Configure the main artifact and its dependencies:
       • JAR URI: /fluss-flink-tiering-0.9.1-vv-2.jar
       • Additional Dependencies: Add the URIs for the following required plugins and connectors (one per line or through the multi-entry field in the UI):
           • Fluss Flink connector
           • Fluss filesystem plugin (fluss-fs-s3 or fluss-fs-azure, depending on your cloud provider)
           • Fluss Iceberg lake plugin

Configure Main Arguments

In the Main Arguments section, enter the --datalake.* and --fluss.* configuration properties.

Enter exactly one token per line (the argument flag on one line, and its value on the next line). Do not inline sensitive credentials; instead, reference them using VVP3 secret syntax (${secret_values.<name>}).
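
The one-token-per-line layout can be produced from an ordinary argument list. The following sketch is illustrative only: the helper name one_token_per_line and the secret name iceberg_rest_credential are assumptions, and the flags are the REST-catalog arguments listed earlier in this guide. The single quotes keep the shell from trying to expand the VVP3 secret reference.

```shell
# Hypothetical helper: prints each argument on its own line, which is the
# layout the Main Arguments field expects (flag on one line, value on the next).
one_token_per_line() {
  printf '%s\n' "$@"
}

# Example input: REST-catalog arguments. The secret name
# "iceberg_rest_credential" is a placeholder, not a predefined VVP3 secret.
one_token_per_line \
  --fluss.bootstrap.servers '<COORDINATOR_BOOTSTRAP>' \
  --datalake.format iceberg \
  --datalake.iceberg.type rest \
  --datalake.iceberg.uri '<REST_CATALOG_URI>' \
  --datalake.iceberg.warehouse '<WAREHOUSE_NAME>' \
  --datalake.iceberg.io-impl org.apache.iceberg.hadoop.HadoopFileIO \
  --datalake.iceberg.credential '${secret_values.iceberg_rest_credential}' \
  --datalake.iceberg.scope '<OAUTH2_SCOPE>'
```

The printed lines can then be pasted into the Main Arguments field after you replace the placeholders.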

Save and Launch

  • Review your configurations to ensure all secret references match your stored VVP3 secret names exactly.
  • Click Save.
  • Click Start to initialize the deployment.
  • Monitor the deployment status until it transitions to RUNNING.

Lake-bucket credentials for a tiering deployment, or any deployment that reads from a lake, are supplied differently on Ververica Platform 3 than on the operator or CLI runners. Ververica Platform 3 jobs do not use a core-site.xml file.

Reading Lake-Tiered Tables on VVP3

Reads that address a table's lake side directly must be submitted through the Ververica Platform 3 Kubernetes operator as a VvpDeployment custom resource. These include $lake$snapshots and $lake system tables, as well as union reads that combine the fresh Fluss side with $lake.

The console SQL editor, including the Debug feature, cannot run these queries. This is a known limitation.

Typical lake-read queries:

SQL
-- Snapshot metadata the tiering service has committed for this table.
SELECT `snapshot_id`, `operation`, `manifest_list`
FROM `fluss`.`<DB>`.`<TABLE>$lake$snapshots`;
-- Rows already tiered to the lake.
SELECT COUNT(*) FROM `fluss`.`<DB>`.`<TABLE>$lake`;
-- Union read: querying the table itself returns the union of fresh Fluss rows and tiered (lake) rows.
SELECT COUNT(*) FROM `fluss`.`<DB>`.`<TABLE>`;

One-Time Operator Prerequisites

Before you submit VvpDeployment resources, you must configure two settings in the Ververica Platform 3 workspace.

Create a Deployment Target

  • Navigate to Ververica Platform 3 UI > Deployment Targets > Create Deployment Target.
  • Enter a name for the deployment target.
  • Bind the deployment target to the Kubernetes namespace that the operator deploys into.
  • Reference that target name as deploymentTargetName in your Custom Resource (CR).

Create an API Token and Secret

  • Navigate to Ververica Platform 3 UI > Configurations > API Tokens > Add Token.
  • Assign the Owner role to the token.
  • Copy the token value.
  • Store the token value as a Kubernetes Secret in your target namespace using the apikey data key.

BASH
kubectl -n <VVP_DEPLOY_NAMESPACE> create secret generic <API_TOKEN_SECRET> \
  --from-literal=apikey='<API_TOKEN>'

The annotations.apikeySecretName value in the Custom Resource (CR) must match the name of your Kubernetes Secret. The operator admission webhook uses this Secret to validate the token against the Ververica Platform 3 REST API.

You can submit any of these lake-side reads as a VvpDeployment that runs the query in batch mode. The CR must bind to a workspace deployment target and include the same additional dependencies as a console deployment.

The following example runs a union-count query into a filesystem sink. You can replace the sqlScript value with whichever lake read you need.

YAML
apiVersion: ververica.platform/v1
kind: VvpDeployment
metadata:
  name: fluss-union-counts
  namespace: <VVP_DEPLOY_NAMESPACE>          # Kubernetes namespace the operator watches
  annotations:
    apikeySecretName: <API_TOKEN_SECRET>     # Secret holding the VVP API token (see operator docs)
spec:
  syncingMode: PATCH
  deployment:
    metadata:
      name: fluss-union-counts
      namespace: default                     # the VVP workspace
    spec:
      state: RUNNING
      deploymentTargetName: <DEPLOYMENT_TARGET>   # workspace deployment target (see operator docs)
      template:
        spec:
          artifact:
            kind: SQLSCRIPT
            sqlScript: |
              CREATE TEMPORARY TABLE union_counts_out (
                  `fresh_count`  BIGINT,
                  `lake_count` BIGINT
              ) WITH (
                  'connector' = 'filesystem',
                  'path'      = 's3://<LAKE_BUCKET>/<OUTPUT_PREFIX>/',
                  'format'    = 'csv'
              );
              INSERT INTO union_counts_out
              SELECT
                  (SELECT COUNT(*) FROM `fluss`.`<DB>`.`<TABLE>`)        AS `fresh_count`,
                  (SELECT COUNT(*) FROM `fluss`.`<DB>`.`<TABLE>$lake`)   AS `lake_count`;
            additionalDependencies:
              - <ARTIFACT_URI>   # Fluss Flink connector
              - <ARTIFACT_URI>   # Fluss filesystem plugin (fluss-fs-s3 / fluss-fs-azure)
              - <ARTIFACT_URI>   # Fluss Iceberg lake plugin
              - <ARTIFACT_URI>   # Iceberg Flink runtime
          engineVersionName: <VERA_ENGINE_VERSION>   # an available vera-*-flink-1.20 engine
          batchMode: true
          parallelism: 2
          numberOfTaskManagers: 1
          flinkConfiguration:
            restart-strategy.type: none
            taskmanager.numberOfTaskSlots: '2'

Upload Artifacts and Reference the Deployment Schema

Each <ARTIFACT_URI> placeholder represents the URI returned when you upload a JAR file to the workspace Artifacts store. This store is backed by the Ververica Platform 3 blob-storage bucket that you configure during Ververica Platform 3 installation, and any prefix within this bucket works. For more information, see Ververica Platform 3 prerequisites.

You must produce the JAR files using the fluss-setup installer, which is described in Flink-Classpath Plugins, and then upload them to the Artifacts store.

To view the full VvpDeployment field schema, see the Ververica Platform 3 Kubernetes operator documentation.

Apply the resource and watch the batch job run to completion:

BASH
kubectl apply -f lake-read.yaml
kubectl -n <VVP_DEPLOY_NAMESPACE> get vvpdeployments -w

Operator-managed deployments also appear in the console under BATCH Deployments. From there, you can track the job status while it runs, open the Flink UI, view the Flink job graph, and monitor per-operator metrics.

After the job completes, verify the results written to the sink, for example the committed snapshots or the union counts, depending on the query you ran.

Troubleshooting

For client-level connection issues, such as Connection refused, ClassNotFoundException for the connector or filesystem plugins, or SASL authentication failures, see Reading and Writing Fluss › Troubleshooting.

The items below describe issues that are specific to lake-touching jobs.

Lake reads return zero rows on an S3-compatible cluster without STS

This symptom almost always means that the core-site.xml file did not reach the JobManager or TaskManager classpath.

To resolve this issue, perform the following steps:

  1. Verify that flinkConfiguration.kubernetes.hadoop.conf.config-map.name is set.
  2. Confirm that the ConfigMap content uses the per-bucket pattern shown in Lake-Job Credentials › On NooBaa or another S3-compatible store (Hadoop catalog).
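
For the second step, it can help to list which fs.s3a.* properties the mounted file actually defines without printing any secret values. The following is a minimal sketch: the function name list_s3a_properties is hypothetical, and it relies on grep -o, which is widely available but not required by POSIX.

```shell
# Hypothetical helper: reads a core-site.xml (or a ConfigMap dump that
# embeds one) on stdin and prints the names, never the values, of every
# fs.s3a.* property it defines, one per line.
list_s3a_properties() {
  grep -o '<name>fs\.s3a[^<]*</name>' | sed -e 's:<name>::' -e 's:</name>::'
}
```

The input could come from kubectl -n fluss get configmap fluss-tiering-core-site -o yaml, using the ConfigMap name from the example in this guide. Compare the printed names with the pattern documented in Lake-Job Credentials for your storage backend.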

For Java SDK clients, the cause is different: these clients take their credentials from client.fs.s3.* properties in the Fluss Configuration rather than from a core-site.xml file. For more information, see Reading and Writing Fluss › Reads return zero rows on an S3-compatible cluster without STS.

Tiering job reports missing credentials when writing to the lake

The Hadoop core-site.xml ConfigMap is either missing or contains the wrong credentials. To resolve this issue, review Lake-Job Credentials to see the required content for your specific environment and to learn how to deploy the file.

If you run your jobs on Ververica Platform 3, you must check the IAM policy attached to the platform principal. For more information, see Lake-Job Credentials › On Ververica Platform 3 (VVP3).

Iceberg REST catalog rejects the OAuth2 client credential

The Iceberg REST catalog rejected the OAuth2 client credential when the Flink job (the tiering service or a union-read job) attempted to connect.

To resolve this issue, verify that the following conditions are met:

  • The URI passed in the --datalake.iceberg.uri flag (tiering CLI) or in the catalog connection properties is reachable from inside the Flink TaskManager pod.
  • The credential uses the <CLIENT_ID>:<CLIENT_SECRET> format, which requires a single colon and no scheme.
  • The OAuth2 scope matches a scope that the catalog associates with the principal.
  • The warehouse name exists in the catalog.
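
The credential-format condition in the list above can be checked locally before you store the value in a Secret. This is a sketch under stated assumptions: the function name is_valid_rest_credential is hypothetical, and it checks only the shape described above (exactly one colon, non-empty parts, no scheme), not whether the catalog accepts the credential.

```shell
# Hypothetical check: succeeds only when the argument has the shape
# <CLIENT_ID>:<CLIENT_SECRET> (one colon, both parts non-empty, no scheme).
is_valid_rest_credential() {
  case "$1" in
    *://*) return 1 ;;   # a URL scheme such as https:// slipped in
    *:*:*) return 1 ;;   # more than one colon
    ?*:?*) return 0 ;;   # non-empty id, single colon, non-empty secret
    *)     return 1 ;;   # no colon, or an empty id or secret
  esac
}
```

For example, is_valid_rest_credential "$ICEBERG_REST_CREDENTIAL" || echo "unexpected credential format" reports a malformed value without printing it.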
