Protocol Buffers (Protobuf) Format

The Protocol Buffers, or Protobuf, format allows you to read and write Protobuf data in Flink SQL, based on the Java classes generated from a .proto definition file.

Usage and Example

To use the Protobuf format, you need to define your schema, generate the corresponding Java classes, package them into a JAR file, and then reference them in your Flink SQL CREATE TABLE statement.

Step 1: Define Your Protobuf Schema

First, create a .proto file to define the structure of your messages.

syntax = "proto2";
package com.example;

option java_package = "com.example";
option java_multiple_files = true;

message SimpleTest {
  optional int64 uid = 1;
  optional string name = 2;
  optional int32 category_type = 3;
  optional bytes content = 4;
  optional double price = 5;
  map<int64, InnerMessageTest> value_map = 6;
  repeated InnerMessageTest value_arr = 7;
  optional Corpus corpus_int = 8;
  optional Corpus corpus_str = 9;

  message InnerMessageTest {
    optional int64 v1 = 1;
    optional int32 v2 = 2;
  }

  enum Corpus {
    UNIVERSAL = 0;
    WEB = 1;
    IMAGES = 2;
    LOCAL = 3;
    NEWS = 4;
    PRODUCTS = 5;
    VIDEO = 7;
  }
}

Step 2: Generate and Package Java Classes

  1. Use the protoc command-line tool to compile your .proto file into Java classes.
  2. Compile and package the generated classes into a JAR file. You do not need to include the protobuf-java runtime library in your JAR. (Example commands follow this list.)
  3. Upload this JAR file as an additional dependency to your environment.
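
The exact commands depend on your build setup. The following is a minimal sketch, assuming protoc is on your PATH and a protobuf-java JAR is available at compile time; the file names, directory names, and library version shown are hypothetical.

mkdir -p src classes

# Generate Java sources from the schema (writes com/example/*.java into src/).
protoc --java_out=src simple_test.proto

# Compile the generated sources. protobuf-java is required on the classpath
# at compile time, even though it is not packaged into the JAR.
javac -cp protobuf-java-3.21.12.jar -d classes src/com/example/*.java

# Package only the compiled classes into a JAR.
jar cf simple-test-protos.jar -C classes .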

Step 3: Define the Flink SQL Table

Finally, define your Flink SQL table. In the WITH clause, set 'format' = 'protobuf' and specify the fully qualified Java class name of your message. The following example uses the Kafka connector.

CREATE TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map MAP<BIGINT, ROW<v1 BIGINT, v2 INT>>,
  value_arr ARRAY<ROW<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  'protobuf.ignore-parse-errors' = 'true'
);
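
After the table is created, the nested Protobuf fields can be queried directly with Flink SQL. A minimal sketch, using the columns from the example table above (the map key 1 is an arbitrary choice; array indexing in Flink SQL is 1-based):

SELECT
  uid,
  name,
  value_map[CAST(1 AS BIGINT)] AS mapped_row,  -- ROW<v1, v2>, or NULL if the key is absent
  value_arr[1] AS first_elem,                  -- first element of the repeated field
  corpus_str
FROM simple_test;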

Format Options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| format | required | (none) | String | Specify what format to use. Set this to 'protobuf'. |
| protobuf.message-class-name | required | (none) | String | The full name of a Protobuf generated class, which must match the message name in your .proto file. Use $ for inner class names (for example, 'com.example.OuterClass$MessageClass'). |
| protobuf.ignore-parse-errors | optional | false | Boolean | If true, the format skips rows with parsing errors instead of failing the job. |
| protobuf.read-default-values | optional | false | Boolean | If true, the format reads empty fields as the default values defined in the .proto file; if false, it generates NULL values. Note: for proto3 syntax, you must set this to true when using Protobuf versions older than 3.15 to avoid runtime issues, because those versions cannot check for field presence. This also causes primitive types to use default values instead of NULL, and makes deserialization significantly slower, depending on schema complexity and message size. |
| protobuf.write-null-string-literal | optional | "" | String | When serializing data, the string literal to use for NULL values within Protobuf arrays or maps. |
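
For example, if you generate the classes without java_multiple_files = true, the message class is nested inside an outer wrapper class, so the class name must use $. A sketch of the relevant options (the outer class name SimpleTestOuterClass is what protoc typically generates in that case, but verify it against your generated code):

'format' = 'protobuf',
'protobuf.message-class-name' = 'com.example.SimpleTestOuterClass$SimpleTest'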

Data Type Mapping

The following table lists the type mapping from Flink SQL types to Protobuf types.

| Flink SQL Type | Protobuf Type | Notes |
| --- | --- | --- |
| CHAR / VARCHAR / STRING | string | |
| BOOLEAN | bool | |
| BINARY / VARBINARY | bytes | |
| INT | int32 | |
| BIGINT | int64 | |
| FLOAT | float | |
| DOUBLE | double | |
| ARRAY | repeated | Elements cannot be NULL. The string literal for NULL values can be specified by protobuf.write-null-string-literal. |
| MAP | map | Keys and values cannot be NULL. The string literal for NULL values can be specified by protobuf.write-null-string-literal. |
| ROW | message | |
| VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINT | enum | A Protobuf enum can be mapped either to a STRING (the enum value's name) or to a number (INT, BIGINT, and so on) in Flink SQL. |
| ROW<seconds BIGINT, nanos INT> | google.protobuf.Timestamp | The google.protobuf.Timestamp type can be mapped to a Flink ROW type containing the seconds and nanoseconds of the UTC epoch time. |
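
As an illustration of the timestamp mapping, a field declared as google.protobuf.Timestamp in the .proto file surfaces as a ROW in Flink SQL, which can then be converted to a native timestamp in a query. A sketch with a hypothetical events table and ts column (TO_TIMESTAMP_LTZ is a built-in Flink SQL function; precision 0 interprets the value as epoch seconds):

-- Hypothetical DDL fragment for a google.protobuf.Timestamp field:
--   ts ROW<seconds BIGINT, nanos INT>
SELECT TO_TIMESTAMP_LTZ(ts.seconds, 0) AS event_time
FROM events;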

Special Considerations

Protobuf has specific rules for handling concepts like null values and oneof fields. This section explains how the Flink format addresses these situations during serialization and deserialization.

Null Values

Protobuf does not permit NULL values in maps and arrays. When converting from Flink ROW data to Protobuf, NULL values are automatically converted to the following default values.

| Protobuf Data Type | Default Value |
| --- | --- |
| int32 / int64 / float / double | 0 |
| string | "" (empty string) |
| bool | false |
| enum | The first defined enum value |
| binary | ByteString.EMPTY |
| message | MESSAGE.getDefaultInstance() |
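
Because NULL strings inside arrays and maps are replaced during serialization, you may want a more recognizable placeholder than the default empty string. A sketch of the relevant sink-table options (the literal 'N/A' is an arbitrary choice):

'format' = 'protobuf',
'protobuf.message-class-name' = 'com.example.SimpleTest',
'protobuf.write-null-string-literal' = 'N/A'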

OneOf Fields

During serialization, Flink does not guarantee that only one field within a oneof group contains a valid value. Flink processes and sets each field in the order in which it appears in the schema. As a result, if multiple fields in the same oneof group have values, the field that appears later in the schema overwrites any previously set field in that group.
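
For example, given the following hypothetical oneof group, if a Flink row supplies values for both a and b, the serialized message carries only b, because b is declared later in the schema:

message OneofExample {
  oneof payload {
    int32 a = 1;
    int32 b = 2;
  }
}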

For more detailed information about Protobuf types, you can refer to the official Language Guide (proto2) or Language Guide (proto3).