Version: 2.14

Connectors

Flink SQL reads data from and writes data to external storage systems, such as Apache Kafka® or a file system. Depending on the external system, the data can be encoded in different formats, such as Apache Avro® or JSON. Flink uses connectors to communicate with the storage systems and to encode and decode table data in different formats.

Each table that is read or written with Flink SQL requires a connector specification. The connector of a table is specified and configured in the DDL statement that defines the table.

Ververica Platform comes with a selection of packaged connectors and formats. Custom connectors and formats can be registered in the Platform to enable reading data from or writing data to systems that are not supported by the packaged connectors and formats.

This page discusses all packaged connectors and formats that come with Ververica Platform, explains how to register custom connectors and formats, and describes how to manage packaged connectors and formats.

Connector Types

Flink SQL distinguishes four types of connectors.

Bounded Source: A bounded source connector reads table updates from a bounded data set. Once all updates are read and forwarded, the table backed by the connector becomes static and no longer changes.

Unbounded Source: An unbounded source connector reads table updates from an infinite data stream. Tables backed by an unbounded source connector are continuously updated until a query is terminated.

Lookup Source: A lookup source connector is always used in combination with a lookup join. The connector performs a key lookup in the data stored in the external system and returns the result.

Sink: A sink connector writes table updates to an external system.

Packaged Connectors

Name | Usage | Supported Formats
Apache Kafka® | Unbounded Source / Sink | Apache Avro®, Apache Avro® for Confluent® Schema Registry, CSV, JSON, Debezium-Avro® for Confluent® Schema Registry, Debezium-JSON, Canal-JSON, Raw
Upsert Apache Kafka® | Unbounded Source / Sink | Apache Avro®, Apache Avro® for Confluent® Schema Registry, CSV, JSON, Raw
Amazon Kinesis | Sink / Source | JSON, CSV, Apache Avro®, Raw
File System | Sink / Source | Apache Avro®, CSV, JSON, Apache ORC®, Apache Parquet®, Debezium-JSON, Canal-JSON, Raw
JDBC | Bounded Source / Sink / Lookup | Not Applicable
Elasticsearch® 6 | Sink | JSON
Elasticsearch® 7 | Sink | JSON
Data Generator | Unbounded Source | Not Applicable
Print | Sink | Not Applicable
Blackhole | Sink | Not Applicable

Apache Kafka®

The Kafka connector allows reading data from and writing data to Apache Kafka® topics.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)

See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.
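As a hedged illustration of metadata columns, the following sketch exposes the timestamp, partition, and offset of each Kafka record as table columns. The table name OrdersWithMetadata and the column names event_time, kafka_partition, and kafka_offset are assumptions made for this example. The metadata keys 'timestamp', 'partition', and 'offset' and the option 'scan.startup.mode' are taken from the Apache Flink® Kafka connector documentation, and the TIMESTAMP_LTZ type assumes a recent Flink version.

```sql
-- Sketch only: table and column names are hypothetical.
CREATE TABLE OrdersWithMetadata (
  -- Kafka record timestamp (readable and writable metadata)
  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  -- Read-only metadata must be declared VIRTUAL so it is skipped when writing
  kafka_partition INT METADATA FROM 'partition' VIRTUAL,
  kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL,
  order_id BIGINT,
  price DECIMAL(32, 2),
  quantity INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  -- Start reading from the earliest available offset
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
```

Only the physical columns (order_id, price, quantity) are decoded from the CSV payload; the metadata columns are filled in by the connector.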

Upsert Kafka®

The Upsert Kafka connector allows reading data from and writing data to compacted Apache Kafka® topics. A table backed by the upsert-kafka connector must define a PRIMARY KEY. The connector uses the table's primary key as the key for the Kafka topic on which it performs upsert writes.

CREATE TABLE CustomerOrderStats (
cust_id BIGINT,
order_cnt BIGINT,
order_price_sum DECIMAL(32,2),
last_order_time TIMESTAMP(3),
PRIMARY KEY (cust_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'customer-order-stats',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'customer-order-updater',
'key.format' = 'avro',
'value.format' = 'avro'
)

See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.
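To illustrate how upsert writes arise, the following sketch continuously aggregates orders per customer and writes the result into the CustomerOrderStats table defined above. The source table CustomerOrders, its columns, and the datagen options are assumptions made for this example and are not part of the platform.

```sql
-- Hypothetical input table that generates random orders for 100 customers.
CREATE TEMPORARY TABLE CustomerOrders (
  cust_id BIGINT,
  price DECIMAL(32, 2),
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.cust_id.min' = '1',
  'fields.cust_id.max' = '100'
);

-- Each change of an aggregate is written as an upsert keyed by cust_id.
INSERT INTO CustomerOrderStats
SELECT
  cust_id,
  COUNT(*) AS order_cnt,
  -- Explicit cast so that the result matches the sink column type
  CAST(SUM(price) AS DECIMAL(32, 2)) AS order_price_sum,
  MAX(order_time) AS last_order_time
FROM CustomerOrders
GROUP BY cust_id;
```

Because the GROUP BY key matches the primary key of the sink table, every updated aggregate replaces the previous value for the same customer in the compacted topic.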

Kinesis

The Kinesis connector allows reading from and writing to Amazon Kinesis Data Streams (KDS).

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
)
PARTITIONED BY (order_id)
WITH (
'connector' = 'kinesis',
'stream' = 'orders',
'aws.region' = 'us-east-2',
'scan.stream.initpos' = 'LATEST',
'format' = 'avro'
);

See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.

File System

The file system sink connector writes table rows to files in a supported and configured file system, such as HDFS or S3. The connector supports row-encoded and bulk-encoded formats. Row-encoded formats such as CSV and JSON write each row individually to a file. Bulk-encoded formats collect a batch of rows in memory and organize them in a storage- and scan-efficient format before writing the data to a file. Examples of bulk-encoded formats are Apache Avro®, Apache ORC®, and Apache Parquet®.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'filesystem',
'path' = 's3a://my-bucket/tables/my-table',
'format' = 'csv'
)

The file system sink connector supports rolling files and partitioned tables. You can also configure commit policies to signal downstream systems when a partition has been completed. The section on Customizing a Packaged Connector explains how to add a custom PartitionCommitPolicy to the connector.
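The following sketch shows how rolling files, partitioning, and a partition commit policy might be combined. The table name PartitionedOrders, the partition column dt, the path, and all option values are assumptions chosen for illustration; the option keys are those listed in the Apache Flink® file system connector documentation. The query assumes a source table named Orders, such as the Kafka-backed table shown earlier.

```sql
-- Sketch only: names, path, and values are hypothetical.
CREATE TABLE PartitionedOrders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/tables/partitioned-orders',
  'format' = 'csv',
  -- Roll to a new file when a part file reaches 128 MB or is 30 minutes old
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '30 min',
  -- Commit a partition one hour after its creation (processing time)
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '1 h',
  -- Signal completion to downstream systems by writing a _SUCCESS file
  'sink.partition-commit.policy.kind' = 'success-file'
);

-- Derive the partition value from the order timestamp.
INSERT INTO PartitionedOrders
SELECT
  order_id,
  order_time,
  price,
  quantity,
  DATE_FORMAT(order_time, 'yyyy-MM-dd') AS dt
FROM Orders;
```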

See the official Apache Flink® documentation for a detailed description of these features and a full list of supported configuration options.

note

Access credentials for the file system can be passed via the Flink configuration of the deployment.

note

When reading or writing tables on HDFS, the Apache Hadoop® dependencies need to be added to the classpath of the Session Cluster or Deployment as follows:

spec:
  kubernetes:
    pods:
      envVars:
        - name: VVP_EXTRA_CLASSPATH
          value: /flink/opt/flink-shaded-hadoop/*

JDBC

The JDBC connector reads rows from or writes rows to a table in a JDBC database. The source connector operates in bounded mode. That means a table is read when a query is started and is never updated while the query is running. The sink connector continuously writes table updates to a table.

The database connection for each query is configured in the WITH clause of the CREATE TABLE statement for the query. The configuration is defined by simple 'key' = 'value' pairs.

For example, the following configuration specifies the JDBC connector, the MariaDB driver, and a MariaDB instance URL:

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'driver' = 'org.mariadb.jdbc.Driver',
'url' = 'jdbc:mysql://mysqlhost:3306/mydatabase',
'table-name' = 'orders'
)

Out of the box, the JDBC connector in Ververica Platform supports MariaDB, MySQL 5.5.3+, PostgreSQL 8.2+, Apache Derby, Microsoft SQL Server (MSSQL), and Oracle. All required drivers are included in the platform image, so no additional configuration is required.

Name | Driver String
MariaDB | org.mariadb.jdbc.Driver
MySQL 5.5.3+ | org.mariadb.jdbc.Driver
PostgreSQL 8.2+ | org.postgresql.Driver
Apache Derby | org.apache.derby.jdbc.ClientDriver
MSSQL | com.microsoft.sqlserver.jdbc.SQLServerDriver
Oracle | oracle.jdbc.OracleDriver

The JDBC connector also supports key lookups and can therefore be used for lookup joins to enrich tables with data stored in a JDBC table.
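A lookup join could look like the following sketch. The tables OrderEvents and Customers, their columns, and the JDBC table name 'customers' are assumptions made for this example. The join uses the FOR SYSTEM_TIME AS OF syntax of Flink SQL, which requires a processing-time attribute on the probing table.

```sql
-- Hypothetical stream of orders; proc_time is a processing-time attribute.
CREATE TEMPORARY TABLE OrderEvents (
  order_id BIGINT,
  cust_id BIGINT,
  price DECIMAL(32, 2),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5'
);

-- Hypothetical dimension table stored in a JDBC database.
CREATE TEMPORARY TABLE Customers (
  cust_id BIGINT,
  cust_name STRING,
  PRIMARY KEY (cust_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'org.mariadb.jdbc.Driver',
  'url' = 'jdbc:mysql://mysqlhost:3306/mydatabase',
  'table-name' = 'customers'
);

-- For each order, look up the customer row by key at processing time.
SELECT o.order_id, o.price, c.cust_name
FROM OrderEvents AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.cust_id = c.cust_id;
```

Each incoming order triggers a key lookup on cust_id in the external database, which is why the lookup column should be indexed there.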

note

To use the JDBC connector as a lookup table source, you must ensure that the lookup queries against the table can be efficiently processed, for example by creating an appropriate index. Add an 'index' = '<index-name>' pair to the database configuration in the WITH clause, for example:

CREATE TABLE Orders (
...
) WITH (
'connector' = 'jdbc',
...
'index' = 'orders'
)

For a detailed description of the JDBC connector, see the connector features and options pages of the official Apache Flink® documentation.

note

You can add a JDBC driver for any other DBMS by adding a JAR file with a matching JDBC driver to the connector as described in Customizing a Packaged Connector.

Elasticsearch® 6 and 7+

Ververica Platform includes two sink connectors to write tables to Elasticsearch® indexes. Because Elasticsearch 7 introduced a new client API, two connectors are provided: elasticsearch-6 supports Elasticsearch 6, and elasticsearch-7 supports Elasticsearch 7 and subsequent versions.

-- create Orders table backed by Elasticsearch 6
CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://es_host:9200',
  'index' = 'orders',
  'document-type' = 'order'
)

-- create Orders table backed by Elasticsearch 7+
CREATE TABLE Orders (
  order_id BIGINT,
  order_time TIMESTAMP(3),
  price DECIMAL(32, 2),
  quantity INT,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://es_host:9200',
  'index' = 'orders'
)

See the official Apache Flink® documentation for a detailed description of the connector's features and a full list of supported configuration options.

Data Generator

The data generator connector allows developers to create a table backed by an in-memory data generator. It allows for mocking out data sources during the early development of a query. When used in conjunction with computed columns, this source allows for flexible generation of complex types.

CREATE TABLE Orders (
order_id BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen'
)
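As a rough sketch of how the generated data can be shaped, the following table combines per-field generator options with a computed column. The table name GeneratedOrders and all option values are assumptions chosen for illustration; the option keys follow the Apache Flink® datagen connector documentation.

```sql
-- Sketch only: the table name and the values are hypothetical.
CREATE TABLE GeneratedOrders (
  order_id BIGINT,
  quantity INT,
  buyer_name STRING,
  -- Computed column: not generated, but derived for every row
  order_time AS LOCALTIMESTAMP
) WITH (
  'connector' = 'datagen',
  -- Emit five rows per second
  'rows-per-second' = '5',
  -- order_id counts from 1 to 1000; the source finishes afterwards
  'fields.order_id.kind' = 'sequence',
  'fields.order_id.start' = '1',
  'fields.order_id.end' = '1000',
  -- quantity is a random value between 1 and 10
  'fields.quantity.min' = '1',
  'fields.quantity.max' = '10',
  -- buyer_name is a random string of eight characters
  'fields.buyer_name.length' = '8'
);
```

Note that a sequence field makes the source bounded, because the generator stops once the end of the sequence is reached.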

Often, data generators are defined based on physical tables using a LIKE clause.

CREATE TABLE Orders (
order_id BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)

CREATE TABLE MockOrders WITH (
'connector' = 'datagen'
) LIKE Orders (EXCLUDING ALL)

See the official Apache Flink® documentation for a full list of supported configurations.

note

Ververica Platform has shipped a version of the datagen connector that includes FLINK-18735 since version 2.3.0. For this reason, the link above points to the snapshot documentation of Apache Flink® rather than to the Apache Flink® 1.11 documentation. With FLINK-18735, the datagen connector supports nearly all primitive data types as well as composite types such as ROW and ARRAY.

Print

The print connector allows developers to create a table that prints all rows to the standard output of the executing TaskManager(s).

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2)
) WITH (
'connector' = 'print'
)

Often, print tables are defined based on physical tables using a LIKE clause.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)

CREATE TABLE PrintOrders
WITH (
'connector' = 'print'
) LIKE Orders (EXCLUDING ALL)
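
To inspect the rows of the Kafka-backed table, a query such as the following sketch can write them to the print table. It assumes that LIKE Orders (EXCLUDING ALL) copies only the physical columns, so that the computed column cost is not part of PrintOrders and the columns have to be selected explicitly.

```sql
-- Copy the physical columns of Orders to the TaskManager's standard output.
-- SELECT * would also return the computed column cost, which PrintOrders lacks.
INSERT INTO PrintOrders
SELECT order_id, order_time, price, quantity
FROM Orders;
```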

See the official Apache Flink® documentation for a full list of supported configurations.

BlackHole

The blackhole connector allows developers to create a table that swallows all input records. Consider it the /dev/null of Flink SQL.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2)
) WITH (
'connector' = 'blackhole'
)

Often, blackhole tables are defined based on physical tables using a LIKE clause.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)

CREATE TABLE BlackHoleOrders
WITH (
'connector' = 'blackhole'
) LIKE Orders (EXCLUDING ALL)

Packaged Formats

Apache Avro®

The Avro format allows reading and writing Apache Avro® data based on an Avro schema.

{
  "type": "record",
  "name": "Orders",
  "fields": [
    {"name": "order_id", "type": "long"},
    {"name": "order_time", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 32, "scale": 2}},
    {"name": "quantity", "type": "int"},
    {"name": "buyer", "type": {
      "type": "record",
      "name": "Buyer",
      "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        {"name": "title", "type": "string"}
      ]
    }}
  ]
}

The schema is derived from the table schema, so the example schema above corresponds to the table definition below. The record names Orders and Buyer are only illustrative; Avro requires every record type to be named.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
buyer ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'avro'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Apache Avro® (Confluent®)

The avro-confluent format allows reading Apache Avro records that were serialized by io.confluent.kafka.serializers.KafkaAvroSerializer and writing records that can be read by io.confluent.kafka.serializers.KafkaAvroDeserializer.

When reading (deserializing) a record with this format, the Avro writer schema is fetched from the configured Confluent Schema Registry based on the schema version id encoded in the record, while the reader schema is inferred from the table schema.

When writing (serializing) a record with this format, the Avro schema is inferred from the table schema and registered in the configured Confluent Schema Registry under the subject given in avro-confluent.schema-registry.subject.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
buyer ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://localhost:8081'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Debezium-Avro® (Confluent®)

The debezium-avro-confluent format allows reading and writing Debezium changelog streams that are encoded in Apache Avro. Debezium is a Change Data Capture (CDC) tool that encodes database updates as changelog records. It is compatible with MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases. The Avro schema is maintained in Confluent® Schema Registry.

When reading (deserializing) a record with this format, the Avro writer schema is fetched from the configured Confluent Schema Registry based on the schema version id encoded in the record, while the reader schema is inferred from the table schema.

When writing (serializing) a record with this format, the Avro schema is inferred from the table schema and registered in the configured Confluent Schema Registry under the subject given in debezium-avro-confluent.schema-registry.subject.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
buyer ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8081'
)

See the official Apache Flink® documentation for a full list of properties and metadata columns. The data type mappings are the same as those for Avro.

CSV

The CSV format allows reading and writing CSV data based on a CSV schema.

10,"2020-12-30 12:13:14.123",2.10,10

The schema is derived from the table schema, so the above example record would correspond to the below table definition.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

JSON

The JSON format allows reading and writing JSON data based on a JSON schema.

{
"order_id": 10,
"order_time": "2020-12-30 12:13:14.123",
"price": 2.10,
"buyer": {
"first_name": "Elias",
"last_name": "Williamson",
"title": "Ms."
}
}

The schema is derived from the table schema, so the above example record would correspond to the below table definition.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
buyer ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'json'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Debezium-JSON

Debezium is a Change Data Capture (CDC) tool that streams changes in real time from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Apache Kafka®. It provides a unified format schema for changelogs and supports serializing messages using JSON.

Apache Flink® supports reading and writing Debezium INSERT/UPDATE/DELETE messages. The debezium-json format can be used to:

  • Synchronize changes of database tables to other systems
  • Read or write auditing logs
  • Maintain query results as real-time materialized views in databases

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'debezium-json'
)

See the official Apache Flink® documentation for a full list of properties and metadata columns. The data type mappings are the same as those for JSON.
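
As a hedged sketch of the metadata columns mentioned above, the following table exposes the origin database, the origin table, and the ingestion timestamp of each Debezium record. The table name OrderChanges and the column names are assumptions made for this example. The metadata keys follow the Apache Flink® documentation of the debezium-json format, where they carry the prefix value. when the format is used as the value format of the Kafka connector.

```sql
-- Sketch only: table and column names are hypothetical.
CREATE TABLE OrderChanges (
  -- Database and table in which the change was captured
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  -- Time at which the connector processed the change event
  ingestion_time TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  order_id BIGINT,
  price DECIMAL(32, 2),
  quantity INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  'value.format' = 'debezium-json'
);
```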

Canal-JSON

Canal is a Change Data Capture (CDC) tool that can stream changes from MySQL into other systems. It provides a unified format schema for changelog and supports serializing messages using JSON.

Apache Flink® supports reading and writing Canal INSERT/UPDATE/DELETE messages. The canal-json format can be used to:

  • Synchronize changes of database tables to other systems
  • Read or write auditing logs
  • Maintain query results as real-time materialized views in databases

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'canal-json'
)

See the official Apache Flink® documentation for a full list of properties. The data type mappings are the same as those for JSON.

Apache ORC®

Apache ORC is a binary columnar storage format for structured data. Columnar formats can achieve significantly better scan performance for most analytical workloads due to lower IO costs and better compression.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'filesystem',
'path' = 's3a://my-bucket/tables/my-table',
'format' = 'orc'
)

See the official Apache Flink® documentation for a full list of configuration options and data type mappings.

Apache Parquet®

Apache Parquet® is a binary columnar storage format for structured data. Columnar formats can achieve significantly better scan performance for most analytical workloads due to lower IO costs and better compression.

CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'filesystem',
'path' = 's3a://my-bucket/tables/my-table',
'format' = 'parquet'
)

See the official Apache Flink® documentation for a full list of configuration options and data type mappings.

Raw

The raw format allows reading and writing a single column as raw bytes. Only simple types as listed in the data type mappings section of the Flink documentation are supported.

CREATE TABLE StringVals (
`value` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'string-vals',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test',
'format' = 'raw',
'raw.charset' = 'UTF-8'
)

See the official Apache Flink® documentation for a full list of configuration options.

Protobuf

The protobuf format allows reading and writing Protocol Buffers (Protobuf) data based on Java classes generated from a .proto file.

The following example .proto file defines a simple message:

syntax = "proto3";
package com.example;
option java_package = "com.example";
option java_multiple_files = true;

message SimpleTest {
  optional string name = 1;
}

Use the protoc command to compile the .proto file to Java classes:

protoc --proto_path=<path-to-protobuf-files> --java_out=<path-for-generated-java-classes> *.proto

Then compile and package the classes (there is no need to package protobuf-java into the JAR).

Below is a CREATE TABLE example:

CREATE TABLE `vvp`.`default`.`protobuf_kafka` (
`name` VARCHAR(2147483647)
)
COMMENT ''
WITH (
'connector' = 'kafka',
'format' = 'protobuf',
'properties.bootstrap.servers' = 'kafka.kafka.svc.cluster.local:9092',
'properties.group.id' = 'protobuf-behaviour-1',
'protobuf.ignore-parse-errors' = 'true',
'protobuf.message-class-name' = 'com.example.SimpleTest',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'protobuf-behaviour-1'
);

The following example shows a Deployment that writes generated data to the protobuf_kafka table:

apiVersion: v1
kind: Deployment
metadata:
  displayName: pklo-protobuf-producer
  name: pklo-protobuf-producer
  namespace: default
spec:
  deploymentTargetId: null
  deploymentTargetName: vvp-pklo-jobs
  maxJobCreationAttempts: 4
  maxSavepointCreationAttempts: 4
  restoreStrategy:
    allowNonRestoredState: false
    kind: LATEST_STATE
  sessionClusterName: null
  state: CANCELLED
  template:
    metadata:
      annotations:
        flink.queryable-state.enabled: 'false'
        flink.security.ssl.enabled: 'false'
    spec:
      artifact:
        flinkImageRegistry: eu.gcr.io/vvp-devel-240810
        flinkImageRepository: flink
        flinkImageTag: 1.18.0-stream1-scala_2.12-java8
        flinkVersion: '1.18'
        kind: SQLSCRIPT
        sqlScript: >-
          ADD JAR
          's3://vvp-snapshot-blob-storage-eu-west-1/vvp-pklo/artifacts/namespaces/default/udfs/simpletest/2023-12-13T08-32-12.138190Z/simpletest.jar';

          CREATE TEMPORARY TABLE `protobuf_input_data` (
          `name` VARCHAR(2147483647)
          ) WITH (
          'connector' = 'datagen',
          'rows-per-second' = '10'
          );

          INSERT INTO protobuf_kafka

          SELECT * from protobuf_input_data;
      flinkConfiguration:
        execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
        execution.checkpointing.interval: 10s
        execution.checkpointing.min-pause: 10s
        taskmanager.numberOfTaskSlots: '1'
        web.cancel.enable: 'false'
      logging:
        log4jLoggers:
          '': INFO
        loggingProfile: default
      parallelism: 1
      resources:
        jobmanager:
          cpu: 1
          memory: 1G
        taskmanager:
          cpu: 1
          memory: 2G
  upgradeStrategy:
    kind: STATEFUL

See the official Apache Flink® documentation for a full list of configuration options.

Custom Connectors and Formats

If you need to read data from or write data to a system for which Ververica Platform does not provide a packaged connector or need to read or write data in a format that is not supported by Ververica Platform by default, you can register a custom connector or format.

Custom connectors and formats are namespaced resources. While packaged connectors and formats can be used in every namespace, custom connectors and formats can only be used in the namespace in which they were created.

A custom connector or format is defined by one or more JAR files that include all dependencies which are required to translate and execute a query that references the connector or format. See Developing a Custom Connector or Format for information on how to implement your own custom connector or format. The JAR files of a custom connector or format must be located in Universal Blob Storage or at HTTP locations which are accessible by Ververica Platform.

Every connector and format, whether packaged or custom, has a unique type that is defined by the connector's or format's implementation of the factory class. The type of a connector or format is referenced in the WITH clause of a CREATE TABLE statement. Moreover, the type of a connector or format determines the name of the corresponding connector or format resource. For example, a connector with type myconn would be used in a DDL statement as follows:

CREATE TABLE Person (
name STRING
) WITH (
'connector' = 'myconn'
)

and the name of the connector resource in the namespace prod would be namespaces/prod/connectors/myconn.

Since connector and format types must be unique, it is not possible to add a custom connector or format if there exists another (packaged or custom) connector or format with the same type.

note

In case a Ververica Platform upgrade adds a new packaged connector that has the same type as a previously existing custom connector, the custom connector will shadow the new packaged connector, such that the behavior of existing queries that use the custom connector is not changed.

You can create, update, and delete custom connectors or formats using the corresponding REST endpoints or via the "Connectors" section of the Ververica Platform UI. The UI supports uploading JAR files to Universal Blob Storage and extracting all available connectors and formats from the uploaded files (including all their exposed configuration options).

Developing a Custom Connector or Format

The Apache Flink® documentation describes in detail how to implement a custom source, sink, or format connector for Flink SQL.

note

Ververica Platform only supports connectors based on DynamicTableSource and DynamicTableSink, as described in the documentation linked above.

Manage Packaged Connectors and Formats

You might want to add, remove, or modify a packaged connector or format. A packaged connector or format consists of:

  • A YAML metadata file that provides the platform with structured information about the connector (or format).
  • The JAR files that include all dependencies which are required to translate and execute a query. When a query is executed, the JAR files of all connectors and formats used by the query are uploaded to Universal Blob Storage and downloaded by the Flink cluster.

These files are located in the Ververica Platform Gateway Docker image in a dedicated directory for each connector (or format) under /vvp/sql/opt/connectors (or /vvp/sql/opt/formats). For example, the /vvp/sql/opt/connectors directory of the Ververica Platform Gateway Docker image looks as follows:

tree /vvp/sql/opt/connectors
/vvp/sql/opt/connectors/
├── blackhole
│   └── connector-meta.yaml
├── datagen
│   └── connector-meta.yaml
├── elasticsearch-6
│   ├── connector-meta.yaml
│   └── flink-sql-connector-elasticsearch6_2.12-1.11.2-stream2.jar
├── elasticsearch-7
│   ├── connector-meta.yaml
│   └── flink-sql-connector-elasticsearch7_2.12-1.11.2-stream2.jar
├── filesystem
│   ├── connector-meta.yaml
│   └── flink-connector-filesystem_2.12-1.11.2-stream2.jar
├── jdbc
│   ├── connector-meta.yaml
│   ├── flink-connector-jdbc_2.12-1.11.2-stream2.jar
│   ├── mariadb-java-client-2.6.2.jar
│   └── postgresql-42.2.16.jar
├── kafka
│   ├── connector-meta.yaml
│   ├── flink-connector-kafka_2.12-1.11.2-stream2.jar
│   ├── flink-connector-kafka-base_2.12-1.11.2-stream2.jar
│   └── kafka-clients-2.4.1.jar
└── print
    └── connector-meta.yaml

You can add a packaged connector or format by patching the Ververica Platform Gateway Docker image. In order to add a new connector or format you need to create a new directory under /vvp/sql/opt/connectors or /vvp/sql/opt/formats and add the metadata file and the JAR files with all required dependencies.

Since a connector is completely defined by its metadata file and JAR files, you can also disable or override a connector by removing the respective folder or replacing its content.

note

Packaged connectors and formats are not managed per namespace. Therefore, any change of a packaged connector or format affects all namespaces.

In the following we describe the metadata files and show an example Dockerfile to patch the Gateway Docker image.

Connector Metadata File

The metadata file of a connector must be named connector-meta.yaml and has the following layout:

connector:
  type: # the name of the connector type, such as "jdbc" for the JDBC connector. Must be the same as the folder name.
  packaged: # a flag to indicate a packaged or custom connector.
  source: # a flag to indicate that the connector can be used as a source connector.
  sink: # a flag to indicate that the connector can be used as a sink connector.
  lookup: # a flag to indicate that the connector can be used as a lookup source connector.
  properties: # a list of configuration options. This information is used to improve the user experience when creating tables in the UI.
    - key: # the key of the option, such as 'connector' for the connector type.
      required: # a flag to indicate that this is a required configuration option.
      defaultValue: # the default value of the configuration option.
      definesFormat: # true if this property defines a format used by the connector
      description: # a free text description of the configuration option.
    - ...
  supportedFormats: # a list of all formats, such as 'json', that are supported by the connector.
    - ...

You can have a look at the metadata files of the packaged connectors as an example for how they are specified.

Format Metadata File

The metadata file of a format must be named format-meta.yaml and has the following layout:

format:
  type: # the name of the format type, such as "csv" for the CSV format. Must be the same as the folder name.
  packaged: # a flag to indicate a packaged or custom format.
  source: # a flag to indicate that the format can be used as a source format to read rows.
  sink: # a flag to indicate that the format can be used as a sink format to write rows.
  properties: # a list of configuration options. This information is used to ease the definition of formats in the UI.
    - key: # the key of the option, such as 'format' for the format type.
      required: # a flag to indicate that this is a required configuration option.
      defaultValue: # the default value of the configuration option.
      description: # a free text description of the configuration option.

You can have a look at the metadata files of the packaged formats as an example for how they are specified.

Patching the Gateway Docker Image

The following example shows the Dockerfile to add a new connector called my-connector to the VVP Gateway Docker image.

FROM registry.ververica.com/v2.10/vvp-gateway:2.10.3
COPY connector-meta.yaml /vvp/sql/opt/connectors/my-connector/connector-meta.yaml
COPY my-connector-0.1.jar /vvp/sql/opt/connectors/my-connector/my-connector-0.1.jar

Customizing a Packaged Connector

Some connectors support customizing their behavior by adding the implementation of a public interface to their classpath and referencing this class in a configuration option. For example, the JDBC connector features the 'driver' property to specify the JDBC driver class.

Customizing a connector or format works similarly to adding a packaged connector or format. You need to patch the Ververica Platform Gateway Docker image and add a JAR file that contains your custom classes to the directory of the connector or format you would like to customize.