Apache Kafka
Background Information
Apache Kafka is an open source distributed message queuing system that is widely used in big data fields such as high-performance data processing, stream analysis, and data integration. The Kafka connector is based on the open source Apache Kafka client. It provides high-throughput reads and writes and supports multiple data formats.
Category | Description |
---|---|
Supported mode | Streaming mode |
Supported formats | CSV, JSON, Apache Avro, Confluent Avro, Debezium JSON, Canal JSON, Maxwell JSON, Raw |
Supported metrics | Source: numRecords, numRecordsInPerSecond, numBytesInPerSecond, currentEmitEventTimeLag, currentFetchEventTimeLag, sourceIdleTime, pendingRecords. Sink: numRecords, numRecordsOutPerSecond, numBytesOutPerSecond, currentSendTime. |
Connect to Self-built Apache Kafka Cluster
- The self-built Apache Kafka cluster runs version 0.11 or above.
- Network connectivity has been established between Flink and the self-built Apache Kafka cluster.
Network Connection Troubleshooting
If your Flink job fails at startup with a “Timed out waiting for a node assignment” error, the cause is usually a network connectivity issue between Flink and Kafka.
The process of establishing a connection between the Kafka client and the server is as follows:
- The client uses the bootstrap.servers address specified by the user to connect to the Kafka server. According to its configuration, the Kafka server returns the meta information of each broker in the cluster to the client, including the connection address of each broker.
- The client uses the connection addresses returned in the first step to connect to each broker for reading and writing.
If the Kafka server is not configured correctly, the connection addresses that the client receives in the first step are wrong. In that case, even if the address configured in bootstrap.servers can be reached, the client cannot read or write data normally. This problem often occurs when there are network forwarding mechanisms such as proxies, port forwarding, or dedicated lines between Flink and Kafka.
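One way to narrow down this situation is to test, from a machine in the Flink network, whether each broker address returned in the metadata accepts TCP connections. The following Python sketch assumes you have already collected the advertised host:port pairs by other means. The function names are hypothetical, and the check only covers TCP reachability, not the Kafka protocol itself.

```python
import socket


def is_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def unreachable_brokers(advertised, timeout=3.0):
    """Return the 'host:port' entries that cannot be reached over TCP.

    advertised: iterable of 'host:port' strings taken from the broker metadata.
    """
    failed = []
    for address in advertised:
        host, _, port = address.rpartition(":")
        if not is_reachable(host, int(port), timeout):
            failed.append(address)
    return failed

# Usage (addresses are placeholders):
# unreachable_brokers(["broker-1.internal:9092", "broker-2.internal:9092"])
```

If bootstrap.servers is reachable but this list is not empty, the advertised broker addresses are the likely cause.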
Usage Restrictions
Version and configuration
- The Kafka connector is supported only by Flink compute engine VERA 1.0.3 and above.
- Only Apache Kafka 0.11 and above can be read from and written to.
- Only the client configuration items of Apache Kafka 2.8 are supported. For details, see the Apache Kafka consumer and producer configuration documentation.
- When the Kafka result table (sink) uses exactly-once semantics, the target Kafka cluster must have transactions enabled, and only Apache Kafka 0.11 and above clusters are supported.
Use of CREATE TABLE AS (CTAS)
- Only Flink compute engine vera-1.0.3-flink-1.13 and above supports Kafka as the synchronization data source of CREATE TABLE AS (CTAS).
- Type deduction and schema changes are supported only for the JSON format. Other data formats are not supported for now.
- When Kafka is used as the data source of a CTAS statement, schema changes can only be synchronized to Hudi and Hologres result tables.
- Type deduction and table structure changes are supported only for the value part of Kafka messages. To synchronize columns of the Kafka key part, specify them manually in the DDL. For details, see Example 3.
Syntax
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
Meta Info Column
You can define meta information columns in source and target tables to read or write meta information of Kafka messages. For example, when multiple topics are defined in the WITH parameter and a meta information column for the topic is defined in the Kafka source table, each row read by Flink is marked with the topic it was read from. Examples of meta information columns are as follows:
CREATE TABLE kafka_source (
-- Read the topic to which the message belongs as the `record_topic` field
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- Read the timestamp in ConsumerRecord as the `ts` field
`ts` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
-- Read the message offset as the `record_offset` field
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- Write the timestamp in the `ts` field to Kafka as the ProducerRecord timestamp
-- (not VIRTUAL, because a virtual column is not written to the sink)
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
...
) WITH (
'connector' = 'kafka',
...
);
The following table lists the metadata columns supported by Kafka source and target tables:
Key | Type of data | Description | Source table/Target table |
---|---|---|---|
topic | STRING NOT NULL METADATA VIRTUAL | The name of the topic where the Kafka message resides. | source table |
partition | INT NOT NULL METADATA VIRTUAL | Partition ID where the Kafka message resides. | source table |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | The header of the Kafka message. | source table and target table |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Leader epoch of Kafka messages. | source table |
offset | BIGINT NOT NULL METADATA VIRTUAL | The offset of the Kafka message. | source table |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | The timestamp of the Kafka message. | source table and target table |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Timestamp type of the Kafka message: 1. NoTimestampType: No timestamp is defined in the message. 2. CreateTime: The time when the message was generated. 3. LogAppendTime: The time when the message was added to Kafka Broker. | source table |
WITH Parameter
Universal type
Parameter | Description | Type of data | Required | Defaults | Additional info |
---|---|---|---|---|---|
connector | connector type | String | yes | none | The fixed value is kafka. |
properties.bootstrap.servers | Kafka broker address | String | yes | none | The format is host:port,host:port,host:port, separated by commas (,). |
properties.* | Direct configuration of Kafka clients | String | no | none | The suffix must be a producer or consumer configuration defined in the official Kafka documentation. Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can disable automatic topic creation by passing 'properties.allow.auto.create.topics' = 'false'. The following configurations cannot be modified this way because they are overwritten by the Kafka connector: key.deserializer and value.deserializer. |
format | The format to use when reading or writing the value part of a Kafka message. | String | no | none | Supported formats: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, raw |
key.format | The format to use when reading or writing the key part of a Kafka message. | String | no | none | Supported formats: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, raw. Note: When using this configuration, the key.fields configuration is required. |
key.fields | The source table/result table field corresponding to the key part of the Kafka message. | String | no | none | Multiple field names are separated by a semicolon (;). For example, field1;field2 |
key.fields-prefix | Specify a custom prefix for all Kafka message key parts to avoid duplication with message value part format fields. | String | no | none | This configuration item is only used to distinguish the column names of the source table and the result table. When parsing and generating the key part of the Kafka message, the prefix will be removed. Note: When using this configuration, “value.fields-include” must be configured as EXCEPT_KEY. |
value.format | The format to use when reading or writing the value part of a Kafka message. | String | no | none | Note: When using this configuration, the “key.options” configuration is required. This configuration is equivalent to format, so only one of format and value.format can be configured. Configuring both causes a conflict. |
value.fields-include | When parsing or generating the value part of a Kafka message, whether to include the field corresponding to the key part of the message. | String | no | ALL | The values are as follows: ALL (default value): all columns are processed as the value part of the Kafka message. EXCEPT_KEY: the fields defined by key.fields are removed, and the remaining fields are processed as the value part of the Kafka message. |
Source type
Parameter | Description | Type of data | Required | Defaults | Additional info |
---|---|---|---|---|---|
topic | The topic name to read | String | no | none | Separate multiple topic names with a semicolon (;), such as topic-1;topic-2 Note: Only one of the two options topic and topic-pattern can be specified. |
topic-pattern | A regular expression that matches the read topic name. All topics matching this regular expression will be read when the job runs. | String | no | none | Note: Only VERA 1.0.3 and later versions support this parameter. Only one of the two options topic and topic-pattern can be specified. |
properties.group.id | Consumer group ID | String | no | KafkaSource-{source table name} | If the specified group ID is used for the first time, properties.auto.offset.reset must be set to earliest or latest to specify the initial start position. |
scan.startup.mode | Start position for reading data from Kafka | String | no | group-offsets | The values are as follows: earliest-offset: start reading from the earliest offset of each Kafka partition. latest-offset: start reading from the latest offset of each Kafka partition. group-offsets (default): start reading from the offsets committed by the specified properties.group.id. timestamp: start reading from the timestamp specified by scan.startup.timestamp-millis. specific-offsets: start reading from the offsets specified by scan.startup.specific-offsets. |
scan.startup.specific-offsets | In the specific-offsets startup mode, specifies the start offset for each partition. | String | no | none | For example: partition:0,offset:42;partition:1,offset:300 |
scan.startup.timestamp-millis | In the timestamp startup mode, specifies the start timestamp. | Long | no | none | In milliseconds. |
scan.topic-partition-discovery.interval | Dynamically detect the time interval of Kafka topic and partition. | Duration | no | none | Topic / partition dynamic detection is not enabled by default. |
Sink Type
Parameter | Description | Type of data | Required | Defaults | Additional info |
---|---|---|---|---|---|
topic | The topic name to write | String | no | default | |
sink.partitioner | Mapping mode from Flink parallel subtasks to Kafka partitions | String | no | default | The values are as follows: default (default value): use Kafka's default partitioning. fixed: each Flink parallel subtask writes to a fixed Kafka partition. round-robin: the data of each Flink parallel subtask is distributed to the Kafka partitions in turn. Custom partition mapping mode: if fixed and round-robin do not meet your needs, you can create a subclass of FlinkKafkaPartitioner to customize the partition mapping, for example org.mycompany.MyPartitioner. |
sink.delivery-guarantee | Delivery semantics of the Kafka sink | String | no | at-least-once | The values are as follows: none: no guarantee, data may be lost or duplicated. at-least-once (default): data is guaranteed not to be lost, but may be duplicated. exactly-once: Kafka transactions are used to ensure that data is neither lost nor duplicated. Note: sink.transactional-id-prefix is required when using exactly-once semantics. |
sink.transactional-id-prefix | Kafka transaction ID prefix used under exactly-once semantics. | String | no | none | Note: This configuration only takes effect if sink.delivery-guarantee is configured as exactly-once. |
sink.parallelism | Parallelism of the Kafka sink operator | Integer | no | The parallelism of the upstream operator, determined by the framework. | |
CTAS Sync Data Source Type
Parameter | Description | Type of data | Required | Defaults | Additional info |
---|---|---|---|---|---|
json.infer-schema.flatten-nested-columns.enable | Whether to recursively expand nested columns in JSON | Boolean | no | FALSE | The parameter values are as follows: true: recursive expansion. For expanded columns, Flink uses the path that indexes the value as the column name. For example, for the column col in the JSON {"nested": {"col": true}}, the expanded name is nested.col. false (default): treat nested types as STRING. |
json.infer-schema.primitive-as-string | Whether to deduce all basic types as String type. | Boolean | no | FALSE | The parameter values are as follows: true: deduce all primitive types as STRING. false (default): deduce types according to the basic rules. |
All configuration items supported by the Kafka consumer and producer can be used in the WITH parameter by adding the properties. prefix to the configuration name. For example, to set the Kafka consumer or producer timeout request.timeout.ms to 60000 milliseconds, configure 'properties.request.timeout.ms' = '60000' in the WITH parameter. The configuration items of the Kafka consumer and producer are listed in the official Apache Kafka documentation.
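The prefix handling described above can be pictured with a small Python model. This is only a sketch of the documented behavior, not the connector's code: the function name is hypothetical, and connector-managed keys are modeled as simply being dropped, which is an assumption.

```python
PREFIX = "properties."
# Keys that, according to the parameter table, are overwritten by the connector.
CONNECTOR_MANAGED = {"key.deserializer", "value.deserializer"}


def to_kafka_client_config(with_options):
    """Keep only 'properties.*' options and strip the prefix."""
    client_config = {}
    for key, value in with_options.items():
        if not key.startswith(PREFIX):
            continue  # connector-level option such as 'topic' or 'format'
        client_key = key[len(PREFIX):]
        if client_key in CONNECTOR_MANAGED:
            continue  # assumption: modeled as ignored, the connector sets its own value
        client_config[client_key] = value
    return client_config


with_options = {
    "connector": "kafka",
    "topic": "user_behavior",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.request.timeout.ms": "60000",
    "properties.key.deserializer": "com.example.MyDeserializer",
}
print(to_kafka_client_config(with_options))
# {'bootstrap.servers': 'localhost:9092', 'request.timeout.ms': '60000'}
```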
Message Format
Kafka source and result tables support the following message formats: CSV, JSON, Apache Avro, Confluent Avro, Debezium JSON, Canal JSON, Maxwell JSON, and Raw.
Each format has its own configuration items, which can be used directly in the WITH parameter. For the configuration items of each format, see the Flink community documentation for that format.
Security and Authentication
If your Kafka cluster requires secure connection or authentication, please add the relevant security and authentication configuration with the prefix of properties. and set it in the WITH parameter. Here is an example of how to configure a Kafka table to use PLAIN as the SASL mechanism and provide a JAAS configuration:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
)
Another more complex example uses SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/* SSL configuration */
/* Configure the path of the truststore (CA certificate) provided by the server */
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/* If client authentication is required, you need to configure the path of the keystore (private key) */
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/* SASL configuration */
/* Configure SASL mechanism as SCRAM-SHA-256 */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/* Configure JAAS */
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";'
)
The CA certificate and private key mentioned in the example can be uploaded to the platform with the resource upload function of VVP. Uploaded files are stored in the /flink/usrlib directory. For example, if the CA certificate file is named my-truststore.jks, specify 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' in the WITH parameter after uploading to use the certificate.
Source Table Start Position
Startup mode
The Kafka source table can specify the initial read position by configuring scan.startup.mode:
- Earliest position (earliest-offset): read from the earliest position of the current partition.
- Latest position (latest-offset): read from the latest position of the current partition.
- Committed position (group-offsets): read from the position committed by the specified group ID. The group ID is specified by properties.group.id.
- Timestamp (timestamp): read from the first message whose timestamp is greater than or equal to the specified time. The timestamp is specified by scan.startup.timestamp-millis.
- Specific position (specific-offsets): start consumption from the partition positions specified by the user. The positions are specified by scan.startup.specific-offsets.
If no start position is specified, consumption starts from the committed position (group-offsets) by default. Note that scan.startup.mode only takes effect for jobs started without state. A job started with state resumes from the position stored in the state.
The code example is as follows:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- Choose only one of the following startup modes.
-- Start consumption from the earliest position
'scan.startup.mode' = 'earliest-offset',
-- Start consumption from the latest position
'scan.startup.mode' = 'latest-offset',
-- Start consumption from the committed position of consumer group "my-group"
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- If "my-group" is used for the first time, start consumption from the earliest position
'properties.auto.offset.reset' = 'latest', -- If "my-group" is used for the first time, start consumption from the latest position
-- Start consumption from the specified millisecond timestamp 1655395200000
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
-- Start consumption from the specified positions
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
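The value of scan.startup.specific-offsets is a semicolon-separated list of partition:N,offset:M entries. As a rough illustration of that format, the following Python sketch parses such a string into a partition-to-offset mapping. The function name is hypothetical, and the strict validation is an assumption rather than the connector's actual behavior.

```python
def parse_specific_offsets(spec):
    """Parse 'partition:0,offset:42;partition:1,offset:300' into {0: 42, 1: 300}."""
    offsets = {}
    for entry in spec.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing semicolon
        fields = {}
        for part in entry.split(","):
            name, sep, value = part.partition(":")
            if not sep:
                raise ValueError("expected 'name:value', got %r" % part)
            fields[name.strip()] = value.strip()
        if set(fields) != {"partition", "offset"}:
            raise ValueError("expected 'partition:N,offset:M', got %r" % entry)
        offsets[int(fields["partition"])] = int(fields["offset"])
    return offsets


print(parse_specific_offsets("partition:0,offset:42;partition:1,offset:300"))
# {0: 42, 1: 300}
```

Checking the string this way before submitting a job can catch stray spaces or missing fields early.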
Start Position Priority
The priority of the source table start position, from highest to lowest, is:
- The position stored in the checkpoint or savepoint
- The start time specified by the user on the VVP platform
- The start position specified by the user through scan.startup.mode in the WITH parameter
- group-offsets, when scan.startup.mode is not specified
In any of the above steps, if the position is invalid because it has expired or because of a problem with the Kafka cluster, the position is reset using the strategy specified by properties.auto.offset.reset. If this configuration item is not set, an exception is raised that requires user intervention.
A common case is starting consumption with a brand new group ID. The source table first queries the Kafka cluster for the committed position of the group. Because the group ID is used for the first time, no valid position is found, so the position is reset using the strategy configured by properties.auto.offset.reset. Therefore, when consuming with a new group ID, you must configure properties.auto.offset.reset to specify the position reset strategy.
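The priority order above can be summarized as a small decision function. The following Python sketch is a simplified model under stated assumptions: all names are hypothetical, and the reset through properties.auto.offset.reset is only modeled for the case of a missing committed position, not for every kind of invalid position.

```python
def resolve_start_position(state_offset=None, platform_start_time_ms=None,
                           startup_mode=None, committed_offset=None,
                           auto_offset_reset=None):
    """Return (source, value) describing where consumption starts."""
    if state_offset is not None:
        return ("state", state_offset)                # checkpoint / savepoint wins
    if platform_start_time_ms is not None:
        return ("timestamp", platform_start_time_ms)  # start time set on the platform
    mode = startup_mode or "group-offsets"            # default when no mode is given
    if mode != "group-offsets":
        return ("scan.startup.mode", mode)
    if committed_offset is not None:
        return ("group-offsets", committed_offset)
    if auto_offset_reset in ("earliest", "latest"):
        return ("auto.offset.reset", auto_offset_reset)
    raise RuntimeError("no committed position and properties.auto.offset.reset is not set")


# A brand new group ID has no committed position, so the reset strategy decides.
print(resolve_start_position(auto_offset_reset="earliest"))
# ('auto.offset.reset', 'earliest')
```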
Source Offset Committing
The Kafka source table commits the current consumption offset to the Kafka cluster only after a checkpoint succeeds. If your checkpoint interval is long, the consumption offset you observe on the Kafka cluster side lags behind. During a checkpoint, the Kafka source table stores the current read progress in the state and does not rely on the offsets committed to the cluster for fault recovery. Committed offsets are only used to monitor read progress on the Kafka side. A failed offset commit has no impact on data accuracy.
Result Table Custom Partitioner
If the built-in Kafka Producer partition modes cannot meet your needs, you can implement a custom partition mode to write data to the corresponding partitions. A custom partitioner must extend FlinkKafkaPartitioner. After development is complete, compile the JAR package and upload it to the VVP platform with the resource upload function. After uploading and referencing it, set the sink.partitioner parameter in the WITH parameter to the fully qualified class name of the partitioner, such as org.mycompany.MyPartitioner.
Kafka / Upsert Kafka / Kafka JSON Catalog Selection
Kafka is an append-only message queue system: data can be added, but not updated or deleted. Therefore, it cannot handle upstream CDC change data or the retraction logic of operators such as aggregation and union in streaming SQL. If you need to write data containing changes or retractions to Kafka, use the Upsert Kafka result table, which handles changed data specially.
To batch-synchronize the changed data of one or more tables in an upstream database to Kafka, you can use the Kafka JSON catalog. If the data stored in your Kafka is in JSON format, using the Kafka JSON catalog saves the steps of defining the schema and WITH parameters. See the Kafka JSON catalog documentation for details.
As a CTAS Data Source
The CTAS statement supports Kafka as a data source when the data is in JSON format. During data synchronization, if some fields do not appear in the predefined table structure, Flink tries to deduce the type of the column automatically. If the automatically deduced type does not meet your requirements, you can declare the parsing type of specific columns through auxiliary derivation. For a detailed description of the JSON format, see the JSON Format documentation.
Type Inference
During type deduction, Flink by default only expands the first layer of data in the JSON text and deduces each type from the JSON type and value according to the basic rules. You can also declare the parsing type of a specific column in the DDL to meet special needs. This is called auxiliary derivation.
Basic Rules
The basic rules of type mapping are shown in the table below.
JSON type | Flink SQL type |
---|---|
BOOLEAN | BOOLEAN |
STRING | DATE, TIMESTAMP, TIMESTAMP_LTZ, TIME, or STRING |
INT or LONG | BIGINT |
BIGINT | DECIMAL or STRING. Explanation: The DECIMAL type in Flink has a precision limit. If the actual value of the integer exceeds the maximum precision of the DECIMAL type, Flink automatically deduces its type as STRING to avoid loss of precision. |
FLOAT, DOUBLE, or BIGDECIMAL | DOUBLE |
ARRAY | STRING |
OBJECT | STRING |
Example for the following JSON text:
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "Ververica",
"engine": "Flink"
},
"type": ["Big Data"]
}
The table information written by Flink to the downstream storage:
id | name | properties | type |
---|---|---|---|
101 | VVP | {"owner":"Ververica","engine":"Flink"} | ["Big Data"] |
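To make the basic rules concrete, the following Python sketch applies them to the top-level fields of a JSON record. It is an approximation under stated assumptions: the function names are hypothetical, the detection of DATE, TIME, and TIMESTAMP strings is omitted because the patterns are not specified here, and the DECIMAL limit is taken as 38 digits, the maximum DECIMAL precision in Flink SQL.

```python
import json

INT64_MIN, INT64_MAX = -(2 ** 63), 2 ** 63 - 1
DECIMAL_MAX_PRECISION = 38  # maximum DECIMAL precision in Flink SQL


def infer_flink_type(value):
    """Map one parsed JSON value to a Flink SQL type name (simplified)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        if INT64_MIN <= value <= INT64_MAX:
            return "BIGINT"
        digits = len(str(abs(value)))
        return "DECIMAL" if digits <= DECIMAL_MAX_PRECISION else "STRING"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, str):
        return "STRING"  # date/time detection intentionally left out
    if isinstance(value, (list, dict)):
        return "STRING"  # ARRAY and OBJECT are kept as JSON text
    raise ValueError("type not covered by the basic rules: %r" % (value,))


def infer_schema(json_text):
    """Infer types for the first layer of a JSON object only."""
    record = json.loads(json_text)
    return {name: infer_flink_type(value) for name, value in record.items()}


text = '{"id": 101, "name": "VVP", "properties": {"owner": "Ververica"}, "type": ["Big Data"]}'
print(infer_schema(text))
# {'id': 'BIGINT', 'name': 'STRING', 'properties': 'STRING', 'type': 'STRING'}
```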
Auxiliary Derivation
If the basic rules above do not meet your needs, you can declare the parsing type of a specific column in the DDL of the source table. Flink then uses the declared column type in preference to the deduced type when parsing that field. In the following example, Flink parses the price field as DECIMAL instead of converting it to DOUBLE according to the basic rules.
CREATE TABLE evolvingKafkaSource (
price DECIMAL(18, 2)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'evolving_kafka_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
However, if the type you specify in the DDL is inconsistent with the type in the actual data, it is handled as follows:
- When the declared type is wider than the actual type, the data is automatically parsed as the declared type. For example, if the declared type is DOUBLE and the encountered data type is BIGINT, the data is parsed as DOUBLE.
- When the actual type is wider than the declared type, or the two types are incompatible, an error is reported with the relevant information, because CTAS does not currently support type changes. You need to restart the job and declare the exact type to parse the data.
Auxiliary Derivation of Complex Types
- Auxiliary derivation of complex types is not supported, including ROW, ARRAY, MAP, and MULTISET. For complex types, Flink handles them as STRING by default.
Typically, JSON text in a Kafka topic has a nested structure. If you need to extract nested columns from JSON text, there are two ways:
- Declare 'json.infer-schema.flatten-nested-columns.enable' = 'true' in the source table DDL to flatten all elements in nested columns to the top level. In this way, all nested columns are expanded in sequence. To avoid column name conflicts, Flink uses the path that indexes the column as the expanded column name.
Column Name Conflicts
Resolving column name conflicts is not currently supported. If a column name conflict occurs, declare json.ignore-parse-errors as true in the DDL of the source table to ignore the conflicting data. Example: For the JSON text:
{
"nested": {
"inner": {
"col": true
}
}
}
the column col is expanded under the name nested.inner.col.
- Add the computed column `rowkey` AS JSON_VALUE(`properties`, '$.rowkey') to the CTAS syntax in the DDL to specify the column to expand.
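The path-based naming used when nested columns are flattened can be illustrated with a short Python sketch. It is a model of the naming rule only: the function name is hypothetical, and raising an error on a name conflict is an assumption made to show where conflicts arise.

```python
def flatten_nested_columns(record, parent=""):
    """Flatten nested JSON objects; column names are dot-separated paths."""
    flat = {}
    for name, value in record.items():
        path = "%s.%s" % (parent, name) if parent else name
        if isinstance(value, dict):
            children = flatten_nested_columns(value, path)
        else:
            children = {path: value}
        for column, column_value in children.items():
            if column in flat:
                # e.g. a top-level key "a.b" next to a nested {"a": {"b": ...}}
                raise ValueError("column name conflict: %s" % column)
            flat[column] = column_value
    return flat


print(flatten_nested_columns({"nested": {"inner": {"col": True}}}))
# {'nested.inner.col': True}
```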
Code Example
Example 1: Write to Kafka after reading data from Kafka
Read Kafka data from the Topic named source, and then write it into the Topic named sink. The data is in CSV format.
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;
Example 2: Synchronize table structure and data
Synchronize the messages in Kafka Topic to Hologres in real time. In this case, you can use the offset and partition id of the Kafka message as the primary key to ensure that there will be no duplicate messages in Hologres during failover.
CREATE TEMPORARY TABLE kafkaTable (
`offset` BIGINT NOT NULL METADATA,
`part` INT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Example 3: Synchronize table structure and key and value data of Kafka messages.
The key part of the Kafka message has stored relevant information, and you can synchronize the key and value in Kafka at the same time.
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
The key part of the Kafka message does not support table structure changes and type deduction, and you need to declare it manually.
Example 4: Synchronize table structure and data and perform calculations.
When synchronizing Kafka data to Hologres, some lightweight calculations are often required.
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
Common Problem
How to read or write JSON data of nested type using Kafka connector?
The source table DDL uses the ROW format to define the JSON Object, and the result table DDL defines the Key corresponding to the JSON data to be obtained. After setting the Key acquisition method in the DML statement, the value of the corresponding nested Key can be obtained. The code example is as follows:
- Test Data
{
"a":"abc",
"b":1,
"c":{
"e":["1","2","3","4"],
"f":{"m" :"567"}
}
}
- Source table DDL definition
CREATE TEMPORARY TABLE `kafka_table` (
`a` VARCHAR,
b int,
`c` ROW<e ARRAY<VARCHAR>, f ROW<m VARCHAR>> --c is a JSON Object, corresponding to ROW in Flink; e is json list, corresponding to ARRAY.
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'properties.group.id' = 'xxx',
'format' = 'json',
'scan.startup.mode' = 'xxx'
);
- Result table DDL definition
CREATE TEMPORARY TABLE `sink` (
`a` VARCHAR,
b INT,
e VARCHAR,
`m` varchar
) WITH (
'connector' = 'print',
'logger' = 'true'
);
- DML statement
INSERT INTO `sink`
SELECT
`a`,
b,
c.e[1], -- Flink array indexes start at 1, so this gets the first element of the array. Remove [1] to get the whole array.
c.f.m
FROM `kafka_table`;
Troubleshooting
Flink and Kafka are Connected to the Network, but Flink cannot Consume or Write Data?
Problem causes
If there is a forwarding mechanism such as a proxy or port mapping between Flink and Kafka, the network address of the Kafka server pulled by the Kafka client is the address of the Kafka server itself rather than the address of the proxy. In this case, although the network between Flink and Kafka is connected, Flink cannot consume or write data. The Kafka client (Flink Kafka connector) establishes a connection to the Kafka server in two steps:
- The Kafka client pulls the meta information of the Kafka server (Kafka Broker), including the network addresses of all Brokers on the Kafka server.
- The Kafka client uses the network addresses returned in the first step to connect to each Broker and read or write data.
Troubleshooting method
Use the following steps to confirm whether there is a forwarding mechanism such as proxy or port mapping between Flink and Kafka:
- Use the ZooKeeper command line tool (zkCli.sh or zookeeper-shell.sh) to log in to the ZooKeeper cluster used by Kafka.
- Execute the correct command according to the actual situation of your cluster to obtain your Kafka Broker meta information.
- Use commands such as ping or telnet to test the connectivity between the address displayed in Endpoint and Flink. If it cannot be connected, it means that there is a forwarding mechanism such as proxy or port mapping between Flink and Kafka.
Solution
- Do not use forwarding mechanisms such as a proxy or port mapping. Connect the network between Flink and Kafka directly, so that Flink can reach the Endpoint displayed in the Kafka meta information.
- Contact the Kafka operation and maintenance personnel, and set the forwarding address as advertised.listeners on the Kafka Broker side, so that the Kafka server meta information pulled by the Kafka client includes the forwarding address.
Note that only Kafka 0.10.2.0 and later versions support adding the proxy address to the Kafka Broker Listener. If you want to know more about the principle and explanation of this issue, please refer to KIP-103: Distinguishing Internal and External Network Traffic and Detailed Explanation of Kafka Network Connection Issues.
What is the Purpose of the Kafka Connector Committing Offsets to the Kafka Server?
Flink commits the current read Offset to Kafka each time a Checkpoint succeeds. If Checkpoint is not enabled, or the Checkpoint interval is too long, the current read Offset may not be visible when queried on the Kafka side.
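The effect of the Checkpoint interval on what Kafka sees can be illustrated with a small simulation. This is a simplified model, not Flink code: the function name visible_committed_offsets is hypothetical, time is counted in whole seconds, every Checkpoint is assumed to succeed, and no other commit mechanism (such as the Kafka client's own auto-commit) is assumed to be active.

```python
from typing import List, Optional, Tuple


def visible_committed_offsets(
    records_per_second: int,
    seconds: int,
    checkpoint_interval: Optional[int],
) -> List[Tuple[int, int, Optional[int]]]:
    """Return (second, read_offset, committed_offset) for each simulated second.

    checkpoint_interval is in seconds; None models Checkpoint being disabled.
    committed_offset stays None until the first Checkpoint succeeds.
    """
    read_offset = 0
    committed = None
    timeline = []
    for second in range(1, seconds + 1):
        read_offset += records_per_second
        if checkpoint_interval and second % checkpoint_interval == 0:
            committed = read_offset  # the offset is committed only on a Checkpoint
        timeline.append((second, read_offset, committed))
    return timeline
```

In this model the committed Offset trails the read Offset by up to one Checkpoint interval, and with Checkpoint disabled it is never updated.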
Why Does the Kafka Source Table Output No Data After an Event Time Window?
Question details
When Kafka is used as the source table, no data is output after a window based on Event Time.
Problem causes
If a Kafka partition has no data, Watermark generation is affected: the Watermark cannot advance, so the Event Time window on the Kafka source table never outputs data.
Solution
- Make sure data exists on all partitions.
- Enable idleness detection for the source data. In the upper right corner of the target job details page, click Edit, add the following configuration in the more Flink configuration in the advanced configuration panel on the right side of the page, and save it to take effect.
table.exec.source.idle-timeout: 5
For details about the table.exec.source.idle-timeout parameter, see Configuration.
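Why an empty partition blocks the window, and why an idle timeout helps, can be shown with a conceptual model in which the source Watermark is the minimum over its partitions. This is an illustrative sketch rather than Flink's implementation: the function name combined_watermark and its parameters are hypothetical, time values are abstract units, and a partition that has never received a record is treated as silent since time 0.

```python
from typing import Dict, Optional


def combined_watermark(
    partition_watermarks: Dict[int, Optional[int]],
    last_record_time: Dict[int, float],
    now: float,
    idle_timeout: Optional[float] = None,
) -> Optional[int]:
    """Return the minimum Watermark over non-idle partitions, or None if it cannot advance.

    partition_watermarks maps a partition to its latest Watermark (None if it has no data).
    last_record_time maps a partition to the time its last record arrived.
    idle_timeout=None models idleness detection being disabled.
    """
    active = []
    for partition, watermark in partition_watermarks.items():
        silent_for = now - last_record_time.get(partition, 0.0)
        if idle_timeout is not None and silent_for >= idle_timeout:
            continue  # partition is considered idle and is ignored
        active.append(watermark)
    if not active or any(w is None for w in active):
        return None  # at least one active partition has produced no Watermark
    return min(active)
```

With partition 0 at Watermark 1000 and partition 1 empty, the model returns None when idle_timeout is None, and 1000 once the empty partition has been silent for longer than the timeout.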
How to Enable Security Configurations for a Kafka Table?
To enable security configurations related to encryption and authentication, you only need to add the “properties.” prefix to the security configuration on the Kafka table. The following code snippet shows how to configure a Kafka table to use PLAIN as the SASL mechanism and provide a JAAS configuration:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)
Another more complex example, using SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/* SSL configuration*/
/* Configure the path of the truststore (CA certificate) provided by the server*/
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/* If client authentication is required, you need to configure the path of the keystore (private key) */
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/* SASL configuration*/
/* Configure SASL mechanism as SCRAM-SHA-256 */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/* Configure JAAS */
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)
For a detailed description of security configuration, see the “Security” section of the Apache Kafka documentation. All files used in the configuration (such as certificates, public keys, and private keys) must be uploaded to VVP as attached files; after uploading, the files are stored in the /flink/opt directory.
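The “properties.” prefix rule used throughout this section can be sketched as a small helper that turns plain Kafka client settings into table options. This is an illustrative sketch only: the function names to_table_options and render_with_clause are hypothetical and not part of Flink or VVP, and the rendered text still needs the remaining table options, such as 'connector', added by hand.

```python
from typing import Dict


def to_table_options(client_props: Dict[str, str]) -> Dict[str, str]:
    """Prefix each Kafka client property with 'properties.' for use in a WITH clause."""
    return {f"properties.{key}": value for key, value in client_props.items()}


def render_with_clause(options: Dict[str, str]) -> str:
    """Render options as the text of a SQL WITH clause, one option per line."""
    lines = []
    for key, value in options.items():
        escaped = str(value).replace("'", "''")  # SQL escapes a single quote by doubling it
        lines.append(f"  '{key}' = '{escaped}'")
    return "WITH (\n" + ",\n".join(lines) + "\n)"
```

Keeping the client settings unprefixed in one place and adding the prefix mechanically makes it harder to introduce a stray space or a missing prefix in an option key.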
This page is derived from the official Apache Flink® documentation.
Refer to the Credits page for more information.