
Delta Lake

The Delta Lake connector allows you to read and write Delta Lake tables in Flink. Delta Lake is an open-source storage layer that brings reliability to data lakes with ACID transactions, schema enforcement, and time travel.

Supported Storage Backends

Tested storage backends: local filesystem and AWS S3.

Background Information

A Delta table is a directory containing Parquet data files and a transaction log (the _delta_log folder) that tracks all operations. Engines like VERA Engine read this log to maintain a consistent view of the table.
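
As a rough sketch, such a directory might look like this (file names are illustrative):

my_table/
├── _delta_log/                       -- transaction log
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
├── part-00000-<uuid>.snappy.parquet  -- Parquet data files
└── part-00001-<uuid>.snappy.parquet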

Item                  Description
Table type            Source table, dimension table, and result table
Running mode          Streaming Drafts and Batch Drafts
Data format           Parquet
Data update/deletion  Append-only writes

Features

The Delta Lake connector provides the following capabilities:

  • Source and Sink Support: Use Delta tables as both input sources and output sinks in your SQL jobs.
  • Streaming Mode: Submit queries in streaming mode for continuous data processing.
  • Parquet Format: Data files are stored in Parquet format for efficient storage and retrieval.
  • Schema Enforcement: The table schema is taken from the Delta log, and schema changes made in Delta Lake are automatically reflected.

Prerequisites

  • A Delta Catalog must be configured; the connector fails if no Delta Catalog has been created beforehand. A minimal catalog definition is sketched after this list.
  • Storage for Delta tables (such as an S3 bucket) that is accessible from Ververica Cloud.
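
As a sketch, a Delta Catalog can be registered directly in Flink SQL. The catalog name below is illustrative, and the available catalog-type values may differ in your environment:

-- Register a Delta Catalog (name is illustrative)
CREATE CATALOG my_delta_catalog WITH (
    'type' = 'delta-catalog',
    -- 'in-memory' is the simplest option; a Hive metastore can also be used
    'catalog-type' = 'in-memory'
);

-- Make it the active catalog so subsequent CREATE TABLE statements use it
USE CATALOG my_delta_catalog;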

Syntax

CREATE TABLE delta_table (
    c1 VARCHAR,
    c2 INT
) WITH (
    'connector' = 'delta',
    'table-path' = 's3a://your-bucket/path/to/table'
);

Parameters in the WITH Clause

Parameter          Required  Default  Type    Description
connector          Yes       (none)   String  Must be set to delta.
table-path         Yes       (none)   String  The filesystem path or URI of the Delta table, i.e. the directory that contains the _delta_log folder.
delta.*            No        (none)   String  Pass-through for Delta table properties (for example, delta.appendOnly, delta.logRetentionDuration).
<custom-property>  No        (none)   String  Arbitrary user-defined table properties for your environment.

SQL Usage Examples

Create a Delta Table and Insert Data

-- Create a Delta table backed by an S3 location
CREATE TABLE c_delta.db_new.t_foo (
    c1 VARCHAR,
    c2 INT
) WITH (
    'connector' = 'delta',
    'table-path' = 's3a://warehouse/'
);

-- Write data
INSERT INTO c_delta.db_new.t_foo
VALUES ('a', 42);
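
To verify the write, the table can be queried back; the catalog, database, and table names reuse the example above:

-- Read the table back in batch mode
SELECT c1, c2
FROM c_delta.db_new.t_foo;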

Create a Partitioned Delta Table

CREATE TABLE testTable (
    id BIGINT,
    data STRING,
    part_a STRING,
    part_b STRING
)
PARTITIONED BY (part_a, part_b)
WITH (
    'connector' = 'delta',
    'table-path' = 's3a://your-bucket/testTable',
    'delta.appendOnly' = 'true'
);
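
Writes to a partitioned table look the same as for an unpartitioned one; the partition columns are ordinary columns in the INSERT statement. The values below are illustrative:

-- Delta derives the partition directories from the part_a/part_b values
INSERT INTO testTable
VALUES (1, 'payload', 'region-a', '2024-01-01');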

Move Data Between Delta Tables

Both the source and sink tables must use the Delta connector, and their schemas must match.

INSERT INTO sinkTable
SELECT *
FROM sourceTable;

Limits

  • Append-only Writes: Only append writes are currently supported for result tables.
  • Physical Columns Only: The connector currently supports only physical columns. Metadata and computed columns are not supported.
  • Streaming Drafts Mode: In Streaming Drafts mode, add the /*+ OPTIONS('mode' = 'streaming') */ hint to source table queries to ensure proper committing to the Delta log; see the example after this list.
  • Batch Drafts Mode: In Batch Drafts mode, all commits are delivered automatically. Use Batch Drafts mode if small input sizes prevent commits from completing in Streaming Drafts mode.
  • Read Compatibility: Flink can only read Delta logs with STREAMING UPDATE operations. Data written to the same table by other engines (such as Databricks) must use the STREAMING UPDATE label for Flink to read it correctly.
  • Known Committer Issue: In some scenarios, the GlobalDeltaCommitter and DeltaCommitter may fail to write to the Delta log. For details and current status, see delta-io/delta#3057.
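
As an illustration of the Streaming Drafts hint above, reusing the sourceTable and sinkTable names from the earlier example:

-- The hint puts the Delta source into streaming mode so that
-- commits are continuously written to the Delta log
INSERT INTO sinkTable
SELECT *
FROM sourceTable /*+ OPTIONS('mode' = 'streaming') */;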