Skip to main content

Apache Paimon

At its core, Apache Paimon is a dynamic data lake storage, streamlined for both streaming and batch data processing. With a knack for supporting high-throughput data writing and offering low-latency data querying, it is tailored for compatibility with Flink-based Ververica Cloud. If you’re aiming to set up your data lake storage on Hadoop Distributed File System (HDFS) or Ververica Cloud, Apache Paimon is your go-to solution. Dive deeper into Apache Paimon for a comprehensive understanding.

ItemDescription
Table typeSource table, dimension table, and result table
Running modeStreaming mode and batch mode
Data formatN/A
MetricN/A
Data update or deletion in a result tableSupported

Functionality Overview

Apache Paimon offers:

  • Data lake storage using HDFS or VERA
  • Read/write capabilities for large-scale datasets, both in streaming and batch modes
  • Fast batch and OLAP queries
  • Incremental data handling; suitable for both offline and streaming data warehouses
  • Data pre-aggregation, storage optimization, and computation
  • Tracing for historical data versions
  • Data filtering mechanisms
  • Table schema modifications

Restrictions

Apache Paimon connector is only supported by Ververica Cloud for Apache Flink with Ververica Runtime (VERA) 1.0.3 and newer.

Usage

Creating an Apache Paimon table within an Apache Paimon catalog does not necessitate the configuration of the connector parameter. The syntax for this process is demonstrated in the sample code that follows.

CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.paimon_table (
id BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
...
);
note

If you have created an Apache Paimon table in the Apache Paimon catalog, you can directly use the table without the need to recreate a table.

In case you’re aiming to create a temporary Apache Paimon table within a catalog that belongs to a storage system other than Apache Paimon, it is mandatory to set the connector parameter and specify the storage path of the Apache Paimon table. Below is an example demonstrating the syntax needed to create an Apache Paimon table in such circumstances.

CREATE TEMPORARY TABLE paimon_table (
id BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'paimon',
'path' = '<path-to-paimon-table-files>',
'auto-create'='true' -- If no Apache Paimon table file exists in the specified path, a file is automatically created.
);

Parameters for the WITH Clause

