Connectors

Flink SQL reads data from and writes data to external storage systems, as for example Apache Kafka® or a file system. Depending on the external system, the data can be encoded in different formats, such as Apache Avro® or JSON. Flink uses connectors to communicate with the storage systems and to encode and decode table data in different formats.

Each table that is read or written with Flink SQL requires a connector specification. The connector of a table is specified and configured in the DDL statement that defines the table.

Ververica Platform comes with a selection of packaged connectors and formats. Custom connectors and formats can be registered in the Platform to enable reading data from or writing data to systems which are not supported by packaged connectors and format.

This page discusses all packaged connectors and formats that come with Ververica Platform, explains how to register custom connectors and formats, and how to manage packaged connectors and formats.

Connector Types

Flink SQL knows four different types of connectors.

Bounded Source
A bounded source connector reads table updates from a bounded data set. Once all updates are read and forwarded, the table backed by the connector becomes static and does not change anymore.
Unbounded Source
An unbounded source connector reads table updates from an infinite data stream. Tables backed by an unbounded source connector are continuously updated until a query is terminated.
Lookup Source
A lookup source connector is always used in combination with a lookup join. The connector performs a key-lookup in the data backed by the external system and returns the result.
Sink
A sink connector writes table updates to an external system.

Packaged Connectors

Name Usage Supported Formats
Apache Kafka® Unbounded Source / Sink
Upsert Apache Kafka® Unbounded Source / Sink
Amazon Kinesis Sink / Source
File System Sink / Source
Apache Hive® Bounded & Unbounded Source / Sink / Lookup Text, SequenceFile, CSV, Apache ORC®, Apache Parquet®
JDBC Bounded Source / Sink / Lookup Not Applicable
Elasticsearch® 6 Sink JSON
Elasticsearch® 7 Sink JSON
Data Generator Unbounded Source Not Applicable
Print Sink Not Applicable
Blackhole Sink Not Applicable

Apache Kafka®

The Kafka connector allows for reading and writing data to and from Apache Kafka® topics.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.

Upsert Kafka®

The Upsert Kafka connector allows for reading and writing data to and from compacted Apache Kafka® topics. A table backed by the upsert-kafka connector must define a PRIMARY KEY. The connector uses the table’s primary key as key for the Kafka topic on which it performs upsert writes.

CREATE TABLE CustomerOrderStats (
    cust_id         BIGINT,
    order_cnt       BIGINT,
    order_price_sum DECIMAL(32,2),
    last_order_time TIMESTAMP(3),
    PRIMARY KEY (cust_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'customer-order-stats',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'customer-order-updater',
    'key.format' = 'avro',
    'value.format' = 'avro'
)

See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.

Kinesis

The Kinesis connector allows reading from and writing to Amazon Kinesis Data Streams (KDS).

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
)
PARTITIONED BY (order_id)
WITH (
  'connector' = 'kinesis',
  'stream' = 'orders',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'avro'
);

See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.

File System

The file system sink connector writes table rows to files in a supported and configured file system, such as HDFS or S3. The connector supports row-encoded and bulk-encoded formats. Row-encoded formats such as CSV and JSON write each row individually to a file. Bulk-encoded formats collect a batch of rows in memory and organize them in a storage and scan efficient format before writing the data to a file. Examples for bulk-encoded formats are Apache Avro®, Apache ORC®, and Apache Parquet®.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/tables/my-table',
  'format' = 'csv'
)

The file system sink connector supports rolling files and partitioned tables. You can also configure commit policies to signal downstream systems when a partition has been completed. The section on Customizing Packaged Connectors explains how to add a custom PartitionCommitPolicy to the connector.

See the official Apache Flink® documentation for a detailed description of these features and a full list of supported configuration options.

Note

Access credentials for the file system can be passed via the Flink configuration of the deployment.

Note

When reading or writing tables on HDFS, the Hadoop dependencies need to be linked to the classpath of the Session Cluster or Deployment via

