Apache Paimon
At its core, Apache Paimon is a dynamic data lake storage, streamlined for both streaming and batch data processing. With a knack for supporting high-throughput data writing and offering low-latency data querying, it is tailored for compatibility with Flink-based Ververica Cloud. If you’re aiming to set up your data lake storage on Hadoop Distributed File System (HDFS) or Ververica Cloud, Apache Paimon is your go-to solution. Dive deeper into Apache Paimon for a comprehensive understanding.
Item | Description |
---|---|
Table type | Source table, dimension table, and result table |
Running mode | Streaming mode and batch mode |
Data format | N/A |
Metric | N/A |
Data update or deletion in a result table | Supported |
Functionality Overview
Apache Paimon offers:
- Data lake storage using HDFS or VERA
- Read/write capabilities for large-scale datasets, both in streaming and batch modes
- Fast batch and OLAP queries
- Incremental data handling; suitable for both offline and streaming data warehouses
- Data pre-aggregation, storage optimization, and computation
- Tracing for historical data versions
- Data filtering mechanisms
- Table schema modifications
Restrictions
Apache Paimon connector is only supported by Ververica Cloud for Apache Flink with Ververica Runtime (VERA) 1.0.3 and newer.
Usage
Creating an Apache Paimon table within an Apache Paimon catalog does not necessitate the configuration of the connector parameter. The syntax for this process is demonstrated in the sample code that follows.
CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.paimon_table (
id BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
...
);
If you have created an Apache Paimon table in the Apache Paimon catalog, you can directly use the table without the need to recreate a table.
In case you’re aiming to create a temporary Apache Paimon table within a catalog that belongs to a storage system other than Apache Paimon, it is mandatory to set the connector parameter and specify the storage path of the Apache Paimon table. Below is an example demonstrating the syntax needed to create an Apache Paimon table in such circumstances.
CREATE TEMPORARY TABLE paimon_table (
id BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'paimon',
'path' = '<path-to-paimon-table-files>',
'auto-create'='true' -- If no Apache Paimon table file exists in the specified path, a file is automatically created.
);
Parameters for the WITH Clause
Parameter | Description | Data type | Default value | Remarks |
---|---|---|---|---|
connector | The type of the table. | STRING | No default value | If you create an Apache Paimon table in an Apache Paimon catalog, you do not need to configure this parameter. If you create a temporary Apache Paimon table in a catalog of a storage other than Apache Paimon, set the value to paimon. |
path | The storage path of the table. | STRING | No default value | If you create an Apache Paimon table in an Apache Paimon catalog, you do not need to configure this parameter. If you create a temporary Apache Paimon table in a catalog of a storage other than Apache Paimon, set this parameter to the HDFS or S3 directory in which you want to store the table. |
auto-create | Specifies whether to automatically create an Apache Paimon table file if no Apache Paimon table file exists in the specified path when you create a temporary Apache Paimon table. | BOOLEAN | false | Valid values: false: If no Apache Paimon table file exists in the specified path, an error is returned. This is the default value. true: If the specified path does not exist, Flink automatically creates an Apache Paimon table file. |
bucket | The number of buckets in each partition. | INTEGER | 1 | Data that is written to the Apache Paimon table is distributed to each bucket, based on the columns that are specified by the bucket-key parameter. Note: Ververica recommends that the data in each bucket be less than 5 GB in size. |
bucket-key | The bucket key columns. | STRING | No default value | The columns based on which the data written to the Apache Paimon table is distributed to different buckets. Separate column names with commas (,). For example, 'bucket-key' = 'order_id,cust_id' indicates that data is distributed to buckets based on the order_id and cust_id columns. Note: If you do not configure this parameter, data is distributed based on the primary key. If no primary key is specified for the Apache Paimon table, data is distributed based on the values of all columns. |
changelog-producer | The incremental data generation mechanism. | STRING | none | Apache Paimon can generate complete incremental data for any input data stream to facilitate downstream data consumption. Each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. Valid values: none, input, full-compaction, lookup. input: The input data streams are written to an incremental data file as incremental data in dual-write mode. full-compaction: Complete incremental data is generated each time full-compaction is performed. lookup: Complete incremental data is generated before commit snapshot is performed. |
full-compaction.delta-commits | The maximum interval at which full-compaction is performed. | INTEGER | No default value | A full-compaction is definitely triggered when the number of commit snapshots reaches the value of this parameter. |
lookup.cache-max-memory-size | The memory cache size of the Apache Paimon dimension table. | STRING | 256 MB | The value of this parameter determines the cache sizes of both the dimension table and the lookup changelog producer. |
merge-engine | The mechanism for merging data that has the same primary key. | STRING | deduplicate | Valid values: deduplicate, partial-update, aggregation. deduplicate: Only the latest data record is retained. partial-update: Existing data that has the same primary key as the latest data is overwritten by the latest data in the non-null columns. Data in other columns remains unchanged. aggregation: An aggregate function is specified to perform pre-aggregation. |
partial-update.ignore-delete | Specifies whether to ignore delete messages when the merge-engine parameter is set to partial-update. | BOOLEAN | false | Valid values: true (delete messages are ignored), false (an error is reported if a delete message appears). Note: Whether delete messages need to be ignored depends on the actual scenario. You need to configure this parameter based on your business requirements. |
partition.default-name | The default name of the partition. | STRING | __DEFAULT_PARTITION__ | If the value of a partition key column is null or an empty string, the value of this parameter is used as the partition name. |
partition.expiration-check-interval | The interval at which the system checks partition expiration. | STRING | 1h | |
partition.expiration-time | The validity period of a partition. | STRING | No default value | If the period of time for which a partition exists exceeds the value of this parameter, the partition expires. By default, a partition never expires. The period of time for which a partition exists is calculated based on the value of the partition. |
partition.timestamp-formatter | The pattern that is used to convert a time string into a timestamp. | STRING | No default value | This parameter specifies the pattern that is used to extract the period of time for which a partition exists from the partition value. |
partition.timestamp-pattern | The pattern that is used to convert a partition value into a time string. | STRING | No default value | This parameter specifies the pattern that is used to extract the period of time for which a partition exists from the partition value. |
scan.bounded.watermark | The end condition for bounded streaming mode. | LONG | No default value | N/A. |
scan.mode | The consumer offset of the Apache Paimon source table. | STRING | default | |
scan.snapshot-id | The ID of the snapshot from which the Apache Paimon source table starts to consume data. | INTEGER | No default value | |
scan.timestamp-millis | The point in time from which the Apache Paimon source table starts to consume data. | INTEGER | No default value | |
snapshot.num-retained.max | The maximum number of the latest snapshots that can be retained. | INTEGER | 2147483647 | The snapshot expiration is triggered only if the condition specified by the snapshot.num-retained.max parameter or the snapshot.time-retained parameter is met. |
snapshot.num-retained.min | The minimum number of the latest snapshots that can be retained. | INTEGER | 10 | N/A. |
snapshot.time-retained | The duration for which snapshots can be retained. | STRING | 1h | The snapshot expiration is triggered only if the condition specified by the snapshot.num-retained.max parameter or the snapshot.time-retained parameter is met. |
write-mode | The write mode of the Apache Paimon table. | STRING | change-log | Valid values: change-log (based on the primary key) and append-only (only data insertion). change-log: Data is inserted into, deleted from, and updated in the Apache Paimon table based on the primary key. append-only: The Apache Paimon table allows only data insertion and does not support operations based on the primary key. This mode is more efficient than the change-log mode. |
scan.infer-parallelism | Specifies whether to automatically infer the degree of parallelism of the Apache Paimon source table. | BOOLEAN | false | Valid values true: The parallelism of the Apache Paimon source table is automatically inferred based on the number of buckets. false: The default degree of parallelism that is configured based on Ververica Platform (VVP) is used. If the resource configuration is in expert mode, the degree of parallelism that is configured is used. |
scan.parallelism | The degree of parallelism of the Apache Paimon source table. | INTEGER | No default value | N/A. |
sink.parallelism | The degree of parallelism of the Apache Paimon result table. | INTEGER | No default value | N/A. |
For more information about the configuration items, see Apache Paimon Documentation.
Feature Overview
Assuring Data Freshness and Consistency
The Apache Paimon result table commits data using the two-phase commit protocol (2PC) every time a Flink deployment triggers a checkpoint. This ensures that the data’s freshness corresponds to the Flink deployment’s checkpoint interval. During each data commit, up to two snapshots may be created.
In scenarios where two separate Flink deployments concurrently write to an Apache Paimon table, but target different buckets, serializable consistency is maintained. However, if they target the same bucket, only snapshot-level consistency is maintained. This might lead to the table reflecting the results from both deployments, but the assurance is that no data will go missing.
Data Merging Mechanism
If an Apache Paimon result table detects multiple records with identical primary keys, it combines these records into a singular entry to maintain primary key uniqueness. This merging strategy can be dictated by the merge-engine
parameter. A subsequent section delves deeper into the various available data merging strategies.
Mechanism | Description |
---|---|
deduplicate | This is the default value of the merge-engine parameter. If the data merging mechanism is deduplicate and multiple data records have the same primary key, the Apache Paimon result table retains only the latest data record and discards other data records. Note: If the latest data record is a delete message, all the data records that have the same primary key are discarded. |
partial-update | The partial-update mechanism allows you to use multiple messages to update data and finally obtain complete data. New data that has the same primary key as the existing data overwrites the existing data. Columns that have NULL values cannot overwrite existing data. For example, the Apache Paimon result table receives the following data records in sequence: <1, 23.0, 10, NULL> <1, NULL, NULL, ‘This is a book’> <1, 25.2, NULL, NULL> If the first column is the primary key, the final result is <1, 25.2, 10, ‘This is a book’> . Note: If you want the Apache Paimon result table to read the result that is obtained by using the partial-update mechanism in streaming mode, you must set the changelog-producer parameter to lookup or full-compaction. When you use the partial-update mechanism, delete messages cannot be processed. You can set the partial-update.ignore-delete parameter to true to ignore delete messages. |
aggregation | In specific scenarios, you may focus only on the aggregated values. The aggregation mechanism can aggregate data that has the same primary key based on the specified aggregate function. You must use fields.<field-name>.aggregate-function to specify an aggregate function for each column that is not the primary key. If you do not specify an aggregate function for a column that is not the primary key, the column uses the last_non_null_value aggregate function by default. The following sample code provides an example: |
CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function' = 'max',
'fields.sales.aggregate-function' = 'sum'
);
In this example, data in the price column is aggregated based on the max function, and data in the sales column is aggregated based on the sum function. If the input data records are <1, 23.0, 15>
and <1, 30.2, 20>
, the result is <1, 30.2, 35>
. Mappings between the supported aggregate functions and data types:
- sum: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE
- min and max: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ
- last_value and last_non_null_value: all data types
- listagg: STRING
- bool_and and bool_or: BOOLEAN
Only the sum function supports data retraction and deletion. If you want specific columns to ignore retraction and deletion messages, you can configure fields.${field_name}.ignore-retract'='true'
. If you want the Apache Paimon result table to read the aggregation result in streaming mode, you must set the changelog-producer parameter to lookup or full-compaction.
Mechanism for Producing Incremental Data
The method for producing incremental data is determined by the changelog-producer parameter. Apache Paimon has the capability to produce comprehensive incremental data from any given input stream. Every UPDATE_AFTER data entry has a matching UPDATE_BEFORE record. A detailed list of all the mechanisms for generating incremental data can be found below. For an in-depth understanding, refer to the official Apache Paimon documentation.
Mechanism | Description |
---|---|
none | This is the default value of the changelog-producer parameter. If you use the default value, the Apache Paimon source table of the downstream consumer can obtain only the latest situation of data when specific data records have the same primary key. In this case, the downstream consumer cannot learn the complete incremental data to effectively calculate data. The downstream consumer can view the latest data and determine whether existing data is deleted, but cannot learn more information about the deleted data. For example, if the downstream consumer wants to calculate the sum of a column and the consumer obtains only the latest value 5, the downstream consumer cannot determine how to update the sum. If the original value is 4, the sum should be increased by 1. If the original value is 6, the sum should be decreased by 1. This type of consumer is sensitive to UPDATE_BEFORE data. For this type of consumer, Ververica recommends that you do not set the changelog-producer parameter to none. However, other incremental data generation mechanisms may cause performance loss. Note: If your downstream consumer is a database that is not sensitive to UPDATE_BEFORE data, you can set the changelog-producer parameter to none. Ververica recommends that you configure this parameter based on your business requirements. |
input | If you set the changelog-producer parameter to input, the Apache Paimon result table writes input data streams to an incremental data file as incremental data in dual-write mode. Therefore, this incremental data generation mechanism can be used only when the input data streams, such as Change Data Capture (CDC) data, are complete. |
lookup | If you set the changelog-producer parameter to lookup, the Apache Paimon result table uses a point query mechanism that is similar to a dimension table to generate complete incremental data that corresponds to the snapshot before commit snapshot is performed. The incremental data generation mechanism allows the Apache Paimon result table to generate complete incremental data regardless of whether the input incremental data is complete. The lookup mechanism is more efficient than the full-compaction mechanism in the generation of incremental data. However, the lookup mechanism consumes more resources. Ververica recommends that you use the lookup mechanism in scenarios where the requirement for the freshness of incremental data is high. For example, incremental data within minutes is required. |
full-compaction | If you set the changelog-producer parameter to full-compaction, the Apache Paimon result table generates complete incremental data each time full-compaction is performed. The incremental data generation mechanism allows the Apache Paimon result table to generate complete incremental data regardless of whether the input incremental data is complete. The time interval at which full-compaction is performed is specified by the full-compaction.delta-commits parameter. Compared with the lookup mechanism, the full-compaction mechanism is less efficient in generating incremental data. However, the full-compaction mechanism does not cause additional computations based on the full-compaction process of data. Therefore, fewer resources are consumed. Ververica recommends that you use the full-compaction mechanism in scenarios where the requirement for the freshness of incremental data is not high. For example, incremental data within hours is required. |
Write Mode
The table below outlines the write modes that Apache Paimon tables can accommodate.
Mode | Description |
---|---|
Change-log | Change-log is the default write mode for Apache Paimon tables. In change-log mode, data can be inserted into, deleted from, and updated in an Apache Paimon table based on the primary key of the table. In change-log mode, you can also use the data merging mechanism and incremental data generation mechanism. |
Append-only | In append-only mode, the Apache Paimon table allows only data insertion and does not support operations based on the primary key. The append-only mode is more efficient than the change-log mode. In append-only mode, an Apache Paimon table can be used as a substitute of Message Queue in scenarios where the data freshness requirement is not high. For example, data within hours is required. For more information about the append-only mode, see Apache Paimon official documentation. When you use the append-only mode, take note of the following points: Ververica recommends that you configure the bucket-key parameter based on your business requirements. Otherwise, data of the Apache Paimon table is distributed to buckets based on the values of all columns. This results in low computing efficiency. The append-only mode ensures the data generation order based on the following rules: If two data records come from different partitions and the scan.plan-sort-partition parameter is configured, data that has the smaller partition value is preferentially generated. Otherwise, data from the partition that is created earlier is preferentially generated. If two data records come from the same bucket in the same partition, the data that is written earlier is preferentially generated. If two data records come from different buckets in the same partition, the data generation order cannot be ensured because data in different buckets is processed in different parallel subtasks. |
Some information on this page is derived from the official Apache Paimon documentation.
Refer to the Credits page for more information.