ParameterDescriptionData typeDefault valueRemarks
connectorThe type of the table.STRINGNo default valueIf you create an Apache Paimon table in an Apache Paimon catalog, you do not need to configure this parameter. If you create a temporary Apache Paimon table in a catalog of a storage other than Apache Paimon, set the value to paimon.
pathThe storage path of the table.STRINGNo default valueIf you create an Apache Paimon table in an Apache Paimon catalog, you do not need to configure this parameter. If you create a temporary Apache Paimon table in a catalog of a storage other than Apache Paimon, set this parameter to the HDFS or S3 directory in which you want to store the table.
auto-createSpecifies whether to automatically create an Apache Paimon table file if no Apache Paimon table file exists in the specified path when you create a temporary Apache Paimon table.BOOLEANfalseValid values: false: If no Apache Paimon table file exists in the specified path, an error is returned. This is the default value. true: If the specified path does not exist, Flink automatically creates an Apache Paimon table file.
bucketThe number of buckets in each partition.INTEGER1Data that is written to the Apache Paimon table is distributed to each bucket, based on the columns that are specified by the bucket-key parameter. Note: Ververica recommends that the data in each bucket be less than 5 GB in size.
bucket-keyThe bucket key columns.STRINGNo default valueThe columns based on which the data written to the Apache Paimon table is distributed to different buckets. Separate column names with commas (,). For example, 'bucket-key' = 'order_id,cust_id' indicates that data is distributed to buckets based on the order_id and cust_id columns. Note: If you do not configure this parameter, data is distributed based on the primary key. If no primary key is specified for the Apache Paimon table, data is distributed based on the values of all columns.
changelog-producerThe incremental data generation mechanism.STRINGnoneApache Paimon can generate complete incremental data for any input data stream to facilitate downstream data consumption. Each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. Valid values: none, input, full-compaction, lookup. input: The input data streams are written to an incremental data file as incremental data in dual-write mode. full-compaction: Complete incremental data is generated each time full-compaction is performed. lookup: Complete incremental data is generated before commit snapshot is performed.
full-compaction.delta-commitsThe maximum interval at which full-compaction is performed.INTEGERNo default valueA full-compaction is definitely triggered when the number of commit snapshots reaches the value of this parameter.
lookup.cache-max-memory-sizeThe memory cache size of the Apache Paimon dimension table.STRING256 MBThe value of this parameter determines the cache sizes of both the dimension table and the lookup changelog producer.
merge-engineThe mechanism for merging data that has the same primary key.STRINGdeduplicateValid values: deduplicate, partial-update, aggregation. deduplicate: Only the latest data record is retained. partial-update: Existing data that has the same primary key as the latest data is overwritten by the latest data in the non-null columns. Data in other columns remains unchanged. aggregation: An aggregate function is specified to perform pre-aggregation.
partial-update.ignore-deleteSpecifies whether to ignore delete messages when the merge-engine parameter is set to partial-update.BOOLEANfalseValid values: true (delete messages are ignored), false (an error is reported if a delete message appears). Note: Whether delete messages need to be ignored depends on the actual scenario. You need to configure this parameter based on your business requirements.
partition.default-nameThe default name of the partition.STRING__DEFAULT_PARTITION__If the value of a partition key column is null or an empty string, the value of this parameter is used as the partition name.
partition.expiration-check-intervalThe interval at which the system checks partition expiration.STRING1h
partition.expiration-timeThe validity period of a partition.STRINGNo default valueIf the period of time for which a partition exists exceeds the value of this parameter, the partition expires. By default, a partition never expires. The period of time for which a partition exists is calculated based on the value of the partition.
partition.timestamp-formatterThe pattern that is used to convert a time string into a timestamp.STRINGNo default valueThis parameter specifies the pattern that is used to extract the period of time for which a partition exists from the partition value.
partition.timestamp-patternThe pattern that is used to convert a partition value into a time string.STRINGNo default valueThis parameter specifies the pattern that is used to extract the period of time for which a partition exists from the partition value.
scan.bounded.watermarkThe end condition for bounded streaming mode.LONGNo default valueN/A.
scan.modeThe consumer offset of the Apache Paimon source table.STRINGdefault
scan.snapshot-idThe ID of the snapshot from which the Apache Paimon source table starts to consume data.INTEGERNo default value
scan.timestamp-millisThe point in time from which the Apache Paimon source table starts to consume data.INTEGERNo default value
snapshot.num-retained.maxThe maximum number of the latest snapshots that can be retained.INTEGER2147483647The snapshot expiration is triggered only if the condition specified by the snapshot.num-retained.max parameter or the snapshot.time-retained parameter is met.
snapshot.num-retained.minThe minimum number of the latest snapshots that can be retained.INTEGER10N/A.
snapshot.time-retainedThe duration for which snapshots can be retained.STRING1hThe snapshot expiration is triggered only if the condition specified by the snapshot.num-retained.max parameter or the snapshot.time-retained parameter is met.
write-modeThe write mode of the Apache Paimon table.STRINGchange-logValid values: change-log (based on the primary key) and append-only (only data insertion). change-log: Data is inserted into, deleted from, and updated in the Apache Paimon table based on the primary key. append-only: The Apache Paimon table allows only data insertion and does not support operations based on the primary key. This mode is more efficient than the change-log mode.
scan.infer-parallelismSpecifies whether to automatically infer the degree of parallelism of the Apache Paimon source table.BOOLEANfalseValid values true: The parallelism of the Apache Paimon source table is automatically inferred based on the number of buckets. false: The default degree of parallelism that is configured based on Ververica Platform (VVP) is used. If the resource configuration is in expert mode, the degree of parallelism that is configured is used.
scan.parallelismThe degree of parallelism of the Apache Paimon source table.INTEGERNo default valueN/A.
sink.parallelismThe degree of parallelism of the Apache Paimon result table.INTEGERNo default valueN/A.
note