spec:
  kubernetes:
    pods:
       envVars:
         - name: VVP_EXTRA_CLASSPATH
           value: /flink/opt/flink-shaded-hadoop/*

Apache Hive®

The Hive connector is used to access all tables that are provided by the Apache Hive® Catalog. It supports bound and unbounded reading from and writing to tables stored in Hive. Please see the Apache Hive® Catalog documentation for how to connect Ververica Platform with Hive Metastore.

Ververica Platform supports Amazon S3 (via Hadoop’s S3 filesystem implementation s3a) as well as Apache Hadoop® HDFS as Hive storage layer out-of-the-box. If you are using a different storage layer, please reach out.

Note

When reading a non-partitioned table backed by HDFS, the Hadoop dependencies need to be linked to the classpath of the Session Cluster or Deployment via

spec:
  kubernetes:
    pods:
       envVars:
         - name: VVP_EXTRA_CLASSPATH
           value: /flink/opt/flink-shaded-hadoop/*

Since the Hive Catalog is read-only in Ververica Platform, additional Flink-specific options need to be added via dynamic table options.

The source connector can operate in bounded as well as unbounded mode. It supports partition pruning, projection and limit pushdown.

-- unbounded
INSERT INTO ..
SELECT ...
FROM `my-hive-catalog`.`default`.`my-hive-table`
/*+ OPTIONS('streaming-source.enable'='true') */

-- bounded
INSERT INTO ..
SELECT ...
FROM `my-hive-catalog`.`default`.`my-hive-table`

See the official Apache Flink® documentation for a list of all options for streaming from Hive.

The sink connector is configured via dynamic table options, too. Its implementation is based on the File System Connector.

INSERT INTO `my-hive-catalog`.`default`.`my-hive-table`
/*+ OPTIONS(
    'sink.partition-commit.policy.kind'          = 'metastore,success-file',
    'sink.partition-commit.trigger'              = 'partition-time',
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    'sink.partition-commit.delay'                = '5 min') */
SELECT ...
FROM ...

See the official Apache Flink® documentation for a list of all options for streaming to Hive.

Please see the Flink documentation for a mapping between Apache Hive® and Apache Flink® data types.

The connector also supports key lookups and can therefore be used for lookup joins to enrich tables with data stored in a Hive table. See the official flink documentation for a list of all options for Hive tables as temporal tables.

JDBC

The JDBC connector reads rows from or writes rows to a table in a JDBC database. The source connector operates in bounded mode. That means a table is read when a query is started and is never updated while the query is running. The sink connector continuously writes table updates to a table. The connector also supports key lookups and can therefore be used for lookup joins to enrich tables with data stored in a JDBC table.

Note

If you want to use the JDBC connector as a lookup table source, you should ensure that the lookup queries against the table can be efficiently processed, for example by creating an appropriate index.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'org.mariadb.jdbc.Driver',
  'url' = 'jdbc:mysql://mysqlhost:3306/mydatabase',
  'table-name' = 'orders'
)

See the official Apache Flink® documentation for a detailed description of the JDBC connector’s features and a full list of supported configuration options.

Note

Ververica Platform supports MariaDB, MySQL 5.5.3+, and PostgresSQL 8.2+ out of the box. You can add a JDBC driver for any other DBMS by adding a JAR file with a matching JDBC driver to the connector as described by Customizing Packaged Connectors.

Elasticsearch® 6 and 7+

Ververica Platform includes two sink connectors to write tables to Elasticsearch® indexes. Because Elasticsearch 7 introduced a new client API, we provide the connectors elasticsearch-6 and elasticsearch-7 which support Elasticsearch 6 and Elasticsearch 7 including subsequent versions.

-- create Orders table backed by Elasticsearch 6
CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',
    'hosts' = 'http://es_host:9092',
    'index' = 'orders',
    'document-type' = 'order'
)
-- create Orders table backed by Elasticsearch 7+
CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://es_host:9092',
    'index' = 'orders'
)

See the official Apache Flink® documentation for a detailed description of the connector’s features and a full list of supported configuration options.

Data Generator

The data generator connector allows developers to create a table backed by an in-memory data-generator. It allows for mocking out data sources during the early development of a query. When used in conjunction with computed columns, this source allows for a flexible generation of complex types

