Confluent Avro

This article covers usage examples, configuration options, and type mappings for the Confluent Avro (avro-confluent) format.

Background Information

The Confluent Avro format (avro-confluent) reads and writes Avro data whose schemas are registered in the Confluent Schema Registry. Unless configured otherwise, the Avro schema is derived from the table schema.

Example of Use

Example of a table that uses a raw UTF-8 string as the Kafka key and Avro records registered in the Schema Registry as Kafka values:

SQL
CREATE TABLE user_created (
    -- one column mapped to the Kafka raw UTF-8 key
    the_kafka_key STRING,
    -- a few columns mapped to the Avro fields of the Kafka value
    id STRING,
    name STRING,
    email STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events_example1',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- UTF-8 string as Kafka keys, using the 'the_kafka_key' table column
    'key.format' = 'raw',
    'key.fields' = 'the_kafka_key',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:8082',
    'value.fields-include' = 'EXCEPT_KEY'
)

We can write data into the Kafka table as follows:

SQL
INSERT INTO user_created
SELECT
    -- replicating the user id into a column mapped to the Kafka key
    id AS the_kafka_key,
    -- all values
    id, name, email
FROM some_table
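
To check what was written, the same table can also be queried. The statement below is a minimal sketch, assuming the user_created table defined above and a Schema Registry reachable at the configured URL; on the read path the key column comes from the raw key bytes and the remaining columns are decoded from the Avro value.

```sql
-- Read back the rows written above.
-- the_kafka_key is decoded from the raw UTF-8 Kafka key;
-- id, name and email are decoded from the Avro-encoded Kafka value.
SELECT the_kafka_key, id, name, email
FROM user_created
```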

Example of a table with both the Kafka key and value registered as Avro records in the Schema Registry:

SQL
CREATE TABLE user_created (
    -- one column mapped to the 'id' Avro field of the Kafka key
    kafka_key_id STRING,
    -- a few columns mapped to the Avro fields of the Kafka value
    id STRING,
    name STRING,
    email STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events_example2',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- Watch out: schema evolution in the context of a Kafka key is almost never backward nor
    -- forward compatible due to hash partitioning.
    'key.format' = 'avro-confluent',
    'key.avro-confluent.url' = 'http://localhost:8082',
    'key.fields' = 'kafka_key_id',
    -- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
    -- => adding a prefix to the table column associated to the Kafka key field avoids clashes
    'key.fields-prefix' = 'kafka_key_',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:8082',
    'value.fields-include' = 'EXCEPT_KEY',
    -- subjects have a default value since Flink 1.13, though can be overridden:
    'key.avro-confluent.subject' = 'user_events_example2-key2',
    'value.avro-confluent.subject' = 'user_events_example2-value2'
)
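
Writing to this table follows the same pattern as the first example. The statement below is a sketch that assumes the same placeholder source table, some_table, with id, name, and email columns. Because 'key.fields-prefix' is set to 'kafka_key_', the kafka_key_id column is expected to end up as the id field of the Avro key record, while the unprefixed id column goes into the Avro value.

```sql
INSERT INTO user_created
SELECT
    -- written to the 'id' field of the Avro key (the 'kafka_key_' prefix is stripped)
    id AS kafka_key_id,
    -- written to the Avro value
    id, name, email
FROM some_table
```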

Example of a table using the upsert-kafka connector with the Kafka value registered as an Avro record in the Schema Registry:

SQL
CREATE TABLE user_created (
    -- one column mapped to the Kafka raw UTF-8 key
    kafka_key_id STRING,
    -- a few columns mapped to the Avro fields of the Kafka value
    id STRING,
    name STRING,
    email STRING,
    -- upsert-kafka connector requires a primary key to define the upsert behavior
    PRIMARY KEY (kafka_key_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user_events_example3',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- UTF-8 string as Kafka keys
    -- We don't specify 'key.fields' in this case since it's dictated by the primary key of the table
    'key.format' = 'raw',
    -- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
    -- => adding a prefix to the table column associated to the Kafka key field avoids clashes
    'key.fields-prefix' = 'kafka_key_',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:8082',
    'value.fields-include' = 'EXCEPT_KEY'
)

Configuration Options

| Parameter | Description | Required | Forwarded | Default | Type |
| --- | --- | --- | --- | --- | --- |
| format | Specifies which format to use; here it should be 'avro-confluent'. | yes | no | none | String |
| avro-confluent.basic-auth.credentials-source | Basic auth credentials source for Schema Registry | no | yes | none | String |
| avro-confluent.basic-auth.user-info | Basic auth user info for Schema Registry | no | yes | none | String |
| avro-confluent.bearer-auth.credentials-source | Bearer auth credentials source for Schema Registry | no | yes | none | String |
| avro-confluent.bearer-auth.token | Bearer auth token for Schema Registry | no | no | none | String |
| avro-confluent.properties | Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. Note that Flink options take precedence. | no | no | none | Map |
| avro-confluent.ssl.keystore.location | Location / file of SSL keystore | no | yes | none | String |
| avro-confluent.ssl.keystore.password | Password for SSL keystore | no | yes | none | String |
| avro-confluent.ssl.truststore.password | Password for SSL truststore | no | yes | none | String |
| avro-confluent.schema | The schema registered or to be registered in the Confluent Schema Registry. If no schema is provided, Flink converts the table schema to an Avro schema. A provided schema must match the table schema. | no | no | none | String |
| avro-confluent.subject | The Confluent Schema Registry subject under which to register the schema used by this format during serialization. The kafka and upsert-kafka connectors use <topic_name>-value or <topic_name>-key as the default subject name when this format is used as the value or key format. For other connectors (e.g. filesystem), the subject option is required when the table is used as a sink. | no | yes | none | String |
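
When the format is used through 'value.format' (or 'key.format'), each option is prefixed accordingly, as the URL option is in the examples above. The table below is a sketch of how the basic auth options might be combined for a Schema Registry protected by HTTP basic authentication. The table name, topic, and credentials are hypothetical placeholders, and USER_INFO is assumed here to be the credentials source that the Confluent Schema Registry client uses for credentials supplied through the user-info option.

```sql
-- Hypothetical table, topic, and credentials, for illustration only.
CREATE TABLE user_created_secured (
    id STRING,
    name STRING,
    email STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events_secured',
    'properties.bootstrap.servers' = 'localhost:9092',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:8082',
    -- take the credentials from the user-info option below
    'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    -- placeholder credentials in the form <username>:<password>
    'value.avro-confluent.basic-auth.user-info' = 'registry_user:registry_password'
)
```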

Format Type Mapping

By default, Apache Flink uses the table schema to derive the Avro reader schema during deserialization and the Avro writer schema during serialization. An explicit schema can be supplied through the avro-confluent.schema option, but it must match the table schema. See Apache Avro Format for the mapping between Avro and Flink data types.

In addition to the types listed there, Flink supports reading and writing nullable types. Flink maps a nullable type to the Avro union(something, null), where something is the Avro type converted from the Flink type.

Refer to the Avro Specification for more information about Avro types.
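
The nullable mapping described above can be illustrated with a table that mixes a NOT NULL column and a nullable one. This is a sketch only: the table and topic names are hypothetical, and the comments describe the Avro types expected from the mapping rather than output captured from a Schema Registry.

```sql
-- Hypothetical table and topic names, for illustration only.
CREATE TABLE user_created_nullable (
    -- NOT NULL column: maps to the plain Avro type string
    id STRING NOT NULL,
    -- nullable column: maps to an Avro union of string and null
    email STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events_nullable',
    'properties.bootstrap.servers' = 'localhost:9092',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:8082'
)
```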
