Apache Kafka

Background Information

Apache Kafka is an open source distributed message queuing system that is widely used in big data scenarios such as high-performance data processing, stream analysis, and data integration. The Kafka connector is built on the open source Apache Kafka client. It provides high data throughput and supports reading and writing multiple data formats.

| Category | Description |
| --- | --- |
| Supported mode | Streaming mode |
| Supported formats | CSV, JSON, Apache Avro, Confluent Avro, Debezium JSON, Canal JSON, Maxwell JSON, Raw |
| Supported indicators | Source: numRecords, numRecordsInPerSecond, numBytesInPerSecond, currentEmitEventTimeLag, currentFetchEventTimeLag, sourceIdleTime, pendingRecords. Sink: numRecords, numRecordsOutPerSecond, numBytesOutPerSecond, currentSendTime. |

Connect to Self-built Apache Kafka Cluster

  • The self-built Apache Kafka cluster runs version 0.11 or later.
  • Network connectivity between Flink and the self-built Apache Kafka cluster has been established.

Network Connection Troubleshooting

If your Flink job has a “Timed out waiting for a node assignment” error when starting, it is usually caused by a network connectivity issue between Flink and Kafka.

The process of establishing a connection between the Kafka client and the server is as follows:

  1. The client uses the bootstrap.servers address specified by the user to connect to the Kafka server, and the Kafka server returns the meta information of each broker in the cluster to the client according to the configuration, including the connection address of each broker.
  2. The client uses the connection address returned by the broker in the first step to connect to each broker for reading/writing.

If the Kafka server is configured incorrectly, the connection addresses that the client receives in the first step are wrong. In that case, the client cannot read or write data even though the address configured in bootstrap.servers is reachable. This problem often occurs when network forwarding mechanisms such as proxies, port forwarding, or dedicated lines sit between Flink and Kafka.

Usage Restrictions

Version and configuration

  • Only the Flink computing engine VERA 1.0.3 and later supports the Kafka connector.
  • The connector only supports reading and writing data of Apache Kafka 0.11 and later.
  • Only the client configuration items of Apache Kafka 2.8 are supported. For details, see the Apache Kafka consumer and producer configuration documentation.
  • When the Kafka result table (sink) uses exactly-once semantics, the target Kafka cluster must have transactions enabled, and only clusters running Apache Kafka 0.11 or later are supported.

Use of CREATE TABLE AS (CTAS)

  • Only the Flink computing engine vera-1.0.3-flink-1.13 and later supports Kafka as the synchronization data source of CREATE TABLE AS (CTAS).
  • Type deduction and schema changes are supported only for the JSON format. Other data formats are not supported for now.
  • When Kafka is the data source of a CREATE TABLE AS (CTAS) statement, schema changes can only be synchronized to Hudi and Hologres result tables.
  • Type deduction and table structure changes are supported only for the value part of Kafka messages. If you need to synchronize columns of the Kafka key part, you must specify them manually in the DDL. For details, see Example 3.

Syntax

    CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA VIRTUAL FROM 'timestamp'
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_behavior',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
    )

Meta Info Column

You can define meta information columns in source and target tables to read or write the meta information of Kafka messages. For example, when multiple topics are defined in the WITH parameter and a meta information column for the topic is defined in the Kafka source table, each record read by Flink is tagged with the topic it came from. The following example shows how to use meta information columns:

    CREATE TABLE kafka_source (
        -- Read the topic to which the message belongs as the `record_topic` field
        `record_topic` STRING NOT NULL METADATA VIRTUAL FROM 'topic',
        -- Read the timestamp in ConsumerRecord as the `ts` field
        `ts` TIMESTAMP(3) METADATA VIRTUAL FROM 'timestamp',
        -- Read the message offset as the `record_offset` field
        `record_offset` BIGINT NOT NULL METADATA VIRTUAL FROM 'offset',
        ...
    ) WITH (
        'connector' = 'kafka',
        ...
    );

    CREATE TABLE kafka_sink (
        -- Write the value of the `ts` field to Kafka as the ProducerRecord timestamp.
        -- The column is not declared VIRTUAL, because virtual columns are read-only and are not written to the sink.
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
        ...
    ) WITH (
        'connector' = 'kafka',
        ...
    );

The following table lists the metadata columns supported by Kafka source and target tables:

| Key | Data type | Description | Source table/Target table |
| --- | --- | --- | --- |
| topic | STRING NOT NULL METADATA VIRTUAL | The name of the topic where the Kafka message resides. | source table |
| partition | INT NOT NULL METADATA VIRTUAL | The ID of the partition where the Kafka message resides. | source table |
| headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | The headers of the Kafka message. | source table and target table |
| leader-epoch | INT NOT NULL METADATA VIRTUAL | The leader epoch of the Kafka message. | source table |
| offset | BIGINT NOT NULL METADATA VIRTUAL | The offset of the Kafka message. | source table |
| timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | The timestamp of the Kafka message. | source table and target table |
| timestamp-type | STRING NOT NULL METADATA VIRTUAL | The timestamp type of the Kafka message. NoTimestampType: no timestamp is defined in the message. CreateTime: the time when the message was generated. LogAppendTime: the time when the message was added to the Kafka broker. | source table |

WITH Parameter

Universal type

| Parameter | Description | Data type | Required | Default | Additional info |
| --- | --- | --- | --- | --- | --- |
| connector | Connector type | String | yes | none | The fixed value is kafka. |
| properties.bootstrap.servers | Kafka broker addresses | String | yes | none | The format is host:port,host:port,host:port, with addresses separated by commas (,). |
| properties.* | Direct configuration of the Kafka client | String | no | none | The suffix must be a producer or consumer configuration defined in the official Kafka documentation. Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can disable automatic topic creation with 'properties.allow.auto.create.topics' = 'false'. The following configurations cannot be modified this way because the Kafka connector overwrites them: key.deserializer and value.deserializer. |
| format | The format to use when reading or writing the value part of a Kafka message. | String | no | none | Supported formats: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, raw |
| key.format | The format to use when reading or writing the key part of a Kafka message. | String | no | none | Supported formats: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, raw. Note: When using this configuration, the "key.fields" configuration is required. |
| key.fields | The source table/result table fields that correspond to the key part of the Kafka message. | String | no | none | Multiple field names are separated by semicolons (;). For example, field1;field2 |
| key.fields-prefix | A custom prefix for all fields of the Kafka message key part, used to avoid name clashes with fields of the message value part. | String | no | none | This configuration item is only used to distinguish column names in the source table and the result table. The prefix is removed when the key part of the Kafka message is parsed or generated. Note: When using this configuration, "value.fields-include" must be set to EXCEPT_KEY. |
| value.format | The format to use when reading or writing the value part of a Kafka message. | String | no | none | Note: When using this configuration, the "key.options" configuration is required. This configuration is equivalent to "format", so only one of format and value.format can be configured. Configuring both causes a conflict. |
| value.fields-include | Whether to include the fields that correspond to the message key part when parsing or generating the value part of a Kafka message. | String | no | ALL | The values are as follows. ALL (default): all columns are processed as the value part of the Kafka message. EXCEPT_KEY: the fields defined by key.fields are removed, and the remaining fields are processed as the value part of the Kafka message. |
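
The key-related parameters above are easiest to read together. The following is a minimal sketch, assuming a hypothetical topic named orders whose JSON key carries an order_id field and whose JSON value carries customer and amount fields; the table, topic, and column names are illustrative and do not come from this page.

```sql
-- Hypothetical table: the names below are placeholders for illustration.
CREATE TEMPORARY TABLE kafka_orders (
    `key_order_id` BIGINT,   -- read from the message key (field `order_id` after the prefix is removed)
    `customer` STRING,       -- read from the message value
    `amount` DOUBLE          -- read from the message value
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'properties.group.id' = '<yourKafkaConsumerGroupId>',
    'key.format' = 'json',
    'key.fields' = 'key_order_id',
    'key.fields-prefix' = 'key_',
    'value.format' = 'json',
    -- Required when key.fields-prefix is set, as noted in the table above
    'value.fields-include' = 'EXCEPT_KEY'
);
```

Because value.format is used here, format is left out, in line with the rule that only one of the two can be configured.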

Source type

| Parameter | Description | Data type | Required | Default | Additional info |
| --- | --- | --- | --- | --- | --- |
| topic | The name of the topic to read | String | no | none | Separate multiple topic names with semicolons (;), such as topic-1;topic-2. Note: Only one of the two options topic and topic-pattern can be specified. |
| topic-pattern | A regular expression that matches the names of the topics to read. All topics that match this regular expression are read when the job runs. | String | no | none | Note: Only VERA 1.0.3 and later versions support this parameter. Only one of the two options topic and topic-pattern can be specified. |
| properties.group.id | Consumer group ID | String | no | KafkaSource-{source table name} | If the specified group ID is used for the first time, properties.auto.offset.reset must be set to earliest or latest to specify the initial start position. |
| scan.startup.mode | The position from which Kafka data is read | String | no | group-offsets | The values are as follows. earliest-offset: start reading from the earliest offset in Kafka. latest-offset: start reading from the latest offset in Kafka. group-offsets (default): start reading from the committed offset of the specified properties.group.id. timestamp: start reading from the timestamp specified by scan.startup.timestamp-millis. specific-offsets: start reading from the offsets specified by scan.startup.specific-offsets. |
| scan.startup.specific-offsets | In specific-offsets startup mode, the start offset of each partition. | String | no | none | For example: partition:0,offset:42;partition:1,offset:300 |
| scan.startup.timestamp-millis | In timestamp startup mode, the start timestamp. | Long | no | none | In milliseconds |
| scan.topic-partition-discovery.interval | The interval for dynamically discovering Kafka topics and partitions. | Duration | no | none | Dynamic topic/partition discovery is not enabled by default. |
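
As an illustration of topic-pattern combined with dynamic discovery, the sketch below reads every topic whose name matches a pattern and records the originating topic in a metadata column. The pattern app-logs-.*, the 5-minute interval, and the table and column names are assumptions chosen for the example.

```sql
-- Hypothetical table: reads all topics matching the pattern and keeps the topic name per record.
CREATE TEMPORARY TABLE kafka_logs (
    `record_topic` STRING NOT NULL METADATA VIRTUAL FROM 'topic',
    `message` STRING
) WITH (
    'connector' = 'kafka',
    -- topic-pattern is used instead of topic; the two options are mutually exclusive
    'topic-pattern' = 'app-logs-.*',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'properties.group.id' = '<yourKafkaConsumerGroupId>',
    'scan.startup.mode' = 'earliest-offset',
    -- Check for new matching topics and partitions every 5 minutes (example value)
    'scan.topic-partition-discovery.interval' = '5 min',
    'format' = 'json'
);
```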

Sink Type

| Parameter | Description | Data type | Required | Default | Additional info |
| --- | --- | --- | --- | --- | --- |
| topic | The name of the topic to write | String | no | default | |
| sink.partitioner | The mapping from Flink parallel subtasks to Kafka partitions | String | no | default | The values are as follows. default (default): use Kafka's default partitioning. fixed: each Flink parallel subtask writes to one fixed Kafka partition. round-robin: data from each Flink parallel subtask is distributed to the Kafka partitions in turn. Custom partition mapping: if fixed and round-robin do not meet your needs, you can create a subclass of FlinkKafkaPartitioner to customize the mapping, for example org.mycompany.MyPartitioner. |
| sink.delivery-guarantee | The delivery semantics of the Kafka sink | String | no | at-least-once | The values are as follows. none: no guarantee; data may be lost or duplicated. at-least-once (default): data is guaranteed not to be lost, but may be duplicated. exactly-once: Kafka transactions are used to ensure that data is neither lost nor duplicated. Note: "sink.transactional-id-prefix" is required when using exactly-once semantics. |
| sink.transactional-id-prefix | The Kafka transaction ID prefix used under exactly-once semantics. | String | no | none | Note: This configuration only takes effect when "sink.delivery-guarantee" is set to "exactly-once". |
| sink.parallelism | The parallelism of the Kafka sink operator | Integer | no | The parallelism of the upstream operator, as determined by the framework. | |
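
A result table that uses exactly-once semantics could be declared roughly as follows. This is a sketch under assumptions: the table name, columns, and the transaction ID prefix my-job-tx are placeholders, and the target Kafka cluster is assumed to have transactions enabled as described under Usage Restrictions.

```sql
-- Hypothetical sink table using Kafka transactions for exactly-once delivery.
CREATE TEMPORARY TABLE kafka_sink_exactly_once (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'sink',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'format' = 'json',
    'sink.delivery-guarantee' = 'exactly-once',
    -- Required for exactly-once; the prefix value is an example
    'sink.transactional-id-prefix' = 'my-job-tx'
);
```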

CTAS Sync Data Source Type

| Parameter | Description | Data type | Required | Default | Additional info |
| --- | --- | --- | --- | --- | --- |
| json.infer-schema.flatten-nested-columns.enable | Whether to recursively expand nested columns in JSON | Boolean | no | FALSE | The parameter values are as follows. true: recursive expansion. For expanded columns, Flink uses the path that indexes the value as the column name. For example, for the column col in the JSON {"nested": {"col": true}}, the expanded name is nested.col. false (default): nested types are treated as String. |
| json.infer-schema.primitive-as-string | Whether to deduce all primitive types as String. | Boolean | no | FALSE | The parameter values are as follows. true: all primitive types are deduced as String. false (default): types are deduced according to the basic rules. |

All configuration items supported by the Kafka consumer and producer can be used in the WITH parameter by adding the "properties." prefix to the configuration name. For example, to set the timeout "request.timeout.ms" of the Kafka consumer or producer to 60000 milliseconds, configure 'properties.request.timeout.ms' = '60000' in the WITH parameter. The configuration items of the Kafka consumer and producer are listed in the official Apache Kafka documentation.
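
In context, the request.timeout.ms setting described above would appear in a table definition like the sketch below. The table name, columns, and topic are placeholders.

```sql
-- Hypothetical source table that passes a Kafka client setting through the properties. prefix.
CREATE TEMPORARY TABLE kafka_source_with_timeout (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'source',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'properties.group.id' = '<yourKafkaConsumerGroupId>',
    -- Passed to the Kafka client as request.timeout.ms = 60000
    'properties.request.timeout.ms' = '60000',
    'format' = 'csv'
);
```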

Message Format

Kafka source and result tables support the following message formats: CSV, JSON, Apache Avro, Confluent Avro, Debezium JSON, Canal JSON, Maxwell JSON, and Raw.

Each format has its own configuration items, which can be used directly in the WITH parameter. For the configuration items of each format, refer to the corresponding Flink community documentation.

Security and Authentication

If your Kafka cluster requires a secure connection or authentication, add the relevant security and authentication configuration to the WITH parameter with the "properties." prefix. The following example configures a Kafka table to use PLAIN as the SASL mechanism and provides a JAAS configuration:

    CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
    )

A more complex example uses SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism:

    CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_SSL',
        /* SSL configuration */
        /* Configure the path of the truststore (CA certificate) provided by the server */
        'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
        'properties.ssl.truststore.password' = 'test1234',
        /* If client authentication is required, configure the path of the keystore (private key) */
        'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
        'properties.ssl.keystore.password' = 'test1234',
        /* SASL configuration */
        /* Configure the SASL mechanism as SCRAM-SHA-256 */
        'properties.sasl.mechanism' = 'SCRAM-SHA-256',
        /* Configure JAAS */
        'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
    )

The CA certificate and private key mentioned in the example can be uploaded to the platform using the resource upload function of VVP. Uploaded files are stored in the '/flink/usrlib' directory. For example, if the CA certificate file is named my-truststore.jks, specify 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' in the WITH parameter after uploading to use the certificate.

Source Table Start Offset

Startup mode

The Kafka source table can specify the initial read position by configuring "scan.startup.mode":

  • Earliest offset (earliest-offset): read from the earliest offset of the current partition.
  • Latest offset (latest-offset): read from the latest offset of the current partition.
  • Committed offset (group-offsets): read from the committed offset of the specified group ID. The group ID is specified by "properties.group.id".
  • Timestamp (timestamp): read from the first message whose timestamp is greater than or equal to the specified time. The timestamp is specified by "scan.startup.timestamp-millis".
  • Specific offsets (specific-offsets): read from the partition offsets specified by the user. The offsets are specified by "scan.startup.specific-offsets".

If no start offset is specified, consumption starts from the committed offset (group-offsets) by default.

Note that "scan.startup.mode" only takes effect for jobs started without state. A job started from state resumes consumption from the offsets stored in the state.

The code example is as follows:

    CREATE TEMPORARY TABLE kafka_source (
        ...
    ) WITH (
        'connector' = 'kafka',
        ...
        -- The options below are alternatives. Keep only one 'scan.startup.mode' setting in a real table.
        -- Start consuming from the earliest offset
        'scan.startup.mode' = 'earliest-offset',
        -- Start consuming from the latest offset
        'scan.startup.mode' = 'latest-offset',
        -- Start consuming from the committed offset of consumer group "my-group"
        'properties.group.id' = 'my-group',
        'scan.startup.mode' = 'group-offsets',
        'properties.auto.offset.reset' = 'earliest', -- If "my-group" is used for the first time, start from the earliest offset
        'properties.auto.offset.reset' = 'latest', -- If "my-group" is used for the first time, start from the latest offset
        -- Start consuming from the specified millisecond timestamp 1655395200000
        'scan.startup.mode' = 'timestamp',
        'scan.startup.timestamp-millis' = '1655395200000',
        -- Start consuming from the specified offsets
        'scan.startup.mode' = 'specific-offsets',
        'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
    );

Start Offset Priority

The start offset of the source table is determined in the following order of priority:

  1. Offsets stored in a checkpoint or savepoint
  2. The start time specified by the user on the VVP platform
  3. The start offset specified by the user through scan.startup.mode in the WITH parameter
  4. group-offsets, when scan.startup.mode is not specified

In any of the above steps, if the offset is invalid because it has expired or because of a problem with the Kafka cluster, the offset is reset using the strategy specified by properties.auto.offset.reset. If this configuration item is not set, an exception is thrown and user intervention is required.

A common case is starting consumption with a brand new group ID. The source table first queries the Kafka cluster for the committed offset of the group. Because the group ID is used for the first time, no valid offset is found, so the offset is reset using the strategy configured by the properties.auto.offset.reset parameter. Therefore, when consuming with a new group ID, you must configure properties.auto.offset.reset to specify the offset reset strategy.
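
The new group ID case can be sketched as a table definition like the one below. The group ID my-new-group, the table name, and the columns are hypothetical; the choice of earliest rather than latest is only an example.

```sql
-- Hypothetical source table that starts with a consumer group that has no committed offsets yet.
CREATE TEMPORARY TABLE kafka_source_new_group (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'source',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'properties.group.id' = 'my-new-group',
    -- group-offsets is also the default when scan.startup.mode is omitted
    'scan.startup.mode' = 'group-offsets',
    -- Fallback used because the new group has no committed offset
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'csv'
);
```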

Source Offset Commit

The Kafka source table commits the current consumption offset to the Kafka cluster only after a checkpoint succeeds. If the checkpoint interval is long, the consumption offset observed on the Kafka cluster side lags behind. During a checkpoint, the Kafka source table stores the current read progress in state and does not rely on the offsets committed to the cluster for fault recovery. The committed offsets only serve to monitor read progress on the Kafka side. A failed offset commit has no impact on data accuracy.

Result Table Custom Partitioner

If the built-in Kafka producer partition modes do not meet your needs, you can implement a custom partition mode to write data to the corresponding partitions. A custom partitioner must inherit from FlinkKafkaPartitioner. After development, compile the JAR package and upload it to the VVP platform using the resource upload function. After uploading and referencing the package, set the sink.partitioner parameter in the WITH parameter to the fully qualified class name of the partitioner, such as "org.mycompany.MyPartitioner".
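
On the SQL side, referencing the custom partitioner might look like the sketch below. It assumes that a class org.mycompany.MyPartitioner extending FlinkKafkaPartitioner has already been packaged and uploaded; the table name and columns are placeholders.

```sql
-- Hypothetical sink table that uses a custom partitioner class from an uploaded JAR.
CREATE TEMPORARY TABLE kafka_sink_custom_partitioner (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'sink',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'format' = 'csv',
    -- Fully qualified class name of the FlinkKafkaPartitioner subclass
    'sink.partitioner' = 'org.mycompany.MyPartitioner'
);
```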

Kafka / Upsert Kafka / Kafka JSON Catalog Selection

Kafka is an append-only message queue system: data can be added, but not updated or deleted. Therefore, a Kafka result table cannot handle upstream CDC change data or the retraction logic of operators such as aggregation and union in streaming SQL. If you need to write data that contains changes or retractions to Kafka, use the Upsert Kafka result table, which handles change data specially.
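
To make the distinction concrete, the following sketch writes the result of an aggregation, which produces updates, to an Upsert Kafka result table. It assumes the upsert-kafka connector of open source Flink is available on your platform; the table, topic, and column names are illustrative.

```sql
-- Hypothetical source of page view events.
CREATE TEMPORARY TABLE pageviews (
    user_id BIGINT,
    region STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'pageviews',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'properties.group.id' = '<yourKafkaConsumerGroupId>',
    'format' = 'json'
);

-- Upsert Kafka requires a primary key; it is written as the message key.
CREATE TEMPORARY TABLE pageviews_per_region (
    region STRING,
    view_count BIGINT,
    PRIMARY KEY (region) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'pageviews_per_region',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- The aggregation emits an updated count per region; each update overwrites the previous value for that key.
INSERT INTO pageviews_per_region
SELECT region, COUNT(*) AS view_count
FROM pageviews
GROUP BY region;
```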

To batch-synchronize the change data of one or more tables in an upstream database to Kafka, you can use the Kafka JSON catalog. If the data stored in your Kafka is in JSON format, the Kafka JSON catalog saves you the steps of defining the schema and WITH parameters. See the Kafka JSON catalog documentation for details.

As a CTAS Data Source

The CTAS statement supports Kafka message queues with JSON-formatted data as the data source. During data synchronization, if some fields do not appear in the predefined table structure, Flink tries to deduce the column types automatically. If the automatically deduced types do not meet your requirements, you can declare the parsing type of specific columns through auxiliary derivation. For a detailed description of the JSON format, see the JSON Format documentation.

Type Inference

During type deduction, Flink by default expands only the first layer of data in the JSON text and deduces the type of each field from its JSON type and value according to the basic rules. You can also declare the parsing type of a specific column in the DDL to meet special needs. This is called auxiliary derivation.

Basic Rules

The basic rules of type mapping are shown in the table below.

| JSON type | Flink SQL type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| STRING | DATE, TIMESTAMP, TIMESTAMP_LTZ, TIME, or STRING |
| INT or LONG | BIGINT |
| BIGINT | DECIMAL or STRING. Explanation: The DECIMAL type in Flink has limited precision. If the actual value of the integer exceeds the maximum precision of the DECIMAL type, Flink deduces its type as STRING to avoid loss of precision. |
| FLOAT, DOUBLE, or BIGDECIMAL | DOUBLE |
| ARRAY | STRING |
| OBJECT | STRING |

For example, consider the following JSON text:

    {
        "id": 101,
        "name": "VVP",
        "properties": {
            "owner": "Ververica",
            "engine": "Flink"
        },
        "type": ["Big Data"]
    }

Flink writes the following table information to the downstream storage:

| id | name | properties | type |
| --- | --- | --- | --- |
| 101 | VVP | {"owner":"Ververica","engine":"Flink"} | ["Big Data"] |

Auxiliary Derivation

If the basic rules above do not meet your needs, you can declare the parsing type of a specific column in the DDL of the source table. Flink then gives priority to the declared column type when parsing the target field. In the following example, Flink parses the price field as DECIMAL instead of converting it to DOUBLE according to the basic rules.

    CREATE TABLE evolvingKafkaSource (
        price DECIMAL(18, 2)
    ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'localhost:9092',
        'topic' = 'evolving_kafka_demo',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    );

If the type you specify in the DDL is inconsistent with the type in the actual data, Flink handles it as follows:

  • When the declared type is wider than the actual type, the data is parsed as the declared type. For example, if the declared type is DOUBLE and the data encountered is BIGINT, it is parsed as DOUBLE.
  • When the actual type is wider than the declared type, or the two types are incompatible, an error is reported with the relevant information, because CTAS does not currently support type changes. You need to restart the job and declare the exact type to parse the data.

Auxiliary Derivation of Complex Types

Auxiliary derivation of complex types is not supported, including ROW, ARRAY, MAP, and MULTISET. Flink handles complex types as STRING by default.

Typically, JSON text in a Kafka topic has a nested structure. If you need to extract nested columns from the JSON text, there are two ways:

  • Declare 'json.infer-schema.flatten-nested-columns.enable' = 'true' in the source table DDL to flatten all elements of nested columns to the top level. All nested columns are then expanded in sequence. To avoid column name conflicts, Flink uses the path that indexes the column as the expanded column name. For example, for the following JSON text, the expanded name of the column col is nested.inner.col:
    {
        "nested": {
            "inner": {
                "col": true
            }
        }
    }
  • Add the computed column `rowkey` AS JSON_VALUE(`properties`, '$.rowkey') to the CTAS syntax in the DDL to specify the column to expand.

Column Name Conflicts

Resolving column name conflicts is not currently supported. If a column name conflict occurs, declare json.ignore-parse-errors as true in the DDL of the source table to ignore the conflicting data.

Code Example

Example 1: Write to Kafka after reading data from Kafka

Read Kafka data from the topic named source, and then write it to the topic named sink. The data is in CSV format.

    CREATE TEMPORARY TABLE kafka_source (
        id INT,
        name STRING,
        age INT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'source',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'properties.group.id' = '<yourKafkaConsumerGroupId>',
        'format' = 'csv'
    );

    CREATE TEMPORARY TABLE kafka_sink (
        id INT,
        name STRING,
        age INT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'sink',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'properties.group.id' = '<yourKafkaConsumerGroupId>',
        'format' = 'csv'
    );

    INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

Example 2: Synchronize table structure and data

Synchronize the messages in a Kafka topic to Hologres in real time. In this case, you can use the offset and partition ID of the Kafka message as the primary key to ensure that Hologres contains no duplicate messages after a failover.

    CREATE TEMPORARY TABLE kafkaTable (
        -- Types follow the metadata column table: offset is BIGINT, partition is INT
        `offset` BIGINT NOT NULL METADATA,
        `part` INT NOT NULL METADATA FROM 'partition',
        PRIMARY KEY (`part`, `offset`) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'topic' = 'kafka_evolution_demo',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json',
        'json.infer-schema.flatten-nested-columns.enable' = 'true'
    );

    CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
    WITH (
        'connector' = 'hologres'
    ) AS TABLE vvp.`default`.kafkaTable;

Example 3: Synchronize table structure and key and value data of Kafka messages

When the key part of the Kafka message already stores relevant information, you can synchronize the key and the value of the Kafka message at the same time.

    CREATE TEMPORARY TABLE kafkaTable (
        `key_id` INT NOT NULL,
        `val_name` VARCHAR(200)
    ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'topic' = 'kafka_evolution_demo',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'val_',
        'value.fields-include' = 'EXCEPT_KEY'
    );

    CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
    WITH (
        'connector' = 'hologres'
    ) AS TABLE vvp.`default`.kafkaTable;

Note: The key part of the Kafka message does not support table structure changes or type deduction. You need to declare it manually.

Example 4: Synchronize table structure and data and perform calculations

When synchronizing Kafka data to Hologres, some lightweight calculations are often required.

    CREATE TEMPORARY TABLE kafkaTable (
        `distinct_id` INT NOT NULL,
        `properties` STRING,
        `timestamp` TIMESTAMP METADATA,
        `date` AS CAST(`timestamp` AS DATE)
    ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'topic' = 'kafka_evolution_demo',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_'
    );

    CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
        'connector' = 'hologres'
    ) AS TABLE vvp.`default`.kafkaTable
    ADD COLUMN
        `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');

Common Problems

How to read or write JSON data of nested type using Kafka connector?

In the source table DDL, use the ROW type to define the JSON object. In the result table DDL, define the keys of the JSON data that you want to obtain. In the DML statement, specify how each key is accessed to obtain the value of the corresponding nested key. The code example is as follows:

  • Test data
    {
        "a":"abc",
        "b":1,
        "c":{
            "e":["1","2","3","4"],
            "f":{"m":"567"}
        }
    }
  • Source table DDL definition
    CREATE TEMPORARY TABLE `kafka_table` (
        `a` VARCHAR,
        b INT,
        `c` ROW<e ARRAY<VARCHAR>, f ROW<m VARCHAR>> -- c is a JSON object, which corresponds to ROW in Flink; e is a JSON list, which corresponds to ARRAY.
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'xxx',
        'properties.bootstrap.servers' = 'xxx',
        'properties.group.id' = 'xxx',
        'format' = 'json',
        'scan.startup.mode' = 'xxx'
    );
  • Result table DDL definition
    CREATE TEMPORARY TABLE `sink` (
        `a` VARCHAR,
        b INT,
        e VARCHAR,
        `m` VARCHAR
    ) WITH (
        'connector' = 'print',
        'logger' = 'true'
    );
  • DML statement
    INSERT INTO `sink`
    SELECT
        `a`,
        b,
        c.e[1], -- Flink array indexes start from 1, so this gets the first element of the array. Remove [1] to get the whole array.
        c.f.m
    FROM `kafka_table`;

Troubleshooting

Problem causes

If a forwarding mechanism such as a proxy or port mapping sits between Flink and Kafka, the network address of the Kafka server pulled by the Kafka client is the address of the Kafka server itself rather than the address of the proxy. In this case, Flink cannot consume or write data even though the network between Flink and Kafka is connected. The Kafka client (Flink Kafka Connector) establishes a connection to Kafka in two steps:

  • The Kafka client pulls the meta information of the Kafka server (Kafka broker), including the network addresses of all brokers on the Kafka server.
  • The Kafka client uses the network addresses returned in the meta information to connect to each broker and read or write data.

Troubleshooting method

Use the following steps to confirm whether a forwarding mechanism such as a proxy or port mapping sits between Flink and Kafka:

  • Use the ZooKeeper command line tool (zkCli.sh or zookeeper-shell.sh) to log in to the ZooKeeper cluster used by Kafka.
  • Execute the command that fits your cluster to obtain the meta information of your Kafka brokers. For example, on a ZooKeeper-based cluster, ls /brokers/ids lists the broker IDs and get /brokers/ids/<id> shows the meta information of one broker.
  • Use commands such as ping or telnet to test the connectivity between Flink and the address displayed in Endpoint. If the address cannot be reached, a forwarding mechanism such as a proxy or port mapping sits between Flink and Kafka.

Solution

  • Connect Flink and Kafka directly, without forwarding mechanisms such as a proxy or port mapping, so that Flink can connect directly to the Endpoint displayed in the Kafka meta information.
  • Contact the Kafka operations and maintenance personnel and set the forwarding address as advertised.listeners on the Kafka broker side, so that the Kafka server meta information pulled by the Kafka client includes the forwarding address.

Note: Only Kafka 0.10.2.0 and later versions support adding the proxy address to the Kafka broker listener. For more about the principle behind this issue, refer to KIP-103: Distinguishing Internal and External Network Traffic and Detailed Explanation of Kafka Network Connection Issues.

Why Does the Kafka Connector Commit Offsets to the Kafka Server?

Flink commits the currently read offset to Kafka each time a checkpoint succeeds. If checkpointing is not enabled, or the checkpoint interval is too long, the currently read offset may not be visible on the Kafka side.

Why Does a Kafka Source Table Produce No Output After an Event Time Window?

Question details

When Kafka is used as the source table, no data is output after a window based on Event Time.

Problem causes

If a Kafka partition has no data, watermark generation is affected: the watermark of the source is the minimum across its partitions, so an empty partition holds it back. As a result, windows based on Event Time on the Kafka source table cannot output data.

Solution

  • Make sure data exists on all partitions.
  • Enable source idleness detection. In the upper right corner of the target job details page, click Edit, add the following code under More Flink Configuration in the Advanced Configuration panel on the right side of the page, and save the change for it to take effect.
    table.exec.source.idle-timeout: 5
    For details about the table.exec.source.idle-timeout parameter, see Configuration.

How Do I Configure Security and Authentication for a Kafka Table?

To enable security configurations related to encryption and authentication, add the "properties." prefix to the security configuration on the Kafka table. The following code snippet shows how to configure a Kafka table to use PLAIN as the SASL mechanism and provide a JAAS configuration:

    CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
    )

A more complex example uses SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism:

    CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
        'connector' = 'kafka',
        ...
        'properties.security.protocol' = 'SASL_SSL',
        /* SSL configuration */
        /* Configure the path of the truststore (CA certificate) provided by the server */
        'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
        'properties.ssl.truststore.password' = 'test1234',
        /* If client authentication is required, configure the path of the keystore (private key) */
        'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
        'properties.ssl.keystore.password' = 'test1234',
        /* SASL configuration */
        /* Configure the SASL mechanism as SCRAM-SHA-256 */
        'properties.sasl.mechanism' = 'SCRAM-SHA-256',
        /* Configure JAAS */
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
    )

For a detailed description of security configuration, see the “Security” section of the Apache Kafka documentation. All files used in the configuration (such as certificates, public keys, and private keys) need to be uploaded to VVP by attaching files, and the files will be stored in the /flink/opt directory after uploading.

Note: This page is derived from the official Apache Flink® documentation.

Refer to the Credits page for more information.