Manage Kafka Schema Registry catalog
Use the Kafka Schema Registry Catalog to integrate Flink SQL with Confluent Schema Registry or AWS Glue Schema Registry. This allows you to manage schemas and use Kafka topics with associated schemas directly as Flink tables.
Background
A schema registry is a centralized repository for managing and validating data schemas used in streaming platforms like Apache Kafka. It ensures data consistency between producers and consumers and enables safe schema evolution. Flink supports integration with both Confluent Schema Registry and AWS Glue Schema Registry through this catalog, primarily for the Avro format.
Prerequisites
- Access to a running Kafka cluster.
- A deployed and accessible schema registry (Confluent or AWS Glue).
- Network connectivity between your Flink environment and the schema registry.
- Appropriate credentials and permissions for the schema registry.
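Supported Schema Registries
- Confluent Schema Registry, used with the avro-confluent format.
- AWS Glue Schema Registry, used with the avro-glue format.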
Catalog Features
The Kafka Schema Registry Catalog enables you to discover, manage, and use schemas registered in Confluent or AWS Glue as Flink tables. This simplifies schema management and enforces compatibility across streaming applications. The underlying logic for both registry types is shared within the catalog, differing mainly in connection details and specific WITH options.
Flink Catalog & Schema Registry Concepts Mapping
The catalog maps Flink concepts to Schema Registry and Kafka concepts as follows:
- In Confluent Schema Registry, schemas are grouped by subject. Flink table names map directly to these subject names (excluding the -key/-value suffix).
- In AWS Glue Schema Registry, schemas are identified by IDs within a named registry. You must create the registry in AWS Glue before creating the Flink catalog.
Creating a Catalog
You must first create a catalog linked to your specific schema registry.
Confluent Schema Registry
This statement creates a catalog named confluent_sr_catalog connected to a Confluent Schema Registry.
- type = 'kafka': Specifies the catalog type.
- properties.bootstrap.servers: Your Kafka broker address(es).
- url: The endpoint of your Confluent Schema Registry service.
- format = 'avro-confluent': Indicates the use of Confluent's Avro format.
- key.fields-prefix / value.fields-prefix: Prefixes applied to key/value fields in the Flink table schema to avoid name collisions.
CREATE CATALOG confluent_sr_catalog WITH (
  'type' = 'kafka',
  'properties.bootstrap.servers' = 'kafka.example.com:9092',
  'url' = 'http://confluent-sr.example.com:8081',
  'format' = 'avro-confluent',
  'key.fields-prefix' = 'k_',
  'value.fields-prefix' = 'v_'
);
AWS Glue Schema Registry
This statement creates a catalog named glue_sr_catalog connected to an AWS Glue Schema Registry.
- type = 'kafka': Specifies the catalog type.
- properties.bootstrap.servers: Your Kafka broker address(es).
- format = 'avro-glue': Indicates the use of AWS Glue's Avro format.
- aws.glue.region: The AWS region where your registry resides.
- aws.glue.registry.name: The name you gave your schema registry in AWS Glue.
- aws.glue.aws.credentials.provider.basic.accesskeyid / secretkey: Explicit AWS credentials (alternatively, use IAM roles or environment variables).
- key.fields-prefix / value.fields-prefix: Prefixes applied to key/value fields.
CREATE CATALOG glue_sr_catalog WITH (
  'type' = 'kafka',
  'properties.bootstrap.servers' = 'kafka.example.com:9092',
  'format' = 'avro-glue',
  'aws.glue.region' = 'us-east-1',
  'aws.glue.registry.name' = 'my-registry',
  'aws.glue.aws.credentials.provider.basic.accesskeyid' = '<your-access-key>',
  'aws.glue.aws.credentials.provider.basic.secretkey' = '<your-secret-key>',
  'key.fields-prefix' = 'k_',
  'value.fields-prefix' = 'v_'
);
After creating the catalog, activate it for your current session:
USE CATALOG confluent_sr_catalog; -- or glue_sr_catalog
You can now manage tables within this catalog.
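To confirm that the catalog was registered and is active, standard Flink SQL SHOW statements can be used. The following is a minimal sketch, assuming the catalog name confluent_sr_catalog from the example above; the output depends on your environment and on what is already registered.

```sql
-- List all catalogs known to the session; confluent_sr_catalog should be among them
SHOW CATALOGS;

-- Confirm which catalog is active after USE CATALOG
SHOW CURRENT CATALOG;

-- List the databases and tables exposed by the active catalog
SHOW DATABASES;
SHOW TABLES;
```

If the catalog does not appear, re-check the connection options in the CREATE CATALOG statement before creating any tables.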
Catalog Options
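The following table summarizes the primary options used when creating a Kafka Schema Registry Catalog.

| Option | Registry | Description |
| --- | --- | --- |
| type | Both | Catalog type; set to kafka. |
| properties.bootstrap.servers | Both | Kafka broker address(es). |
| format | Both | avro-confluent for Confluent Schema Registry, avro-glue for AWS Glue Schema Registry. |
| url | Confluent | Endpoint of the Confluent Schema Registry service. |
| aws.glue.region | AWS Glue | AWS region where the registry resides. |
| aws.glue.registry.name | AWS Glue | Name of the schema registry in AWS Glue. |
| aws.glue.aws.credentials.provider.basic.accesskeyid | AWS Glue | Explicit AWS access key ID (alternatively, use IAM roles or environment variables). |
| aws.glue.aws.credentials.provider.basic.secretkey | AWS Glue | Explicit AWS secret key (alternatively, use IAM roles or environment variables). |
| key.fields-prefix / value.fields-prefix | Both | Prefixes applied to key/value fields in the Flink table schema to avoid name collisions. |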
Creating a Table
When you execute a CREATE TABLE statement within the catalog, Flink automatically registers two schemas in the Schema Registry under corresponding subjects/IDs: one for the key (<table_name>-key) and one for the value (<table_name>-value). The Kafka topic name will match the Flink table name.
Fields designated as PRIMARY KEY in Flink SQL are included in both the key and value schemas. In the resulting Flink table schema, these key fields are duplicated, once with the key.fields-prefix and once with the value.fields-prefix.
CREATE TABLE TableName (
  att1 STRING,
  att2 INT,
  att3 DOUBLE,
  PRIMARY KEY (att1, att2) NOT ENFORCED
);
After creation, the Flink table TableName will have columns like v_att1, v_att2, v_att3, k_att1, k_att2.
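To check the resulting column layout rather than infer it, DESCRIBE can be run on the new table. This is a minimal sketch, assuming TableName was created as above in the active catalog with the k_ and v_ prefixes configured.

```sql
-- Show the column names and types of the table as the catalog exposes them
DESCRIBE TableName;
```

The listing should contain the prefixed columns rather than the att1, att2, att3 names used in the CREATE TABLE statement.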

Querying and Writing Data
All subsequent SELECT, INSERT, and ALTER statements must operate on the table schema that includes the key/value prefixes.
When writing data using INSERT INTO ... SELECT ... FROM, you must specify the explicit column names in your SELECT projection. Using SELECT * is not supported because the source table schema includes Kafka metadata fields (partition, offset, timestamp) that cannot be written to the sink. The typical column order in the Flink table schema places value-prefixed fields before key-prefixed fields. Ensure your SELECT projection matches the target table's column order if you don't explicitly list columns in the INSERT INTO clause.
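One way to avoid depending on column order is to name the target columns in the INSERT INTO clause. The sketch below assumes two tables, TableName and TableName2, that both expose the prefixed columns derived from att1, att2, att3 with primary key (att1, att2); adapt the names to your own schema.

```sql
-- Target columns are listed explicitly, so the SELECT projection is matched
-- to them position by position rather than to the table's declared order
INSERT INTO TableName2 (k_att1, k_att2, v_att1, v_att2, v_att3)
SELECT k_att1, k_att2, v_att1, v_att2, v_att3
FROM TableName;
```

Keeping the two column lists identical makes it easier to spot a mismatch during review.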
```sql
USE CATALOG confluent_sr_catalog; -- or glue_sr_catalog

-- Create a second table with the same schema
CREATE TABLE TableName2 (
  att1 STRING,
  att2 INT,
  att3 DOUBLE,
  PRIMARY KEY (att1, att2) NOT ENFORCED
);

-- Insert sample data into the first table (using explicit columns)
INSERT INTO TableName (k_att1, k_att2, v_att1, v_att2, v_att3)
VALUES ('key1', 1, 'key1', 1, 10.5);

-- Copy data from TableName to TableName2
INSERT INTO TableName2
SELECT v_att1, v_att2, v_att3, k_att1, k_att2 -- Explicit projection required
FROM TableName /*+ OPTIONS ('scan.startup.mode' = 'earliest-offset') */;
```
Using Hints for Connector Options
Specific Kafka connector options (like scan.startup.mode, scan.bounded.mode) cannot be set at the catalog level, as they would apply too broadly. Instead, use SQL hints /*+ OPTIONS (...) */ within your query to override these options for that specific query only.
-- Run a bounded query that stops at the latest offset
INSERT INTO TableName2
SELECT v_att1, v_att2, v_att3, k_att1, k_att2
FROM TableName
/*+ OPTIONS (
  'scan.startup.mode' = 'earliest-offset',
  'scan.bounded.mode' = 'latest-offset'
) */;
An INSERT INTO ... SELECT ... FROM query creates an unbounded streaming job by default. Use the 'scan.bounded.mode' = 'latest-offset' hint to run it as a finite batch job for testing.
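The same hint mechanism can be applied to a plain SELECT, which is handy for previewing what is currently in a topic. This is a sketch, assuming TableName from the earlier examples. The commented-out SET statement is an assumption for Flink versions where dynamic table options are disabled by default; it may be unnecessary or unavailable in your environment.

```sql
-- Optional, version-dependent assumption: enable OPTIONS hints if they are rejected
-- SET 'table.dynamic-table-options.enabled' = 'true';

-- Read from the earliest offset and stop at the latest offset
SELECT k_att1, k_att2, v_att3
FROM TableName /*+ OPTIONS (
  'scan.startup.mode' = 'earliest-offset',
  'scan.bounded.mode' = 'latest-offset'
) */;
```

Because the options are attached to the query, other queries against TableName keep the connector defaults.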
Modifying a Table Schema
The catalog supports limited ALTER TABLE operations:
- You can ADD or DROP columns using their prefixed names (e.g., v_new_column).
- You cannot modify or drop columns that are part of the PRIMARY KEY (neither the k_ nor the v_ prefixed versions).
-- Add a new value column
ALTER TABLE TableName ADD (v_new_col STRING);

-- Drop an existing value column (must not be part of the PK)
ALTER TABLE TableName DROP (v_att3);
Limitations
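The primary-key restriction on ALTER TABLE can be illustrated with statements that the catalog is expected to reject. This is a sketch based on the rule stated above, assuming TableName with primary key (att1, att2) and the k_/v_ prefixes; the exact error message is not specified here.

```sql
-- Expected to be rejected: k_att1 is the key-prefixed copy of a PRIMARY KEY column
ALTER TABLE TableName DROP (k_att1);

-- Expected to be rejected: v_att1 is the value-prefixed copy of the same column
ALTER TABLE TableName DROP (v_att1);
```

If the key itself has to change, creating a new table with the desired key and copying the data across is one possible workaround; whether that suits your topic and schema evolution policy is a separate decision.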
Ververica Deployment Workflow Example
This workflow demonstrates using the Confluent Schema Registry Catalog with a Ververica deployment. The steps are similar for the AWS Glue Schema Registry Catalog, differing mainly in the CREATE CATALOG options.
- Create the Catalog
In the SQL Editor, execute the CREATE CATALOG statement for Confluent SR (as shown previously), providing your Kafka and Schema Registry endpoints. To execute the statement:
- Highlight the entire CREATE CATALOG code.
- Right-click the highlighted text.
- Select Run from the context menu.
After execution, the confluent_sr_catalog will appear in the Catalogs panel on the left, containing a single database named kafka.
- Create Tables
Ensure confluent_sr_catalog is selected or use USE CATALOG confluent_sr_catalog;. Execute CREATE TABLE statements for the tables you need (e.g., TableName and TableName2 from the examples above) using the same Run method. The new tables will appear under the kafka database in the Catalogs panel.
- Insert Initial Data
To populate TableName with sample data, execute an INSERT INTO ... VALUES statement. Remember to specify the prefixed column names (k_att1, k_att2, v_att1, etc.). Deploy this SQL as a job from the editor. Since it's a bounded insert, the job will run and finish. You can start the deployment manually from the Deployments panel.
- Migrate Data Between Tables
Execute the INSERT INTO TableName2 SELECT ... FROM TableName query (using explicit columns and optionally the scan.bounded.mode hint). Deploy this SQL. If using the bounded mode hint, the job will run, copy the data, and finish.
- Alter a Table Schema
Execute an ALTER TABLE TableName DROP (v_att3); statement to remove a non-key column. After execution, you can inspect the updated schema by clicking the table name in the Catalogs panel. Subsequent reads from the table will reflect this change, respecting the schema evolution policy.
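As an alternative to the Catalogs panel, the updated schema can also be checked with a statement. This is a minimal sketch, assuming the ALTER TABLE statement above has completed and that DESCRIBE is available in your SQL Editor.

```sql
USE CATALOG confluent_sr_catalog;

-- v_att3 should no longer appear in the column listing
DESCRIBE TableName;
```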
More Information
- Schema Registry for Confluent Platform | Confluent Documentation
- AWS Glue Schema Registry - AWS Glue