Delta Lake

The Delta Lake connector allows you to read and write Delta Lake tables in Flink. Delta Lake is an open-source storage layer that brings reliability to data lakes with ACID transactions, schema enforcement, and time travel.

Supported Version: VERA Engine 4.3

Background Information

A Delta table is a directory containing Parquet data files and a transaction log (the _delta_log folder) that tracks all operations. Engines like VERA Engine read this log to maintain a consistent view of the table.
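
For illustration, a Delta table directory typically looks like the following (file names are illustrative; real data file names include a generated UUID):

/path/to/table/
├── _delta_log/
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
├── part-00000-<uuid>-c000.snappy.parquet
└── part-00001-<uuid>-c000.snappy.parquet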

Item                 | Description
---------------------|---------------------------------------------------------------------------
Table type           | Source table and result table
Running mode         | Streaming mode only
Data format          | Parquet
Data update/deletion | Supported (append-only writes are currently supported for result tables)

Features

The Delta Lake connector provides the following capabilities:

  • Source and Sink Support: Use Delta tables as both input sources and output sinks in your SQL jobs.
  • Streaming Mode: Submit queries in streaming mode for continuous data processing.
  • Parquet Format: Uses the built-in Parquet format for efficient storage and retrieval.
  • Schema Enforcement: Schema changes made to the underlying Delta table are reflected automatically.

Prerequisites

  • A Delta Catalog must be configured before you use the connector; jobs that reference Delta tables without a pre-created Delta Catalog will fail (see the sketch after this list).
  • Storage for Delta tables (such as an S3 bucket) that is accessible from Ververica Cloud.
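
As a minimal sketch, assuming the catalog syntax of the open-source Delta connector for Flink (verify the exact options against your Ververica Cloud environment), a catalog and database matching the examples below could be created like this:

-- Catalog and database names are placeholders matching the usage examples below
CREATE CATALOG c_delta WITH (
    'type' = 'delta-catalog',
    'catalog-type' = 'in-memory'
);
USE CATALOG c_delta;
CREATE DATABASE db_new;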

Syntax

CREATE TABLE delta_table (
    c1 VARCHAR,
    c2 INT
) WITH (
    'connector' = 'delta',
    'table-path' = 's3a://your-bucket/path/to/table'
);

Parameters in the WITH Clause

Parameter         | Required | Default | Type   | Description
------------------|----------|---------|--------|--------------------------------------------------------------------------
connector         | Yes      | (none)  | String | Must be set to delta.
table-path        | Yes      | (none)  | String | The filesystem path or URI to the Delta table location, i.e. the directory where the _delta_log folder exists.
delta.*           | No       | (none)  | String | Pass-through for Delta table properties (for example, delta.appendOnly, delta.logRetentionDuration).
<custom-property> | No       | (none)  | String | Arbitrary user-defined table properties for your environment.
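
As a sketch of how delta.* properties pass through the WITH clause (the table name and property values are illustrative; value formats follow Delta Lake's table property conventions):

CREATE TABLE delta_props_example (
    id BIGINT
) WITH (
    'connector' = 'delta',
    'table-path' = 's3a://your-bucket/delta_props_example',
    -- Delta table properties are forwarded as-is to the Delta log
    'delta.appendOnly' = 'true',
    'delta.logRetentionDuration' = 'interval 30 days'
);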

SQL Usage Examples

Create a Delta Table and Insert Data

-- Create a Delta table backed by an S3 location
CREATE TABLE c_delta.db_new.t_foo (
    c1 VARCHAR,
    c2 INT
) WITH (
    'connector' = 'delta',
    'table-path' = 's3a://warehouse/'
);

-- Write data
INSERT INTO c_delta.db_new.t_foo
VALUES ('a', 42);
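
To read the rows back as a stream (a sketch; the mode hint is described under Limits below):

-- Read from the Delta source table in streaming mode
SELECT *
FROM c_delta.db_new.t_foo /*+ OPTIONS('mode' = 'streaming') */;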

Create a Partitioned Delta Table

CREATE TABLE testTable (
    id BIGINT,
    data STRING,
    part_a STRING,
    part_b STRING
)
PARTITIONED BY (part_a, part_b)
WITH (
    'connector' = 'delta',
    'table-path' = 's3a://your-bucket/testTable',
    'delta.appendOnly' = 'true'
);
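
Writing to the partitioned table works like any other insert; the partition columns are regular columns in the row (the values below are illustrative):

INSERT INTO testTable
VALUES (1, 'event-1', 'a1', 'b1');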

Move Data Between Delta Tables

Both the source and sink tables must use the Delta connector, and their schemas must match. Note the streaming-mode hint on the source table, as required by the Limits section below.

INSERT INTO sinkTable
SELECT *
FROM sourceTable /*+ OPTIONS('mode' = 'streaming') */;

Limits

  • Streaming Mode Only: Queries must be submitted in streaming mode. Batch queries may fail to commit output correctly.
  • Append-only Writes: Only append writes are currently supported for result tables.
  • Physical Columns Only: The connector currently supports only physical columns. Metadata and computed columns are not supported.
  • Mode Hint: For all source table queries, add the /*+ OPTIONS('mode' = 'streaming') */ SQL hint to ensure proper committing to the Delta log, as shown in the usage examples above.