Delta Lake
The Delta Lake connector allows you to read and write Delta Lake tables in Flink. Delta Lake is an open-source storage layer that brings reliability to data lakes with ACID transactions, schema enforcement, and time travel.
Supported Version: VERA Engine 4.3
Background Information
A Delta table is a directory containing Parquet data files and a transaction log (the _delta_log folder) that tracks all operations. Engines like VERA Engine read this log to maintain a consistent view of the table.
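For illustration, a minimal Delta table directory might look like the following (file names are placeholders; each committed operation appends the next numbered JSON entry to the log):

s3a://your-bucket/path/to/table/
  _delta_log/
    00000000000000000000.json
    00000000000000000001.json
  part-00000-<uuid>-c000.snappy.parquet
  part-00001-<uuid>-c000.snappy.parquet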
| Item | Description |
|---|---|
| Table type | Source table and result table |
| Running mode | Streaming mode only |
| Data format | Parquet |
| Data update/deletion | Not supported; result tables currently accept append-only writes. |
Features
The Delta Lake connector provides the following capabilities:
- Source and Sink Support: Use Delta tables as both input sources and output sinks in your SQL jobs.
- Streaming Mode: Submit queries in streaming mode for continuous data processing.
- Parquet Format: Stores data as Parquet files for efficient storage and retrieval.
- Schema Enforcement: Derives the table schema from the Delta transaction log and automatically reflects schema evolution.
Prerequisites
- A Delta Catalog must be configured; queries that use the Delta Lake connector fail if no Delta Catalog has been created first (see the sketch after this list).
- Storage for Delta tables (such as an S3 bucket) that is accessible from Ververica Cloud.
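The catalog DDL depends on your environment; as a minimal sketch based on the open-source Flink/Delta connector (the catalog name c_delta and the in-memory metastore are assumptions, not Ververica-specific values):

-- Register a Delta Catalog, then make it the active catalog
CREATE CATALOG c_delta WITH (
  'type' = 'delta-catalog',
  'catalog-type' = 'in-memory'
);
USE CATALOG c_delta;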
Syntax
CREATE TABLE delta_table (
  c1 VARCHAR,
  c2 INT
) WITH (
  'connector' = 'delta',
  'table-path' = 's3a://your-bucket/path/to/table'
);
Parameters in the WITH Clause
| Parameter | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | Yes | (none) | String | Must be set to delta. |
| table-path | Yes | (none) | String | The filesystem path or URI to the Delta table location where the _delta_log exists. |
| delta.* | No | (none) | String | Pass-through for Delta table properties, for example delta.appendOnly and delta.logRetentionDuration (see the example below). |
| <custom-property> | No | (none) | String | Arbitrary user-defined table properties for your environment. |
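As a sketch of the delta.* pass-through (table name, path, and property values here are illustrative), Delta table properties sit in the WITH clause alongside the connector options:

CREATE TABLE t_events (
  msg VARCHAR
) WITH (
  'connector' = 'delta',
  'table-path' = 's3a://your-bucket/t_events',
  -- Delta table properties written to the Delta log
  'delta.appendOnly' = 'true',
  'delta.logRetentionDuration' = 'interval 30 days'
);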
SQL Usage Examples
Create a Delta Table and Insert Data
-- Create a Delta table backed by an S3 location
CREATE TABLE c_delta.db_new.t_foo (
  c1 VARCHAR,
  c2 INT
) WITH (
  'connector' = 'delta',
  'table-path' = 's3a://warehouse/'
);

-- Write data
INSERT INTO c_delta.db_new.t_foo
VALUES ('a', 42);
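To read the data back as a continuous stream, a source query can carry the streaming-mode hint described under Limits below:

-- Continuously read new commits from the Delta table
SELECT * FROM c_delta.db_new.t_foo /*+ OPTIONS('mode' = 'streaming') */;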
Create a Partitioned Delta Table
CREATE TABLE testTable (
  id BIGINT,
  data STRING,
  part_a STRING,
  part_b STRING
)
PARTITIONED BY (part_a, part_b)
WITH (
  'connector' = 'delta',
  'table-path' = 's3a://your-bucket/testTable',
  'delta.appendOnly' = 'true'
);
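When writing to this table, the partition columns are supplied like ordinary columns, and rows are routed to the matching partition directories:

-- part_a and part_b values determine the target partition
INSERT INTO testTable VALUES (1, 'first row', 'a1', 'b1');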
Move Data Between Delta Tables
Both the source and sink tables must use the Delta connector, and their schemas must match.
INSERT INTO sinkTable
SELECT *
FROM sourceTable;
Limits
- Streaming Mode Only: Queries must be submitted in streaming mode. Batch queries may fail to commit output correctly.
- Append-only Writes: Only append writes are currently supported for result tables.
- Physical Columns Only: The connector currently supports only physical columns. Metadata and computed columns are not supported.
- Mode Hint: For all source table queries, add the /*+ OPTIONS('mode' = 'streaming') */ SQL hint to ensure proper committing to the Delta log, as shown in the example after this list.
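For example, the hint attaches directly after the source table reference (table names are illustrative, reusing the earlier example):

-- Streaming read from one Delta table into another
INSERT INTO sinkTable
SELECT *
FROM sourceTable /*+ OPTIONS('mode' = 'streaming') */;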