Apache Iceberg

This topic describes how to use the Apache Iceberg connector.

Background Information

Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Amazon Simple Storage Service (S3). You can then analyze the data in your data lake with a compute engine from the open source big data ecosystem, such as Apache Flink, Apache Spark, Apache Hive, or Presto.

Item | Description
Table type | Source table and result table
Running mode | Streaming mode
Data format | N/A
Metric | N/A
Data update or deletion in a result table | Supported

Features

Apache Iceberg provides the following core capabilities:

  • Builds a low-cost lightweight data lake storage service based on HDFS or S3.
  • Provides full atomicity, consistency, isolation, and durability (ACID) semantics.
  • Supports historical version backtracking.
  • Supports efficient data filtering.
  • Supports schema evolution.
  • Supports partition evolution.
Note: You can use the efficient fault tolerance and stream processing capabilities of Flink to import a large amount of behavioral data in logs into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract the value of your data.

Prerequisites

Before you use the connector, complete the Apache Iceberg private connection setup.

Limits

The Apache Iceberg connector is supported only on Ververica Cloud with VERA 1.0.3 or later. The connector supports only version 1 of the Apache Iceberg table format. For more information, see Iceberg Table Spec.

Syntax

CREATE TABLE iceberg_table (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  ...
);

Parameters in the WITH Clause

Common Parameters

Parameter | Description | Data type | Required | Default value | Remarks
connector | The type of the source table. | STRING | Yes | No default value | Set the value to iceberg.
catalog-name | The name of the catalog. | STRING | Yes | No default value | Set the value to a custom name.
catalog-database | The name of the database. | STRING | Yes | default | Set the value to the name of the database that is created with the Glue catalog.
io-impl | The name of the implementation class in the distributed file system. | STRING | Yes | No default value | Set the value to org.apache.iceberg.aws.s3.S3FileIO.
catalog-impl | The class name of the catalog. | STRING | Yes | No default value | The sample code in this topic uses org.apache.iceberg.aws.glue.GlueCatalog.
warehouse | The S3 directory in which table data is stored. | STRING | Yes | No default value | N/A.

Parameters Only for Result Tables

Parameter | Description | Data type | Required | Default value | Remarks
write.operation | The write operation mode. | STRING | No | upsert | Valid values: upsert (data is updated; this is the default value), insert (data is written to the table in append mode), bulk_insert (a specific amount of data is written at a time and existing data is not updated).
hive_sync.enable | Specifies whether to enable the synchronization of metadata to Hive. | BOOLEAN | No | false | Valid values: true (the synchronization of metadata to Hive is enabled), false (the synchronization of metadata to Hive is disabled; this is the default value).
hive_sync.mode | The Hive data synchronization mode. | STRING | No | hms | Valid values: hms (the default value; retain it if you use a Hive metastore catalog), jdbc (set this value if the Java Database Connectivity (JDBC) catalog is used).
hive_sync.db | The name of the Hive database to which data is synchronized. | STRING | No | Database name of the current table in the catalog | N/A.
hive_sync.table | The name of the Hive table to which data is synchronized. | STRING | No | Name of the current table | N/A.
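
The result-table parameters above go into the same WITH clause as the common parameters. The following is a minimal sketch, not a verified configuration: the table name iceberg_sink and the bucket placeholder <your-bucket> are hypothetical, and the remaining values are borrowed from the sample code later in this topic.

```sql
-- Sketch only: iceberg_sink and <your-bucket> are placeholders, not real resources.
CREATE TEMPORARY TABLE iceberg_sink (
  id INT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'iceberg_db',
  'catalog-database' = 'iceberg_db',
  'catalog-table' = 'sample',
  'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
  'warehouse' = 's3://<your-bucket>/catalog/',
  -- Append rows instead of using the default upsert mode.
  'write.operation' = 'insert',
  -- Keep metadata synchronization to Hive disabled (the default value).
  'hive_sync.enable' = 'false'
);
```

Setting hive_sync.enable explicitly is redundant with its default and is shown only to make the option visible.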

Data Type Mappings

Data type of Apache Iceberg | Data type of Flink
BOOLEAN | BOOLEAN
INT | INT
LONG | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
DECIMAL(P,S) | DECIMAL(P,S)
DATE | DATE
TIME | TIME
TIMESTAMP | TIMESTAMP
TIMESTAMPTZ | TIMESTAMP_LTZ
STRING | STRING
FIXED(L) | BYTES
BINARY | VARBINARY
STRUCT<...> | ROW
LIST<E> | ARRAY
MAP<K,V> | MAP

Note on the TIME mapping: Apache Iceberg timestamps are accurate to the microsecond, and Flink timestamps are accurate to the millisecond. When you use Flink to read Apache Iceberg data, the time precision is aligned to milliseconds.
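
To make the mapping concrete, the sketch below declares one Flink column for each row of the table. It assumes an Iceberg table named all_types with matching column types already exists in the catalog. That table, the column names, the nested field names, and the bucket placeholder are all hypothetical. Flink SQL spells a list column as ARRAY<E>.

```sql
-- Sketch only: all_types, the c_* column names, and <your-bucket> are placeholders.
CREATE TEMPORARY TABLE iceberg_all_types (
  c_boolean     BOOLEAN,                       -- Iceberg BOOLEAN
  c_int         INT,                           -- Iceberg INT
  c_long        BIGINT,                        -- Iceberg LONG
  c_float       FLOAT,                         -- Iceberg FLOAT
  c_double      DOUBLE,                        -- Iceberg DOUBLE
  c_decimal     DECIMAL(10, 2),                -- Iceberg DECIMAL(10,2)
  c_date        DATE,                          -- Iceberg DATE
  c_time        TIME,                          -- Iceberg TIME
  c_timestamp   TIMESTAMP,                     -- Iceberg TIMESTAMP
  c_timestamptz TIMESTAMP_LTZ,                 -- Iceberg TIMESTAMPTZ
  c_string      STRING,                        -- Iceberg STRING
  c_fixed       BYTES,                         -- Iceberg FIXED(L)
  c_binary      VARBINARY,                     -- Iceberg BINARY
  c_struct      ROW<street STRING, zip INT>,   -- Iceberg STRUCT<...>
  c_list        ARRAY<STRING>,                 -- Iceberg LIST<E>
  c_map         MAP<STRING, INT>               -- Iceberg MAP<K,V>
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'iceberg_db',
  'catalog-database' = 'iceberg_db',
  'catalog-table' = 'all_types',
  'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
  'warehouse' = 's3://<your-bucket>/catalog/'
);
```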

Sample Code

  1. Create an S3 bucket.
  2. Externally create an Iceberg catalog that uses the S3 location from step 1.
  3. Follow the documentation for Private connections.
  4. If you use Glue as the backend catalog, add a Glue policy to your IAM role.
  5. Run the scripts.

Sample Code for an Apache Iceberg Source Read Script

-- Source table backed by the Iceberg table "sample" in a Glue catalog.
CREATE TEMPORARY TABLE iceberg_catalog_table (
  id INT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'iceberg_db',
  'catalog-database' = 'iceberg_db',
  'catalog-table' = 'sample',
  'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
  'warehouse' = 's3://iceberg-testing-us-west-1/catalog/'
);

-- The blackhole sink discards every row, so this job only exercises the read path.
CREATE TEMPORARY TABLE sink (
  id INT,
  data STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO sink
SELECT id, data
FROM iceberg_catalog_table;
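
Because the connector runs in streaming mode, it can be useful to control how often the source checks for new snapshots and to filter rows at read time. The variant below is a sketch under assumptions: the streaming and monitor-interval read options come from the open source Apache Iceberg Flink integration, and this topic does not confirm that the Ververica Cloud connector honors them. Dynamic table option hints may also need to be enabled in your deployment. The sketch reuses the iceberg_catalog_table and sink tables defined above.

```sql
-- Sketch only: 'streaming' and 'monitor-interval' are read options of the
-- open source Iceberg Flink integration; support on Ververica Cloud is assumed.
INSERT INTO sink
SELECT id, data
FROM iceberg_catalog_table /*+ OPTIONS('streaming' = 'true', 'monitor-interval' = '10s') */
-- The filter threshold is arbitrary and chosen only for illustration.
WHERE id > 100;
```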

Sample Code for an Apache Iceberg Sink Write Script

-- The datagen source produces random rows to feed the sink.
CREATE TEMPORARY TABLE datagen (
  name VARCHAR,
  age INT
) WITH (
  'connector' = 'datagen'
);

-- Result table backed by the Iceberg table "sample" in a Glue catalog.
CREATE TEMPORARY TABLE iceberg_catalog_table (
  id INT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'iceberg_db',
  'catalog-database' = 'iceberg_db',
  'catalog-table' = 'sample',
  'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
  'warehouse' = 's3://iceberg-testing-us-west-1/catalog/',
  's3.staging-dir' = 's3://iceberg-testing-us-west-1/staging/'
);

-- Columns are matched by position: age is written to id, and name to data.
INSERT INTO iceberg_catalog_table
SELECT age, name
FROM datagen;
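
With its default settings, the datagen source emits random rows continuously at a high rate, which can be more than a first test needs. The sketch below throttles the source and bounds the generated values by using standard options of the Flink datagen connector. The table name datagen_throttled and the specific limits are hypothetical choices, and the sketch reuses the iceberg_catalog_table result table defined above.

```sql
-- Sketch only: datagen_throttled and the chosen limits are illustrative.
CREATE TEMPORARY TABLE datagen_throttled (
  name STRING,
  age INT
) WITH (
  'connector' = 'datagen',
  -- Emit five rows per second instead of the much higher default rate.
  'rows-per-second' = '5',
  -- Generate random strings of eight characters for the name column.
  'fields.name.length' = '8',
  -- Keep the random age values in a plausible range.
  'fields.age.min' = '18',
  'fields.age.max' = '99'
);

-- Columns are matched by position: age is written to id, and name to data.
INSERT INTO iceberg_catalog_table
SELECT age, name
FROM datagen_throttled;
```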