Manage Kafka JSON catalogs

After you create a Kafka JSON catalog, you can access JSON-formatted topics of a Kafka cluster in the console without the need to define a schema. This topic describes how to create and use a Kafka JSON catalog.

Background Information

A Kafka JSON catalog automatically parses JSON-formatted messages to infer the schema of a topic. Therefore, you can use a JSON catalog to obtain specific fields of the messages without the need to declare the schema of a Kafka table in Flink SQL. When you use a Kafka JSON catalog, take note of the following points:

  • The name of a table of a Kafka JSON catalog matches the name of a topic of the Kafka cluster. This way, you do not need to execute DDL statements to register the Kafka table to access the topic of the Kafka cluster. This improves the efficiency and accuracy of data development.
  • Tables of Kafka JSON catalogs can be used as source tables in Flink SQL deployments.
  • You can use Kafka JSON catalogs together with the CREATE TABLE AS statement to synchronize schema changes.
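To illustrate the first point, the sketch below contrasts the two approaches. The temporary table name `demo_manual`, the topic `demo`, the consumer group `demo-group`, and the column list are assumptions chosen for illustration; the catalog and database names follow the examples later in this topic.

```sql
-- Without a catalog: the schema must be declared by hand.
-- `demo_manual`, the topic `demo`, and the columns are hypothetical.
CREATE TEMPORARY TABLE `demo_manual` (
  `userId` BIGINT,
  `title` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'demo',
  'properties.bootstrap.servers' = '<kafka_broker1_endpoint>:9092',
  'properties.group.id' = 'demo-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
SELECT * FROM `demo_manual`;

-- With a Kafka JSON catalog: the table name is the topic name and the
-- schema is inferred, so no DDL statement is required.
SELECT * FROM `kafka_catalog`.`kafka`.`demo`;
```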

Limits

  • Kafka JSON catalogs support only JSON-formatted topics.
  • Only Ververica Cloud with VERA engine vera-1.0.6-flink-1.17, compatible with Flink 1.17 or later, supports Kafka JSON catalogs. The S3 bucket that is used by a Kafka JSON catalog must reside in the same region as Ververica Cloud.
  • You cannot modify existing Kafka JSON catalogs by executing DDL statements.
  • Tables of Kafka JSON catalogs can be used as source tables in Flink SQL deployments, but they cannot be used as result tables or as lookup (dimension) tables.

Create a Kafka JSON Catalog

To get started, set up your Kafka cluster on your remote server or with your Kafka provider.

  1. Log in to Ververica Cloud.

  2. On the Dashboard page, locate the workspace you want to manage, click the title of the workspace or the ⋮ icon, and select Open Console.

  3. In the left-side navigation pane of the Console, click Catalogs.

  4. On the Catalog list page, click Create Catalog.

  5. Create a Kafka JSON catalog connecting to your cluster:

CREATE CATALOG `kafka_catalog` WITH (
'type'='kafka',
'properties.bootstrap.servers'='<kafka_broker1_endpoint>:9092,<kafka_broker2_endpoint>:9092',
'format'='json');

You can now use the catalog to display Kafka topics. To verify, run the following statements to list the available tables:

USE CATALOG `kafka_catalog`;
SHOW TABLES;

Alternatively, to display the databases, run the following statements:

USE CATALOG `kafka_catalog`;
SHOW DATABASES;

Create Kafka Topics as Tables

You can use the Kafka catalog in your DDL statements to create Kafka topics as tables. For example:

CREATE TABLE `kafka_catalog`.`kafka`.`demo`(
userId BIGINT,
title VARCHAR(20)
) WITH (
'connector'='kafka',
'properties.bootstrap.servers'='<kafka_broker1_endpoint>:9092',
'format'='json'
);

Schema Discovery

Kafka table schemas are not defined by DDL statements. Instead, they are discovered from the records in the topic through a function called Schema Format Parsing, which derives the table schema from the schema of the records. Currently, the Kafka catalog supports schema format parsing only for the JSON format.
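One way to see the result of schema discovery is to inspect a catalog table before querying it. The sketch below assumes the catalog `kafka_catalog`, the database `kafka`, and a topic named `demo`, as in the other examples of this topic. The columns that are actually listed depend on the records in your topic.

```sql
USE CATALOG `kafka_catalog`;

-- List the inferred columns and their types for the topic `demo`.
DESCRIBE `kafka`.`demo`;
```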

You can configure the number of records used for parsing, as well as the prefixes applied to key and value fields, in the WITH clause. For example:

CREATE TABLE `kafka_catalog`.`kafka`.`demo`(
userId BIGINT,
title VARCHAR(20)
) WITH (
'connector'='kafka',
'properties.bootstrap.servers'='<kafka_broker1_endpoint>:9092',
'format'='json',
'max.fetch.records' = '100', -- maximum number of records
'key.fields-prefix' = 'key_',
'value.fields-prefix' ='value_'
);

With this configuration, the catalog will consume up to 100 records from the topic to parse the schema.

  • key.fields-prefix adds a prefix to the names of the columns derived from the record keys.
  • value.fields-prefix adds a prefix to the names of the columns derived from the record values.
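As an illustration of the two prefixes, consider the sketch below. The sample record and its field names are invented for this example, and the resulting column names are what the prefix rules above suggest rather than verified output; check the actual names in your environment before relying on them.

```sql
-- Assumed sample record in the topic `demo` (hypothetical data):
--   key:   {"id": 1}
--   value: {"userId": 42, "title": "hello"}
-- With 'key.fields-prefix' = 'key_' and 'value.fields-prefix' = 'value_',
-- the inferred columns would be expected to be named
--   key_id, value_userId, value_title
SELECT `key_id`, `value_userId`, `value_title`
FROM `kafka_catalog`.`kafka`.`demo`;
```

The prefixes keep key and value columns apart when both contain a field with the same name.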

Kafka Authentication Protocols

Like the Kafka connector, the catalog supports SASL_PLAINTEXT and SASL/SCRAM authentication when configured accordingly:

CREATE CATALOG `kafkaconn` WITH (
'properties.bootstrap.servers'='<kafka_broker_0>:9092',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";',
'format'='json',
'type'='kafka');

The authentication options must also be passed explicitly when you read from the catalog tables, for example by using SQL hints:

SELECT * FROM `kafkaconn`.kafka.`test-topic`
/*+ OPTIONS(
'properties.group.id' = 'group.sensors',
'properties.auto.offset.reset' = 'earliest',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";') */;
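The example above uses the PLAIN mechanism. For SCRAM, a catalog definition might look like the following sketch. It assumes that the brokers are configured for SCRAM-SHA-256 and uses the standard Kafka client class `ScramLoginModule`. The catalog name `kafkascram` and the credentials are placeholders.

```sql
-- Assumption: the brokers use SCRAM-SHA-256; SCRAM-SHA-512 is the other
-- SCRAM mechanism that Kafka provides.
CREATE CATALOG `kafkascram` WITH (
'properties.bootstrap.servers'='<kafka_broker_0>:9092',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
'format'='json',
'type'='kafka');
```

As with the PLAIN example, the same properties would need to be passed again when you read from the tables of this catalog.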

Use a Kafka JSON Catalog

If a table of the Kafka JSON catalog is used as a source table, you can read data from the Kafka topic that matches the table. The following example reads from a table of a Kafka JSON catalog and writes the result to another table.


INSERT INTO ${other_sink_table}
SELECT...
FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;

To specify additional parameters for a table of a Kafka JSON catalog, that is, parameters that you would otherwise set in the WITH clause, we recommend that you use SQL hints. In the preceding SQL statement, a SQL hint specifies that consumption starts from the earliest offset. For more information about other parameters, see our Kafka Connector page.
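As a further illustration of passing parameters through SQL hints, the sketch below starts consumption from a timestamp instead of the earliest offset. The options `scan.startup.mode` = `timestamp` and `scan.startup.timestamp-millis` come from the Apache Flink Kafka connector. Whether the catalog honors every connector option is an assumption to verify against the Kafka Connector page. The table name and the timestamp value are placeholders.

```sql
-- The timestamp is given in epoch milliseconds; 1700000000000 is an
-- arbitrary example value.
SELECT *
FROM `kafka_catalog`.`kafka`.`demo`
/*+ OPTIONS(
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1700000000000'
) */;
```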

You can synchronize data from the Kafka topic that matches the table to the destination table by using the CREATE TABLE AS statement.

  • Synchronize data from a single topic in real time.
CREATE TABLE IF NOT EXISTS `${target_table_name}`
WITH(...)
AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}`
/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
  • Synchronize data from multiple topics in a deployment.
BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
AS TABLE `kafka-catalog`.`kafka`.`topic0`
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
AS TABLE `kafka-catalog`.`kafka`.`topic1`
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
AS TABLE `kafka-catalog`.`kafka`.`topic2`
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

END;

To synchronize data from multiple Kafka topics in a single deployment by using the CREATE TABLE AS statement together with Kafka JSON catalogs, make sure that the following conditions are met:

  • topic-pattern is not configured for any of the tables that match the topics.
  • The Kafka parameters have the same values in every table. This applies to all parameters whose prefix is properties., including properties.bootstrap.servers and properties.group.id.
  • The values of the scan.startup.mode parameter are the same for all the tables. The scan.startup.mode parameter can be set only to group-offsets, latest-offset, or earliest-offset.
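The conditions above can be sketched as a statement set in which every source table receives identical hints. The consumer group name `sync-group` is a hypothetical value; the catalog, database, and table names follow the earlier multi-topic example.

```sql
BEGIN STATEMENT SET;

-- Both statements pass the same properties.group.id and the same
-- scan.startup.mode, and neither sets topic-pattern.
CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
AS TABLE `kafka-catalog`.`kafka`.`topic0`
/*+ OPTIONS('properties.group.id'='sync-group', 'scan.startup.mode'='group-offsets') */;

CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
AS TABLE `kafka-catalog`.`kafka`.`topic1`
/*+ OPTIONS('properties.group.id'='sync-group', 'scan.startup.mode'='group-offsets') */;

END;
```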