Apache Upsert Kafka
This article describes how to use the Upsert Kafka connector.
Background Information
The Upsert Kafka connector supports reading data from a Kafka topic and writing data to a Kafka topic in upsert mode.
- As a source table, the upsert-kafka connector converts the data stored in Kafka into a changelog stream, in which each record represents an update or delete event. More precisely, the value in a record is interpreted as an UPDATE of the last value for the same key if that key already exists; if the key does not exist, the update is treated as an INSERT. In table terms, records in the changelog stream are interpreted as UPSERTs (INSERT or UPDATE), because any existing row with the same key is overwritten. In addition, a message with an empty (null) value is treated as a DELETE message for its key.
- As a result table, the upsert-kafka connector consumes the changelog stream produced by the upstream computing logic. It writes INSERT and UPDATE_AFTER data as normal Kafka messages, and writes DELETE data as Kafka messages with an empty (null) value, which indicates that the message for that key is deleted. Flink partitions the data by the value of the primary key columns, so update and delete messages for the same primary key land in the same partition and stay ordered.
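The two directions described above can be modeled in a few lines. The following is a minimal Python sketch, not the connector's implementation: the names `to_changelog`, `to_kafka_messages`, and `partition_for` are hypothetical, records are plain `(key, value)` tuples with `None` standing in for an empty value, and the CRC32-based partitioner is only a stand-in for whatever key hashing the connector actually uses.

```python
import zlib
from typing import Dict, Iterable, List, Optional, Tuple

Record = Tuple[str, Optional[str]]        # (key, value); None models an empty value
Change = Tuple[str, str, Optional[str]]   # (kind, key, value)


def to_changelog(records: Iterable[Record]) -> List[Change]:
    """Source side: interpret keyed records as INSERT / UPDATE_AFTER / DELETE."""
    state: Dict[str, str] = {}
    changes: List[Change] = []
    for key, value in records:
        if value is None:
            # Empty value: the row for this key is deleted.
            # (This sketch emits DELETE even if the key was never seen.)
            state.pop(key, None)
            changes.append(("DELETE", key, None))
        elif key in state:
            # Key already exists: the new value overwrites the old one.
            state[key] = value
            changes.append(("UPDATE_AFTER", key, value))
        else:
            # First value for this key: treated as an insert.
            state[key] = value
            changes.append(("INSERT", key, value))
    return changes


def to_kafka_messages(changes: Iterable[Change]) -> List[Record]:
    """Sink side: INSERT and UPDATE_AFTER become normal messages, DELETE an empty value."""
    messages: List[Record] = []
    for kind, key, value in changes:
        if kind in ("INSERT", "UPDATE_AFTER"):
            messages.append((key, value))
        elif kind == "DELETE":
            messages.append((key, None))
        # Assumption: any other kind (such as UPDATE_BEFORE) is skipped here.
    return messages


def partition_for(key: str, num_partitions: int) -> int:
    """Map a primary-key value to a partition; equal keys always share a partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


if __name__ == "__main__":
    records = [("eu", "pv=1"), ("us", "pv=1"), ("eu", "pv=2"), ("us", None)]
    for change in to_changelog(records):
        print(change, "-> partition", partition_for(change[1], 4))
```

Because the partition depends only on the key, every update and delete for one key goes through the same partition, which is what keeps them in order for a reader.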
Prerequisites
- A Kafka cluster has been created.
- The real-time computing Flink cluster and the Kafka cluster can reach each other over the network.
Usage Restrictions
- Only Apache Kafka 0.10 and later can be read from or written to.
- Only the client configuration items of Apache Kafka 2.8 are supported.
- When the Kafka result table uses exactly-once semantics, transactions must be enabled on the Kafka cluster being written to, and only Apache Kafka 0.11 and later clusters are supported.
Syntax
SQL
CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
WITH Parameters
Universal
Result table
Usage Examples
Source Table
- Create the source table from which PV and UV are counted.
SQL
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);
- Create an Upsert Kafka result table.
SQL
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
- Write the PV and UV statistics to the result table.
SQL
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
Result Table
- Create the source table from which PV and UV are counted.
SQL
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);
- Create an Upsert Kafka result table.
SQL
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
- Write the PV and UV statistics to the result table.
SQL
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
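To illustrate why an upsert result table suits this query, the sketch below simulates, in plain Python rather than Flink, how a running `COUNT(*)` and `COUNT(DISTINCT user_id)` per `user_region` yields a new keyed message for each input row, and how a reader that keeps only the latest value per key recovers the current result. The names `aggregate_pageviews` and `latest_per_key` and the sample rows are hypothetical, and a real Flink job may batch or time its emissions differently.

```python
from typing import Dict, Iterable, List, Set, Tuple

PageView = Tuple[int, str]             # (user_id, user_region)
Upsert = Tuple[str, Tuple[int, int]]   # (user_region, (pv, uv))


def aggregate_pageviews(views: Iterable[PageView]) -> List[Upsert]:
    """Emit the updated (pv, uv) for a region every time a page view arrives."""
    pv: Dict[str, int] = {}
    users: Dict[str, Set[int]] = {}
    out: List[Upsert] = []
    for user_id, region in views:
        pv[region] = pv.get(region, 0) + 1            # COUNT(*)
        users.setdefault(region, set()).add(user_id)  # COUNT(DISTINCT user_id)
        out.append((region, (pv[region], len(users[region]))))
    return out


def latest_per_key(upserts: Iterable[Upsert]) -> Dict[str, Tuple[int, int]]:
    """Read side: a later message for a key overwrites the earlier one."""
    table: Dict[str, Tuple[int, int]] = {}
    for region, counts in upserts:
        table[region] = counts
    return table


if __name__ == "__main__":
    sample = [(1, "eu"), (2, "eu"), (1, "eu"), (3, "us")]
    messages = aggregate_pageviews(sample)
    print(messages)
    print(latest_per_key(messages))
```

Each intermediate message is superseded by the next one with the same `user_region`, so the topic only needs the most recent message per key to represent the query result.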