Apache Upsert Kafka
This article describes how to use the Upsert Kafka connector.
Background information
The Upsert Kafka connector supports reading data from and writing data to Kafka topics in upsert mode.
- As a source table, the upsert-kafka connector converts the data stored in a Kafka topic into a changelog stream, where each record represents an update or delete event (see the sketch after this list). More precisely, the value in a record is interpreted as an UPDATE of the last value for the same key if that key already exists; if the key does not exist, the record is treated as an INSERT. Using a table analogy, records in the changelog stream are interpreted as UPSERTs (INSERTs or UPDATEs), because any existing row with the same key is overwritten. In addition, a record with an empty value is treated as a DELETE.
- As a result table, the upsert-kafka connector consumes the changelog stream produced by the upstream computation. It writes INSERT and UPDATE_AFTER records as normal Kafka messages, and writes DELETE records as Kafka messages with an empty value, which indicates that the message for the corresponding key is deleted. Flink partitions the data by the value of the primary key columns, so messages for the same primary key are ordered, and update or delete messages for the same primary key land in the same partition.
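For illustration, the following is a minimal sketch of declaring an upsert-kafka source table and reading it as a changelog stream; the topic name, broker addresses, schema, and formats are placeholders rather than values prescribed by this article.

-- Each Kafka record is interpreted as an UPSERT on the primary key;
-- a record with an empty value is interpreted as a DELETE for that key.
CREATE TABLE upsert_kafka_source (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

-- Downstream queries observe the latest value for each key.
SELECT user_region, pv, uv FROM upsert_kafka_source;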
Category | Description |
---|---|
Supported types | Source table and result table |
Operating modes | Streaming mode and batch mode |
Data formats | avro, avro-confluent, canal-json, csv, debezium-json, json, maxwell-json, and raw |
Monitoring metrics | Source table: numRecordsIn, numRecordsInPerSecond, numBytesIn, numBytesInPerSecond, currentEmitEventTimeLag, currentFetchEventTimeLag, sourceIdleTime, pendingRecords. Result table: numRecordsOut, numRecordsOutPerSecond, numBytesOut, numBytesOutPerSecond, currentSendTime |
API type | SQL |
Prerequisites
- The Kafka cluster has been created.
- Network connectivity has been established between the Flink deployment and the Kafka cluster.
Usage restrictions
- Only reading data from and writing data to Apache Kafka 0.10 or later is supported.
- Only the configuration items of the Apache Kafka 2.8 client are supported.
- When a Kafka result table uses exactly-once semantics, transactions must be enabled on the destination Kafka cluster, and only Apache Kafka 0.11 or later clusters are supported.
Syntax
CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
WITH parameters
Common
Parameter | Description | Data type | Required | Default value | Remarks |
---|---|---|---|---|---|
connector | The type of the table. | String | yes | none | The fixed value is upsert-kafka. |
properties.bootstrap.servers | Kafka broker addresses. | String | yes | none | The format is host:port,host:port,host:port, separated by commas (,). |
properties.* | Direct configuration of the Kafka client. | String | no | none | The producer and consumer configurations defined in the official Kafka documentation. Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can disable automatic topic creation by passing 'properties.allow.auto.create.topics' = 'false'. The following configurations cannot be modified this way because they are overwritten by the Kafka connector: key.deserializer, value.deserializer. |
format | The format to use when reading or writing the value part of a Kafka message. | String | no | none | Supported formats: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, and raw. |
key.format | The format to use when reading or writing the key part of a Kafka message. | String | no | none | When this configuration is used, the key.fields configuration is also required. Supported formats: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, and raw. |
key.fields | The source table or result table field corresponding to the key part of the Kafka message. | String | no | none | Multiple field names are separated by a semicolon (;). For example field1;field2 |
key.fields-prefix | A custom prefix for all fields of the key part of the Kafka message, used to avoid name clashes with fields of the value part. | String | no | none | This configuration item only affects the column names of the source table and the result table; the prefix is removed when the key part of the Kafka message is parsed or generated. Note that when this configuration is used, value.fields-include must be set to EXCEPT_KEY (see the example after this table). |
value.format | The format to use when reading or writing the value part of a Kafka message. | String | no | none | This configuration is equivalent to format, so only one of format and value.format can be configured; configuring both causes a conflict. |
value.fields-include | Specifies whether to include the fields that correspond to the key part of the message when parsing or generating the value part of the Kafka message. | String | no | ALL | Valid values: ALL (default): all columns are processed as the value part of the Kafka message. EXCEPT_KEY: the fields defined by key.fields are removed, and the remaining fields are processed as the value part of the Kafka message. |
topic | The name of the topic to read from or write to. | String | yes | none | none. |
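As an illustration of how key.fields-prefix, value.fields-include, and the properties. pass-through interact, here is a minimal sketch; the table name, column names, topic, and formats are hypothetical placeholders, not values from this article.

CREATE TABLE orders_sink (
  key_order_id BIGINT,
  order_status STRING,
  order_amount DOUBLE,
  PRIMARY KEY (key_order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  -- Kafka client property passed through; the properties. prefix is stripped
  'properties.allow.auto.create.topics' = 'false',
  'key.format' = 'json',
  -- key columns carry the key_ prefix only in the table schema;
  -- the prefix is removed when the key part of the message is parsed or generated
  'key.fields-prefix' = 'key_',
  'value.format' = 'json',
  -- required whenever key.fields-prefix is set
  'value.fields-include' = 'EXCEPT_KEY'
);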
Result table
Parameter | Description | Data type | Required | Default value | Remarks |
---|---|---|---|---|---|
sink.parallelism | The parallelism of the Kafka result table operator. | Integer | no | Determined by the framework, using the parallelism of the upstream operator | none. |
sink.buffer-flush.max-rows | The maximum number of records that can be cached before the cache is flushed. | Integer | no | Not enabled | When the result table receives many updates on the same key, the cache keeps only the last record for each key, so result table caching helps reduce the amount of data sent to the Kafka topic and avoids sending potential tombstone messages. Caching can be disabled by setting this to 0, and it is disabled by default. Note that to enable result table caching, both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be set to values greater than zero (see the example after this table). |
sink.buffer-flush.interval | The interval between cache flushes. | Duration | no | Not enabled | When the result table receives many updates on the same key, the cache keeps only the last record for each key, so result table caching helps reduce the amount of data sent to the Kafka topic and avoids sending potential tombstone messages. Caching can be disabled by setting this to 0, and it is disabled by default. Note that to enable result table caching, both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be set to values greater than zero. |
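For example, a minimal sketch of enabling result table caching; the flush thresholds shown are illustrative values, not recommendations.

CREATE TABLE pageviews_per_region_buffered (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro',
  -- both values must be greater than zero for caching to take effect
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '1s'
);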
Example of use
Source table
- Create a source table that counts PV and UV.
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP(3),
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);
- Create an Upsert Kafka result table
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
- Write the PV and UV statistics to the result table.
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
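Because pageviews_per_region is an upsert-kafka table, it can also be consumed as a source. A minimal sketch of reading back the latest PV and UV per region, assuming the table defined above:

-- Tombstone (empty-value) messages are interpreted as DELETEs on user_region.
SELECT user_region, pv, uv
FROM pageviews_per_region;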
Result table
- Create a source table that counts PV and UV.
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP(3),
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);
- Create an Upsert Kafka result table.
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
- Write the PV and UV statistics to the result table.
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
Note
This page is derived from the official Apache Flink® documentation.
Refer to the Credits page for more information.