CREATE TABLE Orders (
    order_id BIGINT,
    price        DECIMAL(32,2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen'
)

Often, data generators are defined based on physical tables using a LIKE clause.

CREATE TABLE Orders (
    order_id BIGINT,
    price        DECIMAL(32,2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

CREATE TABLE MockOrders WITH (
    'connector' = 'datagen'
) LIKE Orders (EXCLUDING ALL)

See the official Apache Flink® documentation for a full list of supported configurations.

Note

Please note that we are already shipping a version of the datagen connector in Ververica Platform 2.3.0 that includes FLINK-18735, which is why we are linking to the snapshot documentation of Apache Flink® above instead of the one of Apache Flink® 1.11. With FLINK-18735 the datagen connector supports basically all primitive datatypes as well as composite types like ROW and ARRAY.

BlackHole

The blackhole connector allows developers to create a table that swallows all input records. Consider it the /dev/null of Flink SQL.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2)
) WITH (
    'connector' = 'blackhole'
)

Often, blackhole tables are defined based on physical tables using a LIKE clause.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

CREATE TABLE BlackHoleOrders
WITH (
    'connector' = 'blackhole'
) LIKE Orders (EXCLUDING ALL)

Packaged Formats

Apache Avro®

The Avro format allows reading and writing Apache Avro® data based on an Avro schema.

{
   "type": "record",
   "fields" : [
      {"name": "order_id", "type": "long"},
      {"name": "order_time", "type": "long", "logicalType": "timestamp-millis"},
      {"name": "price", "type": "bytes", "logicalType": "decimal", "precision": 32, "scale": 2},
      {"name": "buyer", "type": {
         "type": "record",
         "fields": [
            {"name": "first_name", "type": "string"},
            {"name": "last_name", "type": "string"},
            {"name": "title", "type": "string"}
         ]
      }}
  ]
}

The schema is derived from the table schema, so the above example schema would correspond to the below table definition.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    buyer      ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'avro'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Apache Avro® (Confluent®)

The Avro format allows reading and writing Apache Avro records in line with io.confluent.kafka.serializers.(De)Serializer.

When reading (deserializing) a record with this format the Avro writer schema is fetched from the configured Confluent Schema Registry based on the schema version id encoded in the record while the reader schema is inferred from the table schema.

When writing (serializing) a record with this format the Avro schema is inferred from the table schema and registered in the configured Confluent Schema Registry under the subject given in avro-confluent.schema-registry.subject.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    buyer      ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Debezium-Avro® (Confluent®)

The debezium-avro-confluent format allows reading and writing Debezium changelog streams that are encoded in Apache Avro. Debezium is a Changelog Data Capture (CDC) format to encode database updates. It is compatible with MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases. The Avro schema is maintained in Confluent® Schema Registry.

When reading (deserializing) a record with this format the Avro writer schema is fetched from the configured Confluent Schema Registry based on the schema version id encoded in the record while the reader schema is inferred from the table schema.

When writing (serializing) a record with this format the Avro schema is inferred from the table schema and registered in the configured Confluent Schema Registry under the subject given in debezium-avro-confluent.schema-registry.subject.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    buyer      ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8081'
)

See the official Apache Flink® documentation for a full list of properties and metadata columns. The data type mappings are the same of those for Avro.

CSV

The CSV format allows reading and writing CSV data based on a CSV schema.

10,"2020-12-30 12:13:14.123",2.10,10

The schema is derived from the table schema, so the above example record would correspond to the below table definition.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

JSON

The JSON format allows reading and writing JSON data based on a JSON schema.

{
   "order_id": 10,
   "order_time": "2020-12-30 12:13:14.123",
   "price": 2.10,
   "buyer": {
      "first_name": "Elias",
      "last_name": "Williamson",
      "title": "Ms."
   }
}

The schema is derived from the table schema, so the above example record would correspond to the below table definition.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    buyer      ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'json'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Debezium-JSON

Debezium is a Changelog Data Capture (CDC) tool that streams changes in real-time from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Apache Kafka®. It provides a unified format schema for changelog and supports to serialize messages using JSON.

Apache Flink® supports reading and writing Debezium INSERT/UPDATE/DELETE messages. The debezium-json format can be used to:

  • Synchronize changes of database tables to other systems
  • Read or write auditing logs
  • Maintain query results as real-time materialized views in databases
CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'debezium-json'
)

See the official Apache Flink® documentation for a full list of properties and metadata columns. The data type mappings are the same of those for JSON.

Canal-JSON

Canal is a Change Data Capture (CDC) tool that can stream changes from MySQL into other systems. It provides a unified format schema for changelog and supports serializing messages using JSON.