For more information about the configuration items, see Apache Paimon Documentation.

Feature Overview

Assuring Data Freshness and Consistency

The Apache Paimon result table commits data using the two-phase commit protocol (2PC) every time a Flink deployment triggers a checkpoint. This ensures that the data’s freshness corresponds to the Flink deployment’s checkpoint interval. During each data commit, up to two snapshots may be created.

In scenarios where two separate Flink deployments concurrently write to an Apache Paimon table, but target different buckets, serializable consistency is maintained. However, if they target the same bucket, only snapshot-level consistency is maintained. This might lead to the table reflecting the results from both deployments, but the assurance is that no data will go missing.

Data Merging Mechanism

If an Apache Paimon result table detects multiple records with identical primary keys, it combines these records into a singular entry to maintain primary key uniqueness. This merging strategy can be dictated by the merge-engine parameter. A subsequent section delves deeper into the various available data merging strategies.

MechanismDescription
deduplicateThis is the default value of the merge-engine parameter. If the data merging mechanism is deduplicate and multiple data records have the same primary key, the Apache Paimon result table retains only the latest data record and discards other data records.

Note: If the latest data record is a delete message, all the data records that have the same primary key are discarded.
partial-updateThe partial-update mechanism allows you to use multiple messages to update data and finally obtain complete data. New data that has the same primary key as the existing data overwrites the existing data. Columns that have NULL values cannot overwrite existing data.

For example, the Apache Paimon result table receives the following data records in sequence:

<1, 23.0, 10, NULL>

<1, NULL, NULL, ‘This is a book’>

<1, 25.2, NULL, NULL>

If the first column is the primary key, the final result is <1, 25.2, 10, ‘This is a book’>.

Note: If you want the Apache Paimon result table to read the result that is obtained by using the partial-update mechanism in streaming mode, you must set the changelog-producer parameter to lookup or full-compaction. When you use the partial-update mechanism, delete messages cannot be processed. You can set the partial-update.ignore-delete parameter to true to ignore delete messages.
aggregationIn specific scenarios, you may focus only on the aggregated values. The aggregation mechanism can aggregate data that has the same primary key based on the specified aggregate function. You must use fields.<field-name>.aggregate-function to specify an aggregate function for each column that is not the primary key. If you do not specify an aggregate function for a column that is not the primary key, the column uses the last_non_null_value aggregate function by default. The following sample code provides an example:

CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function' = 'max',
'fields.sales.aggregate-function' = 'sum'
);

In this example, data in the price column is aggregated based on the max function, and data in the sales column is aggregated based on the sum function. If the input data records are <1, 23.0, 15> and <1, 30.2, 20>, the result is <1, 30.2, 35>. Mappings between the supported aggregate functions and data types:

  • sum: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE
  • min and max: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ
  • last_value and last_non_null_value: all data types
  • listagg: STRING
  • bool_and and bool_or: BOOLEAN
note

Only the sum function supports data retraction and deletion. If you want specific columns to ignore retraction and deletion messages, you can configure fields.${field_name}.ignore-retract'='true'. If you want the Apache Paimon result table to read the aggregation result in streaming mode, you must set the changelog-producer parameter to lookup or full-compaction.

Mechanism for Producing Incremental Data

The method for producing incremental data is determined by the changelog-producer parameter. Apache Paimon has the capability to produce comprehensive incremental data from any given input stream. Every UPDATE_AFTER data entry has a matching UPDATE_BEFORE record. A detailed list of all the mechanisms for generating incremental data can be found below. For an in-depth understanding, refer to the official Apache Paimon documentation.

