
Apache Upsert Kafka

This article describes how to use the Upsert Kafka connector.

Background information

The Upsert Kafka connector supports reading data from and writing data to Kafka topics in upsert mode.

  • As a source table, the upsert-kafka connector can convert the data stored in Kafka into a changelog stream, where each record represents an update or delete event. More precisely, the value in a record is interpreted as an UPDATE of the last value for the same key if that key already exists; if the key does not exist, the record is treated as an INSERT. Using a table analogy, a record in the changelog stream is interpreted as an UPSERT (also known as INSERT or UPDATE) because any existing row with the same key is overwritten. In addition, a message with a null value is treated as a DELETE message.
  • As a result table, the upsert-kafka connector can consume the changelog stream produced by the upstream computation. It writes INSERT and UPDATE_AFTER data as normal Kafka messages, and writes DELETE data as Kafka messages with a null value, which indicates that the message for the corresponding key is deleted. Flink partitions the data by the value of the primary key columns, so the messages for a primary key are ordered, and update or delete messages on the same primary key land in the same partition. A minimal sketch follows this list.
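
For illustration, the following is a minimal sketch (not taken from the original documentation; the table name, columns, topic, and formats are placeholder assumptions) of the source-table behavior described above: an upsert-kafka table whose primary key is the Kafka message key, and a query that consumes the resulting changelog stream.

    -- Hypothetical upsert-kafka source table: every Kafka record is interpreted
    -- as an UPSERT on the primary key, and a record with a null value as a DELETE.
    CREATE TABLE user_latest_region (
      user_id BIGINT,
      user_region STRING,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'key.format' = 'json',
      'value.format' = 'json'
    );

    -- Reading the table yields a changelog stream, so this aggregate always
    -- reflects the latest region per user_id.
    SELECT user_region, COUNT(*) AS users_in_region
    FROM user_latest_region
    GROUP BY user_region;
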
  • Supported table types: source table and result table.
  • Supported operating modes: streaming mode and batch mode.
  • Supported data formats: avro, avro-confluent, canal-json, csv, debezium-json, json, maxwell-json, and raw.
  • Monitoring metrics: Source table: numRecordsIn, numRecordsInPerSecond, numBytesIn, numBytesInPerSecond, currentEmitEventTimeLag, currentFetchEventTimeLag, sourceIdleTime, pendingRecords. Result table: numRecordsOut, numRecordsOutPerSecond, numBytesOut, numBytesOutPerSecond, currentSendTime.
  • Supported API type: SQL.

Prerequisite

  • The Kafka cluster has been created.
  • The network between the Flink deployment and the Kafka cluster is connected.

Usage restrictions

  • Only Apache Kafka 0.10 and later is supported for reading and writing data.
  • Only the configuration items of the Apache Kafka 2.8 client are supported.
  • When the Kafka result table uses exactly-once semantics, the transaction feature must be enabled on the Kafka cluster that is written to, and this is supported only for Apache Kafka 0.11 and later.

Syntax

    CREATE TABLE upsert_kafka_sink (
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY (user_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'key.format' = 'avro',
      'value.format' = 'avro'
    );

WITH parameters

Common parameters

  • connector: The table type. Data type: String. Required: yes. Default: none. The value is fixed to upsert-kafka.
  • properties.bootstrap.servers: The Kafka broker addresses. Data type: String. Required: yes. Default: none. The format is host:port,host:port,host:port, with entries separated by commas (,).
  • properties.*: Direct configuration of the Kafka client. Data type: String. Required: no. Default: none. The suffix must be a producer or consumer configuration defined in the official Kafka documentation. Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can disable automatic topic creation with 'properties.allow.auto.create.topics' = 'false'. The key.deserializer and value.deserializer configurations cannot be modified this way because they are overwritten by the Kafka connector.
  • format: The format used to read or write the value part of a Kafka message. Data type: String. Required: no. Default: none. Supported formats: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, and raw.
  • key.format: The format used to read or write the key part of a Kafka message. Data type: String. Required: no. Default: none. When this parameter is used, the key.fields parameter is also required. Supported formats: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, and raw.
  • key.fields: The source table or result table fields that correspond to the key part of the Kafka message. Data type: String. Required: no. Default: none. Separate multiple field names with semicolons (;), for example field1;field2.
  • key.fields-prefix: A custom prefix for all fields of the key part of the Kafka message, used to avoid name clashes with fields of the value part. Data type: String. Required: no. Default: none. This parameter only affects the column names of the source table and the result table; the prefix is removed when the key part of a Kafka message is parsed or generated. Note that when this parameter is used, value.fields-include must be set to EXCEPT_KEY (a combined sketch follows this list).
  • value.format: The format used to read or write the value part of a Kafka message. Data type: String. Required: no. Default: none. This parameter is equivalent to format, so only one of format and value.format can be configured; configuring both causes a conflict.
  • value.fields-include: Whether the fields that correspond to the key part of the message are included when the value part of a Kafka message is parsed or generated. Data type: String. Required: no. Default: ALL. Valid values: ALL (default): all columns are processed as the value part of the Kafka message. EXCEPT_KEY: the fields defined in key.fields are removed, and the remaining fields are processed as the value part of the Kafka message.
  • topic: The name of the topic to read from or write to. Data type: String. Required: yes. Default: none.
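
As a sketch of how the key- and value-related parameters above can be combined (the table, topic, and column names are illustrative assumptions, not part of the original documentation):

    -- Hypothetical table: the primary key column carries the k_ prefix in the
    -- table schema; key.fields-prefix strips it when the Kafka message key is
    -- read or written, and value.fields-include = 'EXCEPT_KEY' keeps the key
    -- column out of the message value.
    CREATE TABLE region_stats (
      k_user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY (k_user_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      -- Passed through to the Kafka client after the properties. prefix is removed.
      'properties.allow.auto.create.topics' = 'false',
      'key.format' = 'json',
      'key.fields-prefix' = 'k_',
      'value.format' = 'json',
      'value.fields-include' = 'EXCEPT_KEY'
    );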

Result table

  • sink.parallelism: The parallelism of the Kafka result table operator. Data type: Integer. Required: no. Default: the parallelism of the upstream operators, as determined by the framework.
  • sink.buffer-flush.max-rows: The maximum number of records that can be cached before the cache is flushed. Data type: Integer. Required: no. Default: not enabled. When the result table receives many updates on the same key, the cache keeps only the last record for each key, so the result table cache helps reduce the amount of data sent to the Kafka topic and avoids sending potential tombstone messages. Caching can be disabled by setting this parameter to 0. Note that to enable result table caching, both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be set to values greater than zero (a configuration sketch follows this list).
  • sink.buffer-flush.interval: The interval at which the cache is flushed. Data type: Duration. Required: no. Default: not enabled. The same notes apply as for sink.buffer-flush.max-rows: both parameters must be set to values greater than zero to enable result table caching.
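
As a sketch of how the two caching parameters are enabled together (the values below are illustrative assumptions, not recommendations from the original documentation):

    CREATE TABLE pageviews_per_region_buffered (
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY (user_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'key.format' = 'avro',
      'value.format' = 'avro',
      -- Both options must be greater than zero to enable result table caching.
      'sink.buffer-flush.max-rows' = '1000',
      'sink.buffer-flush.interval' = '1s'
    );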

Examples

Source table

  • Create the source table from which PV and UV are computed.
    CREATE TABLE pageviews (
      user_id BIGINT,
      page_id BIGINT,
      viewtime TIMESTAMP,
      user_region STRING,
      WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'format' = 'json'
    );
  • Create an Upsert Kafka result table.
    CREATE TABLE pageviews_per_region (
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY (user_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'key.format' = 'avro',
      'value.format' = 'avro'
    );
  • Write the PV and UV statistics to the result table. (A sketch of reading this table back as a source follows this list.)
    INSERT INTO pageviews_per_region
    SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCT user_id)
    FROM pageviews
    GROUP BY user_region;
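
For completeness, a minimal sketch (not part of the original example) of consuming the same upsert-kafka table as a source: reading it produces a changelog stream that always reflects the latest PV and UV values per region.

    -- Hypothetical follow-up query on the upsert-kafka table defined above.
    SELECT user_region, pv, uv
    FROM pageviews_per_region
    WHERE uv > 100;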

Result table

  • Create the source table from which PV and UV are computed.
    CREATE TABLE pageviews (
      user_id BIGINT,
      page_id BIGINT,
      viewtime TIMESTAMP,
      user_region STRING,
      WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'format' = 'json'
    );
  • Create an Upsert Kafka result table.
    CREATE TABLE pageviews_per_region (
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY (user_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'key.format' = 'avro',
      'value.format' = 'avro'
    );
  • Write the PV and UV statistics to the result table.
    INSERT INTO pageviews_per_region
    SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCT user_id)
    FROM pageviews
    GROUP BY user_region;
Note

This page is derived from the official Apache Flink® documentation.

Refer to the Credits page for more information.