Apache Flink® supports reading and writing Canal INSERT/UPDATE/DELETE messages. The canal-json format can be used to:

  • Synchronize changes of database tables to other systems
  • Read or write auditing logs
  • Maintain query results as real-time materialized views in databases
CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'canal-json'
)

See the official Apache Flink® documentation for a full list of properties. The data type mappings are the same of those for JSON.

Apache ORC®

Apache ORC is a binary columnar storage format for structured data. Columnar formats can achieve significantly better scan performance for most analytical workloads due to lower IO costs and better compression.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/tables/my-table',
  'format' = 'orc'
)

See the official Apache Flink® documentation for a full list of configuration options and data type mappings.

Apache Parquet®

Apache Parquet is a binary columnar storage format for structured data. Columnar formats can achieve significantly better scan performance for most analytical workloads due to lower IO costs and better compression.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/tables/my-table',
  'format' = 'parquet'
)

See the official Apache Flink® documentation for a full list of configuration options and data type mappings.

Raw

The raw format allows to read and write a single column as raw bytes. Only simple types as listed in the data type mappings section of the Flink documentation are supported.

CREATE TABLE StringVals (
    value STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'string-vals',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test',
  'format' = 'raw',
  'raw.charset' = 'UTF-8'
)

See the official Apache Flink® documentation for a fulll ist of configuration options.

Custom Connectors and Formats

If you need to read data from or write data to a system for which Ververica Platform does not provide a packaged connector or need to read or write data in a format that is not supported by Ververica Platform by default, you can register a custom connector or format.

Custom connectors and formats are namespaced resources. While packaged connectors and formats can be used in every namespace, custom connectors and formats can only be used in the namespace in which they were created.

A custom connector or format is defined by one or more JAR files that include all dependencies which are required to translate and execute a query that references the connector or format. See Developing a Custom Connector or Format for information on how to implement your own custom connector or format. The JAR files of a custom connector or format must be located in Universal Blob Storage or at HTTP locations which are accessible by Ververica Platform.

Every connector and format, regardless of packaged or custom, has a unique type that is defined by the connector’s or format’s implementation of the factory class (see org.apache.flink.table.factories.Factory). The type of a connector or format is referenced in the WITH clause of a CREATE TABLE statement. Moreover, the type of a connector or format determines the name of the corresponding connector or format resource. For example, a connector with type myconn would be used in a DDL statement as follows

CREATE TABLE Person (
  name STRING
) WITH (
  'connector' = 'myconn'
)

and the name of the connector resource in the namespace prod would be namespaces/prod/connectors/myconn.

Since connector and format types must be unique, it is not possible to add a custom connector or format if there exists another (packaged or custom) connector or format with the same type.

Note

In case a Ververica Platform upgrade adds a new packaged connector that has the same type as a previously existing custom connector, the custom connector will shadow the new packaged connector, such that the behavior of existing queries that use the custom connector is not changed.

You can create, update, and delete custom connectors or formats using the corresponding REST endpoints or via the “Connectors” section of the Ververica Platform UI. The UI supports uploading JAR files to Universal Blob Storage and extracting all available connectors and formats from the uploaded files (including all their exposed configuration options).

Developing a Custom Connector or Format

The Apache Flink® documentation describes in detail how to implement a custom source, sink, or format connector for Flink SQL.

Note

Ververica Platform only supports connectors based on DynamicTableSource and DynamicTableSink as described in documentation linked above. The legacy interfaces for table connectors are not supported.

Manage Packaged Connectors and Formats

You might want to add, remove, or modify a packaged connector or format. A packaged connector or format consists of:

  • A YAML metadata file that provides the platform with structured information about the connector (or format).
  • The JAR files that include all dependencies which are required to translate and execute a query. When a query is executed, the JAR files of all connectors and formats used by the query are uploaded to Universal Blob Storage and downloaded by the Flink cluster.

These files are located in the Ververica Platform Gateway Docker image in a dedicated directory for each connector (or format) under /vvp/sql/opt/connectors (or /vvp/sql/opt/formats). For example, the /vvp/sql/opt/connectors directory of the Ververica Platform Gateway Docker image looks as follows.

