Apache Iceberg
This topic describes how to use the Apache Iceberg connector.
Background information
Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Amazon Simple Storage Service (S3). Then, you can use a computing engine of the open source big data ecosystem, such as Apache Flink, Apache Spark, Apache Hive, or Presto, to analyze data in your data lake.
Item | Description |
---|---|
Table type | Source table and result table |
Running mode | Streaming mode |
Data format | N/A |
Metric | N/A |
API type | SQL API |
Data update or deletion in a result table | Supported |
Features
Apache Iceberg provides the following core capabilities:
- Builds a low-cost lightweight data lake storage service based on HDFS or S3.
- Provides comprehensive atomicity, consistency, isolation, and durability (ACID) semantics.
- Supports historical version backtracking.
- Supports efficient data filtering.
- Supports schema evolution.
- Supports partition evolution.
You can use the efficient fault tolerance and stream processing capabilities of Flink to import large amounts of behavioral log data into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract the value of your data.
Prerequisites
A private connection to your Apache Iceberg storage must be configured. For more information, see the Apache Iceberg private connection setup.
Limits
Only Ververica Cloud that uses VERA 1.0.3 or later supports the Apache Iceberg connector. The Apache Iceberg connector supports only the Apache Iceberg table format of version 1. For more information, see Iceberg Table Spec.
Syntax
CREATE TABLE iceberg_table (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
...
);
Parameters in the WITH clause
Common parameters
Parameter | Description | Data type | Required | Default value | Remarks |
---|---|---|---|---|---|
connector | The type of the table. | STRING | Yes | No default value | Set the value to iceberg. |
catalog-name | The name of the catalog. | STRING | Yes | No default value | Set the value to a custom name. |
catalog-database | The name of the database. | STRING | Yes | default | Set the value to the name of the database that is created in the Glue catalog. |
catalog-table | The name of the Iceberg table. | STRING | Yes | No default value | Set the value to the name of the table in the catalog. |
io-impl | The implementation class that is used to access the distributed file system. | STRING | Yes | No default value | Set the value to org.apache.iceberg.aws.s3.S3FileIO. |
catalog-impl | The class name of the catalog. | STRING | Yes | No default value | For an AWS Glue catalog, set the value to org.apache.iceberg.aws.glue.GlueCatalog. |
warehouse | The S3 directory in which table data is stored. | STRING | Yes | No default value | N/A. |
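Taken together, the common parameters form the WITH clause of a table definition. The following sketch is a minimal illustration; the catalog name, database name, and bucket path (my_catalog, my_db, my-bucket) are placeholders, not values from your environment.

```sql
CREATE TEMPORARY TABLE iceberg_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector' = 'iceberg',
    -- Custom catalog name (placeholder).
    'catalog-name' = 'my_catalog',
    -- Database that exists in the backend catalog (placeholder).
    'catalog-database' = 'my_db',
    'catalog-table' = 'my_table',
    'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
    -- S3 directory in which table data is stored (placeholder bucket).
    'warehouse' = 's3://my-bucket/warehouse/'
);
```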
Parameters only for result tables
Parameter | Description | Data type | Required | Default value | Remarks |
---|---|---|---|---|---|
write.operation | The write operation mode. | STRING | No | upsert | N/A. |
hive_sync.enable | Specifies whether to enable the synchronization of metadata to Hive. | BOOLEAN | No | false | Valid values: true and false. |
hive_sync.mode | The Hive data synchronization mode. | STRING | No | hms | N/A. |
hive_sync.db | The name of the Hive database to which data is synchronized. | STRING | No | Database name of the current table in the catalog | N/A. |
hive_sync.table | The name of the Hive table to which data is synchronized. | STRING | No | Name of the current table | N/A. |
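For a result table, these write parameters are added on top of the common parameters. The sketch below spells out the default values explicitly for illustration; whether you need Hive synchronization depends on your setup, and all names (my_catalog, my_db, my_hive_db, and so on) are placeholders.

```sql
CREATE TEMPORARY TABLE iceberg_sink (
    id   BIGINT,
    data STRING
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'my_catalog',
    'catalog-database' = 'my_db',
    'catalog-table' = 'my_table',
    'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
    'warehouse' = 's3://my-bucket/warehouse/',
    -- Default write mode, shown explicitly for illustration.
    'write.operation' = 'upsert',
    -- Synchronize metadata to Hive through the Hive metastore (hms).
    'hive_sync.enable' = 'true',
    'hive_sync.mode' = 'hms',
    'hive_sync.db' = 'my_hive_db',
    'hive_sync.table' = 'my_hive_table'
);
```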
Data type mappings
Data type of Apache Iceberg | Data type of Flink |
---|---|
BOOLEAN | BOOLEAN |
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(P,S) | DECIMAL(P,S) |
DATE | DATE |
TIME | TIME Note: Apache Iceberg TIME values are accurate to the microsecond, whereas Flink TIME values are accurate to the millisecond. When you use Flink to read Apache Iceberg data, the time precision is aligned to milliseconds. |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_LTZ |
STRING | STRING |
FIXED(L) | BYTES |
BINARY | VARBINARY |
STRUCT<...> | ROW |
LIST<E> | ARRAY |
MAP<K,V> | MAP |
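As a sketch of how these mappings apply in practice, the Flink DDL below declares columns using the Flink-side types from the table above; a matching Iceberg table would use the corresponding Iceberg-side types. All column names here are illustrative.

```sql
CREATE TEMPORARY TABLE typed_iceberg_table (
    flag    BOOLEAN,                  -- Iceberg BOOLEAN
    amount  DECIMAL(10, 2),           -- Iceberg DECIMAL(10,2)
    counter BIGINT,                   -- Iceberg LONG
    ts      TIMESTAMP_LTZ,            -- Iceberg TIMESTAMPTZ
    payload ROW<k STRING, v DOUBLE>,  -- Iceberg STRUCT<...>
    tags    ARRAY<STRING>,            -- Iceberg LIST<STRING>
    attrs   MAP<STRING, STRING>       -- Iceberg MAP<STRING,STRING>
) WITH (
    'connector' = 'iceberg',
    ...
);
```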
Sample code
- Create an S3 bucket.
- Create an Iceberg catalog outside of Flink that uses the S3 bucket above as its warehouse location.
- Follow the documentation for private connections.
- If you use AWS Glue as the backend catalog, add a Glue policy to your IAM role.
- Run the following scripts.
Sample code for an Apache Iceberg source read script
CREATE TEMPORARY TABLE iceberg_catalog_table (
id INT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = 'iceberg_db',
'catalog-database' = 'iceberg_db',
'catalog-table' = 'sample',
'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
'warehouse' = 's3://iceberg-testing-us-west-1/catalog/'
);
CREATE TEMPORARY TABLE sink (
id INT,
data STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO sink SELECT id, data
FROM iceberg_catalog_table;
Sample code for an Apache Iceberg sink write script
CREATE TEMPORARY TABLE datagen (
name VARCHAR,
age INT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE iceberg_catalog_table (
id INT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = 'iceberg_db',
'catalog-database' = 'iceberg_db',
'catalog-table' = 'sample',
'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
'warehouse' = 's3://iceberg-testing-us-west-1/catalog/',
's3.staging-dir' = 's3://iceberg-testing-us-west-1/staging/'
);
INSERT INTO iceberg_catalog_table SELECT age, name
FROM datagen;