
Manage Kafka Schema Registry catalog

Use the Kafka Schema Registry Catalog to integrate Flink SQL with Confluent Schema Registry or AWS Glue Schema Registry. This allows you to manage schemas and use Kafka topics with associated schemas directly as Flink tables.

Background

A schema registry is a centralized repository for managing and validating data schemas used in streaming platforms like Apache Kafka. It ensures data consistency between producers and consumers and enables safe schema evolution. Flink supports integration with both Confluent Schema Registry and AWS Glue Schema Registry through this catalog, primarily for the Avro format.

Prerequisites

  • Access to a running Kafka cluster.
  • A deployed and accessible schema registry (Confluent or AWS Glue).
  • Network connectivity between your Flink environment and the schema registry.
  • Appropriate credentials and permissions for the schema registry.

Supported Schema Registries

| Registry | Formats Supported | Deployment | Integration Method |
| --- | --- | --- | --- |
| Confluent Schema Registry | Avro | Self-managed or Confluent Cloud | REST API, Flink Catalog |
| AWS Glue Schema Registry | Avro | AWS-managed | AWS SDK, Flink Catalog |

Catalog Features

The Kafka Schema Registry Catalog enables you to discover, manage, and use schemas registered in Confluent or AWS Glue as Flink tables. This simplifies schema management and enforces compatibility across streaming applications. The underlying logic for both registry types is shared within the catalog, differing mainly in connection details and specific WITH options.

The catalog maps Flink concepts to Schema Registry and Kafka concepts as follows:

| Flink Concept | Confluent SR Equivalent | AWS Glue SR Equivalent | Kafka Equivalent |
| --- | --- | --- | --- |
| Catalog Name | (User-defined in Flink) | (User-defined in Flink) | N/A |
| Database Name | kafka (fixed) | kafka (fixed) | N/A |
| Registry Name | N/A | (User-defined in AWS) | N/A |
| Table Name | Two subjects: <table_name>-key and <table_name>-value | Two schema IDs: <table_name>-key and <table_name>-value | One topic: <table_name> |
note
  • In Confluent Schema Registry, schemas are grouped by subject. Flink table names map directly to these subject names (excluding the -key/-value suffix).
  • In AWS Glue Schema Registry, schemas are identified by IDs within a named registry. You must create the registry in AWS Glue before creating the Flink catalog.
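
For example, a Flink table named orders (a placeholder name used only for illustration) corresponds to one Kafka topic, orders, and to two registered schemas, orders-key and orders-value, whether those live as Confluent subjects or as schemas in an AWS Glue registry. The sketch below assumes a catalog has already been created and activated, as described in the following sections:

-- Illustrative only: `orders` is a hypothetical table name.
-- Creating it registers the key schema (orders-key) and the value schema (orders-value)
-- and ties the table to the Kafka topic `orders`.
CREATE TABLE orders (
  order_id STRING,
  amount DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
);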

Creating a Catalog

You must first create a catalog linked to your specific schema registry.

Confluent Schema Registry

This statement creates a catalog named confluent_sr_catalog connected to a Confluent Schema Registry.

  • type = 'kafka': Specifies the catalog type.
  • properties.bootstrap.servers: Your Kafka broker address(es).
  • url: The endpoint of your Confluent Schema Registry service.
  • format = 'avro-confluent': Indicates the use of Confluent's Avro format.
  • key.fields-prefix / value.fields-prefix: Prefixes applied to key/value fields in the Flink table schema to avoid name collisions.
CREATE CATALOG confluent_sr_catalog WITH (
  'type' = 'kafka',
  'properties.bootstrap.servers' = 'kafka.example.com:9092',
  'url' = 'http://confluent-sr.example.com:8081',
  'format' = 'avro-confluent',
  'key.fields-prefix' = 'k_',
  'value.fields-prefix' = 'v_'
);

AWS Glue Schema Registry

This statement creates a catalog named glue_sr_catalog connected to an AWS Glue Schema Registry.

  • type = 'kafka': Specifies the catalog type.
  • properties.bootstrap.servers: Your Kafka broker address(es).
  • format = 'avro-glue': Indicates the use of AWS Glue's Avro format.
  • aws.glue.region: The AWS region where your registry resides.
  • aws.glue.registry.name: The name you gave your schema registry in AWS Glue.
  • aws.glue.aws.credentials.provider.basic.accesskeyid / secretkey: Explicit AWS credentials (alternatively, use IAM roles or environment variables).
  • key.fields-prefix / value.fields-prefix: Prefixes applied to key/value fields.
CREATE CATALOG glue_sr_catalog WITH (
  'type' = 'kafka',
  'properties.bootstrap.servers' = 'kafka.example.com:9092',
  'format' = 'avro-glue',
  'aws.glue.region' = 'us-east-1',
  'aws.glue.registry.name' = 'my-registry',
  'aws.glue.aws.credentials.provider.basic.accesskeyid' = '<your-access-key>',
  'aws.glue.aws.credentials.provider.basic.secretkey' = '<your-secret-key>',
  'key.fields-prefix' = 'k_',
  'value.fields-prefix' = 'v_'
);

After creating the catalog, activate it for your current session:

USE CATALOG confluent_sr_catalog; -- or glue_sr_catalog

You can now manage tables within this catalog.
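
To confirm the catalog is wired up correctly, you can browse it with standard Flink SQL catalog statements. The sketch below is illustrative; the tables listed depend on what is already registered for your topics:

-- Inspect the active catalog
SHOW CURRENT CATALOG;   -- prints the catalog you just activated
SHOW DATABASES;         -- the catalog exposes a single database named kafka
USE kafka;
SHOW TABLES;            -- tables backed by Kafka topics with registered schemas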

Catalog Options

The following table summarizes the primary options used when creating a Kafka Schema Registry Catalog.

| Option | Required | Description |
| --- | --- | --- |
| type | Yes | Must be 'kafka'. |
| properties.bootstrap.servers | Yes | Address(es) of the Kafka brokers; used to establish the connection to the Kafka cluster. |
| format | Yes | Serialization format and target Schema Registry: 'avro-confluent' for Confluent SR, 'avro-glue' for AWS Glue SR. |
| url | For 'avro-confluent' | Endpoint URL of the Confluent Schema Registry (e.g., http://localhost:8081). |
| key.fields-prefix | For 'avro-confluent' (cannot be empty) | Prefix applied to key fields in the Flink table schema. |
| value.fields-prefix | For 'avro-confluent' (cannot be empty) | Prefix applied to value fields in the Flink table schema. |
| aws.glue.region | For 'avro-glue' | AWS region of the AWS Glue Schema Registry. |
| aws.glue.registry.name | For 'avro-glue' | Name of the registry in AWS Glue Schema Registry. |
| aws.glue.aws.credentials.provider.basic.accesskeyid | For 'avro-glue' | AWS access key ID. Overrides environment variables or instance profiles. |
| aws.glue.aws.credentials.provider.basic.secretkey | For 'avro-glue' | AWS secret key. Overrides environment variables or instance profiles. |

Creating a Table

When you execute a CREATE TABLE statement within the catalog, Flink automatically registers two schemas in the Schema Registry under corresponding subjects/IDs: one for the key (<table_name>-key) and one for the value (<table_name>-value). The Kafka topic name will match the Flink table name.

Fields designated as PRIMARY KEY in Flink SQL are included in both the key and value schemas. In the resulting Flink table schema, these key fields are duplicated, once with the key.fields-prefix and once with the value.fields-prefix.

CREATE TABLE TableName (
  att1 STRING,
  att2 INT,
  att3 DOUBLE,
  PRIMARY KEY (att1, att2) NOT ENFORCED
);

After creation, the Flink table TableName will have columns like v_att1, v_att2, v_att3, k_att1, k_att2.
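
You can verify the generated layout with DESCRIBE. The column list in the comments is illustrative and assumes the k_ and v_ prefixes configured earlier:

DESCRIBE TableName;
-- Expected columns (illustrative):
--   v_att1 STRING, v_att2 INT, v_att3 DOUBLE   -- value schema fields
--   k_att1 STRING, k_att2 INT                  -- key schema fields (PRIMARY KEY)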

[Diagram: creating a table]

Querying and Writing Data

All subsequent SELECT, INSERT, and ALTER statements must operate on the table schema that includes the key/value prefixes.

important

When writing data using INSERT INTO ... SELECT ... FROM, you must specify the explicit column names in your SELECT projection. Using SELECT * is not supported because the source table schema includes Kafka metadata fields (partition, offset, timestamp) that cannot be written to the sink. The typical column order in the Flink table schema places value-prefixed fields before key-prefixed fields. Ensure your SELECT projection matches the target table's column order if you don't explicitly list columns in the INSERT INTO clause.

USE CATALOG confluent_sr_catalog; -- or glue_sr_catalog

-- Create a second table with the same schema
CREATE TABLE TableName2 (
  att1 STRING,
  att2 INT,
  att3 DOUBLE,
  PRIMARY KEY (att1, att2) NOT ENFORCED
);

-- Insert sample data into the first table (using explicit columns)
INSERT INTO TableName (k_att1, k_att2, v_att1, v_att2, v_att3) VALUES ('key1', 1, 'key1', 1, 10.5);

-- Copy data from TableName to TableName2
INSERT INTO TableName2
SELECT v_att1, v_att2, v_att3, k_att1, k_att2 -- Explicit projection required
FROM TableName
/*+ OPTIONS (
  'scan.startup.mode' = 'earliest-offset'
) */;

[Diagram: querying a table]

Using Hints for Connector Options

Specific Kafka connector options (like scan.startup.mode, scan.bounded.mode) cannot be set at the catalog level, as they would apply too broadly. Instead, use SQL hints /*+ OPTIONS (...) */ within your query to override these options for that specific query only.

-- Run a bounded query that stops at the latest offset
INSERT INTO TableName2
SELECT v_att1, v_att2, v_att3, k_att1, k_att2
FROM TableName
/*+ OPTIONS (
  'scan.startup.mode' = 'earliest-offset',
  'scan.bounded.mode' = 'latest-offset'
) */;
note

An INSERT INTO ... SELECT ... FROM query creates an unbounded streaming job by default. Use the 'scan.bounded.mode' = 'latest-offset' hint to run it as a finite batch job for testing.
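
The same hints also work on a plain SELECT, which is a convenient way to take a bounded peek at a topic's contents. This sketch reuses the TableName example from above:

-- Bounded preview: start from the earliest offset and stop at the latest offset
SELECT k_att1, k_att2, v_att1, v_att2, v_att3
FROM TableName
/*+ OPTIONS (
  'scan.startup.mode' = 'earliest-offset',
  'scan.bounded.mode' = 'latest-offset'
) */;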

Modifying a Table Schema

The catalog supports limited ALTER TABLE operations:

  • You can ADD or DROP columns using their prefixed names (e.g., v_new_column).
  • You cannot modify or drop columns that are part of the PRIMARY KEY (neither the k_ nor the v_ prefixed versions).
-- Add a new value column
ALTER TABLE TableName ADD (v_new_col STRING);

-- Drop an existing value column (must not be part of the PK)
ALTER TABLE TableName DROP (v_att3);

Limitations

| Limitation | Notes |
| --- | --- |
| Schema Evolution | Currently, only the BACKWARD compatibility policy is fully supported for schema changes made via ALTER TABLE. |
| ALTER TABLE Operations | Only ADD and DROP column operations are supported. Columns that are part of the PRIMARY KEY (both k_ and v_ prefixed versions) cannot be modified or dropped. |
| Explicit Columns in SELECT | You must explicitly list columns in INSERT INTO ... SELECT ... statements. SELECT * is not supported because the source schema includes Kafka metadata (partition, offset, timestamp) that cannot be written. |
| Topic Partitioning | The catalog does not manage Kafka topic partitioning or replication settings. |
| Case Sensitivity | Flink table names must exactly match the corresponding Kafka topic names and Schema Registry subject/schema names, respecting case sensitivity. |

Ververica Deployment Workflow Example

This workflow demonstrates using the Confluent Schema Registry Catalog with a Ververica deployment. The steps are similar for the AWS Glue Schema Registry Catalog, differing mainly in the CREATE CATALOG options.

1. Create the Catalog

In the SQL Editor, execute the CREATE CATALOG statement for Confluent SR (as shown previously), providing your Kafka and Schema Registry endpoints. To execute the statement:

  1. Highlight the entire CREATE CATALOG code.
  2. Right-click the highlighted text.
  3. Select Run from the context menu.

After execution, the confluent_sr_catalog will appear in the Catalogs panel on the left, containing a single database named kafka.

2. Create Tables

Ensure confluent_sr_catalog is selected or use USE CATALOG confluent_sr_catalog;. Execute CREATE TABLE statements for the tables you need (e.g., TableName and TableName2 from the examples above) using the same Run method. The new tables will appear under the kafka database in the Catalogs panel.

3. Insert Initial Data

To populate TableName with sample data, execute an INSERT INTO ... VALUES statement. Remember to specify the prefixed column names (k_att1, k_att2, v_att1, etc.). Deploy this SQL as a job from the editor. Since it's a bounded insert, the job will run and finish. You can start the deployment manually from the Deployments panel.
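
For reference, the deployed statement can look like the following sketch, a multi-row variant of the earlier example with arbitrary sample values:

INSERT INTO TableName (k_att1, k_att2, v_att1, v_att2, v_att3)
VALUES
  ('key1', 1, 'key1', 1, 10.5),
  ('key2', 2, 'key2', 2, 20.0);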

4. Migrate Data Between Tables

Execute the INSERT INTO TableName2 SELECT ... FROM TableName query (using explicit columns and optionally the scan.bounded.mode hint). Deploy this SQL. If using the bounded mode hint, the job will run, copy the data, and finish.

5. Alter a Table Schema

Execute an ALTER TABLE TableName DROP (v_att3); statement to remove a non-key column. After execution, you can inspect the updated schema by clicking the table name in the Catalogs panel. Subsequent reads from the table will reflect this change, respecting the schema evolution policy.
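
For example, the following sketch drops the column and then inspects the result with a standard DESCRIBE statement, assuming your editor session can run it:

-- Remove a non-key value column, then confirm the change
ALTER TABLE TableName DROP (v_att3);
DESCRIBE TableName;  -- v_att3 should no longer be listed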