$ tree /vvp/sql/opt/connectors
/vvp/sql/opt/connectors/
├── blackhole
│   └── connector-meta.yaml
├── datagen
│   └── connector-meta.yaml
├── elasticsearch-6
│   ├── connector-meta.yaml
│   └── flink-sql-connector-elasticsearch6_2.12-1.11.2-stream2.jar
├── elasticsearch-7
│   ├── connector-meta.yaml
│   └── flink-sql-connector-elasticsearch7_2.12-1.11.2-stream2.jar
├── filesystem
│   ├── connector-meta.yaml
│   └── flink-connector-filesystem_2.12-1.11.2-stream2.jar
├── jdbc
│   ├── connector-meta.yaml
│   ├── flink-connector-jdbc_2.12-1.11.2-stream2.jar
│   ├── mariadb-java-client-2.6.2.jar
│   └── postgresql-42.2.16.jar
├── kafka
│   ├── connector-meta.yaml
│   ├── flink-connector-kafka_2.12-1.11.2-stream2.jar
│   ├── flink-connector-kafka-base_2.12-1.11.2-stream2.jar
│   └── kafka-clients-2.4.1.jar
└── print
    └── connector-meta.yaml

You can add a packaged connector or format by patching the Ververica Platform Gateway Docker image. In order to add a new connector or format you need to create a new directory under /vvp/sql/opt/connectors or /vvp/sql/opt/formats and add the metadata file and die JAR files with all required dependencies.

Since a connector is completely defined by its metadata file and JAR files, you can also disable or override a connector by removing the respective folder or replacing its content.

Note

Packaged connectors and formats are not managed per namespace. Therefore, any change of a packaged connector or format affects all namespaces.

In the following we describe the metadata files and show an example Dockerfile to patch the Gateway Docker image.

Connector Metadata File

The metadata file of a connector must be named connector-meta.yaml and has the following layout:

connector:
  type:       # the name of the connector type, such as "jdbc" for the JDBC connector. Must be the same as the folder name.
  packaged:   # a flag to indicate a packaged or custom connector.
  source:     # a flag to indicate that the connector can be used as a source connector.
  sink:       # a flag to indicate that the connector can be used as a sink connector.
  lookup:     # a flag to indicate that the connector can be used as a lookup source connector.
  properties: # a list of configuration options. This information is used to improve the user experience when creating tables in the UI.
    - key:           # the key of the option, such as 'connector' for the connector type.
      required:      # a flag to indicate that this is a required configuration option.
      defaultValue:  # the default value of the configuration option.
      definesFormat: # true if this property defines a format used by the connector
      description:   # a free text description of the configuration option.
    - ...
  supportedFormats: # a list of all formats, such as 'json', that are supported by the connector.
    - ...

You can have a look at the metadata files of the packaged connectors as an example for how they are specified.

Format Metadata File

The metadata file of a format must be named format-meta.yaml and has the following layout:

format:
  type:       # the name of the format type, such as "csv" for the CSV format. Must be the same as the folder name.
  packaged:   # a flag to indicate a packaged or custom format.
  source:     # a flag to indicate that the format can be used as a source format to read rows.
  sink:       # a flag to indicate that the format can be used as a sink format to write rows.
  properties: # a list of configuration options. This information is used to improve the ease the definition of formats in the UI.
    - key:          # the key of the option, such as 'format' for the format type.
      required:     # a flag to indicate that this is a required configuration option.
      defaultValue: # the default value of the configuration option.
      description:  # a free text description of the configuration option.

You can have a look at the metadata files of the packaged formats as an example for how they are specified.

Patching the Gateway Docker Image

The following example shows the Dockerfile to add a new connector called my-connector to the VVP Gateway Docker image.

FROM registry.ververica.com/v2.3/vvp-gateway:2.3.0
COPY connector-meta.yaml /vvp/sql/opt/connectors/my-connector/connector-meta.yaml
COPY my-connector-0.1.jar /vvp/sql/opt/connectors/my-connector/my-connector-0.1.jar

Customizing a Packaged Connector

Some connectors support customizing their behavior by adding the implementation of a public interface to their classpath and referencing this class in a configuration option. For example JDBC connector features the 'driver' property to specify the JDBC Driver class.

Customizing a connector or format works similar to adding a packaged connector or format. You need to patch the Ververica Platform Gateway Docker image and add a JAR file that contains your custom classes to the directory of the connector or format you would like to customize.