MechanismDescription
noneThis is the default value of the changelog-producer parameter. If you use the default value, the Apache Paimon source table of the downstream consumer can obtain only the latest situation of data when specific data records have the same primary key. In this case, the downstream consumer cannot learn the complete incremental data to effectively calculate data. The downstream consumer can view the latest data and determine whether existing data is deleted, but cannot learn more information about the deleted data.

For example, if the downstream consumer wants to calculate the sum of a column and the consumer obtains only the latest value 5, the downstream consumer cannot determine how to update the sum. If the original value is 4, the sum should be increased by 1. If the original value is 6, the sum should be decreased by 1. This type of consumer is sensitive to UPDATE_BEFORE data. For this type of consumer, Ververica recommends that you do not set the changelog-producer parameter to none. However, other incremental data generation mechanisms may cause performance loss.

Note: If your downstream consumer is a database that is not sensitive to UPDATE_BEFORE data, you can set the changelog-producer parameter to none. Ververica recommends that you configure this parameter based on your business requirements.
inputIf you set the changelog-producer parameter to input, the Apache Paimon result table writes input data streams to an incremental data file as incremental data in dual-write mode.

Therefore, this incremental data generation mechanism can be used only when the input data streams, such as Change Data Capture (CDC) data, are complete.

lookupIf you set the changelog-producer parameter to lookup, the Apache Paimon result table uses a point query mechanism that is similar to a dimension table to generate complete incremental data that corresponds to the snapshot before commit snapshot is performed. The incremental data generation mechanism allows the Apache Paimon result table to generate complete incremental data regardless of whether the input incremental data is complete.

The lookup mechanism is more efficient than the full-compaction mechanism in the generation of incremental data. However, the lookup mechanism consumes more resources.

Ververica recommends that you use the lookup mechanism in scenarios where the requirement for the freshness of incremental data is high. For example, incremental data within minutes is required.
full-compactionIf you set the changelog-producer parameter to full-compaction, the Apache Paimon result table generates complete incremental data each time full-compaction is performed. The incremental data generation mechanism allows the Apache Paimon result table to generate complete incremental data regardless of whether the input incremental data is complete. The time interval at which full-compaction is performed is specified by the full-compaction.delta-commits parameter.

Compared with the lookup mechanism, the full-compaction mechanism is less efficient in generating incremental data. However, the full-compaction mechanism does not cause additional computations based on the full-compaction process of data. Therefore, fewer resources are consumed.

Ververica recommends that you use the full-compaction mechanism in scenarios where the requirement for the freshness of incremental data is not high. For example, incremental data within hours is required.

Write Mode

The table below outlines the write modes that Apache Paimon tables can accommodate.

ModeDescription
Change-logChange-log is the default write mode for Apache Paimon tables. In change-log mode, data can be inserted into, deleted from, and updated in an Apache Paimon table based on the primary key of the table. In change-log mode, you can also use the data merging mechanism and incremental data generation mechanism.
Append-onlyIn append-only mode, the Apache Paimon table allows only data insertion and does not support operations based on the primary key. The append-only mode is more efficient than the change-log mode. In append-only mode, an Apache Paimon table can be used as a substitute of Message Queue in scenarios where the data freshness requirement is not high. For example, data within hours is required.

For more information about the append-only mode, see Apache Paimon official documentation. When you use the append-only mode, take note of the following points:

Ververica recommends that you configure the bucket-key parameter based on your business requirements. Otherwise, data of the Apache Paimon table is distributed to buckets based on the values of all columns. This results in low computing efficiency.

The append-only mode ensures the data generation order based on the following rules:

If two data records come from different partitions and the scan.plan-sort-partition parameter is configured, data that has the smaller partition value is preferentially generated. Otherwise, data from the partition that is created earlier is preferentially generated.

If two data records come from the same bucket in the same partition, the data that is written earlier is preferentially generated.

If two data records come from different buckets in the same partition, the data generation order cannot be ensured because data in different buckets is processed in different parallel subtasks.
note

Some information on this page is derived from the official Apache Paimon documentation.

Refer to the Credits page for more information.