Connectors

Flink SQL reads data from and writes data to external storage systems, such as Apache Kafka® or a file system. Depending on the external system, the data can be encoded in different formats, such as Apache Avro® or JSON. Flink uses connectors to communicate with the storage systems and to encode and decode table data in different formats.

Each table that is read or written with Flink SQL requires a connector specification. The connector of a table is specified and configured in the CREATE TABLE DDL statement that defines the table.

Ververica Platform comes with a selection of packaged connectors and formats, which are discussed on this page. We also describe how to add a custom connector or format.

Connector Types

Flink SQL distinguishes between four different types of connectors.

Bounded Source
A bounded source connector reads table updates from a bounded data set. Once all updates are read and forwarded, the table backed by the connector becomes static and does not change anymore.
Unbounded Source
An unbounded source connector reads table updates from an infinite data stream. Tables backed by an unbounded source connector are continuously updated until a query is terminated.
Lookup Source
A lookup source connector is always used in combination with a lookup join. The connector performs a key-lookup in the data backed by the external system and returns the result.
Sink
A sink connector writes table updates to an external system.

Supported Connectors

Name              | Usage                                      | Supported Formats
Apache Kafka®     | Unbounded Source / Sink                    | CSV, JSON, Apache Avro®, Apache Avro® (Confluent®), Debezium-JSON, Canal-JSON
File System       | Sink                                       | Apache Avro®, CSV, JSON, Apache ORC®, Apache Parquet®
Apache Hive®      | Bounded & Unbounded Source / Sink / Lookup | Text, SequenceFile, CSV, Apache ORC®, Apache Parquet®
JDBC              | Bounded Source / Sink / Lookup             | Not Applicable
Elasticsearch® 6  | Sink                                       | JSON
Elasticsearch® 7  | Sink                                       | JSON
Data Generator    | Unbounded Source                           | Not Applicable
Print             | Sink                                       | Not Applicable
BlackHole         | Sink                                       | Not Applicable

Apache Kafka®

The Kafka connector allows for reading data from and writing data to Apache Kafka® topics.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)
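
Additional options control, for example, where reading starts. Assuming dynamic table options are enabled (as in the Hive examples further down this page), a sketch of overriding the startup mode for the Orders table defined above could look as follows; the option value is chosen for illustration only.

-- read the topic from the earliest available offset instead of the committed group offsets
SELECT *
FROM Orders /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */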

See the official Apache Flink® documentation for a full list of supported configuration options.

File System

The file system sink connector writes table rows to files in a supported and configured file system, such as HDFS or S3. The connector supports row-encoded and bulk-encoded formats. Row-encoded formats such as CSV and JSON write each row individually to a file. Bulk-encoded formats collect a batch of rows in memory and organize them in a storage- and scan-efficient format before writing the data to a file. Examples of bulk-encoded formats are Apache Avro®, Apache ORC®, and Apache Parquet®.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/tables/my-table',
  'format' = 'csv'
)

The file system sink connector supports rolling files and partitioned tables. You can also configure commit policies to signal downstream systems when a partition has been completed. The section on Customizing Packaged Connectors explains how to add a custom PartitionCommitPolicy to the connector.
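
As a sketch of these features (the bucket, partition columns, and thresholds are examples; the partition-time commit trigger assumes watermarks on the input), a partitioned table with a rolling policy and a partition commit configuration could look as follows.

CREATE TABLE PartitionedOrders (
    order_id   BIGINT,
    price      DECIMAL(32, 2),
    order_time TIMESTAMP(3),
    dt         STRING,
    hr         STRING
) PARTITIONED BY (dt, hr) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://my-bucket/tables/partitioned-orders',
    'format' = 'csv',
    'sink.rolling-policy.file-size' = '128MB',
    'sink.partition-commit.trigger' = 'partition-time',
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'success-file'
)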

See the official Apache Flink® documentation for a detailed description of these features and a full list of supported configuration options.

Note

Access credentials for the file system can be passed via the Flink configuration of the deployment.

Note

When reading or writing tables on HDFS, the Hadoop dependencies need to be linked to the classpath of the Session Cluster or Deployment via

spec:
  kubernetes:
    pods:
      envVars:
        - name: VVP_EXTRA_CLASSPATH
          value: /flink/opt/flink-shaded-hadoop/*

Apache Hive®

The Hive connector is used to access all tables that are provided by the Apache Hive® Catalog. It supports bounded and unbounded reading from, as well as writing to, tables stored in Hive. Please see the Apache Hive® Catalog documentation for how to connect Ververica Platform with Hive Metastore.

Ververica Platform supports Amazon S3 (via Hadoop’s S3 filesystem implementation s3a) as well as Apache Hadoop® HDFS as the Hive storage layer out of the box. If you are using a different storage layer, please reach out.

Note

When reading a non-partitioned table backed by HDFS, the Hadoop dependencies need to be linked to the classpath of the Session Cluster or Deployment via

spec:
  kubernetes:
    pods:
      envVars:
        - name: VVP_EXTRA_CLASSPATH
          value: /flink/opt/flink-shaded-hadoop/*

Since the Hive Catalog is read-only in Ververica Platform, additional Flink-specific options need to be added via dynamic table options.

The source connector can operate in bounded as well as unbounded mode. It supports partition pruning, projection and limit pushdown.

-- unbounded
INSERT INTO ..
SELECT ...
FROM `my-hive-catalog`.`default`.`my-hive-table`
/*+ OPTIONS('streaming-source.enable'='true') */

-- bounded
INSERT INTO ..
SELECT ...
FROM `my-hive-catalog`.`default`.`my-hive-table`

See the official Apache Flink® documentation for a list of all options for streaming from Hive.

The sink connector is configured via dynamic table options, too. Its implementation is based on the File System Connector.

INSERT INTO `my-hive-catalog`.`default`.`my-hive-table`
/*+ OPTIONS(
    'sink.partition-commit.policy.kind'          = 'metastore,success-file',
    'sink.partition-commit.trigger'              = 'partition-time',
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    'sink.partition-commit.delay'                = '5 min') */
SELECT ...
FROM ...

See the official Apache Flink® documentation for a list of all options for streaming to Hive.

Please see the Flink documentation for a mapping between Apache Hive® and Apache Flink® data types.

The connector also supports key lookups and can therefore be used for lookup joins to enrich tables with data stored in a Hive table. See the official Apache Flink® documentation for a list of all options for using Hive tables as temporal tables.
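
As a sketch (the Orders stream with its processing time attribute proc_time and the customers table are assumed for illustration only), such a lookup join uses the FOR SYSTEM_TIME AS OF clause:

-- enrich a stream of orders with customer data maintained in Hive
SELECT o.order_id, o.price, c.customer_name
FROM Orders AS o
JOIN `my-hive-catalog`.`default`.`customers`
    FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id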

JDBC

The JDBC connector reads rows from or writes rows to a table in a JDBC database. The source connector operates in bounded mode. That means a table is read when a query is started and is never updated while the query is running. The sink connector continuously writes table updates to a table. The connector also supports key lookups and can therefore be used for lookup joins to enrich tables with data stored in a JDBC table.

Note

If you want to use the JDBC connector as a lookup table source, you should ensure that the lookup queries against the table can be efficiently processed, for example by creating an appropriate index.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'org.mariadb.jdbc.Driver',
  'url' = 'jdbc:mysql://mysqlhost:3306/mydatabase',
  'table-name' = 'orders'
)
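
For illustration, the table above can also serve as the dimension side of a lookup join. The following sketch assumes an OrderEvents stream with a processing time attribute proc_time; both are examples and not part of the table definition above.

-- enrich a stream of order events with the order details stored in the JDBC table
SELECT e.event_id, e.order_id, o.price, o.quantity
FROM OrderEvents AS e
JOIN Orders FOR SYSTEM_TIME AS OF e.proc_time AS o
ON e.order_id = o.order_id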

See the official Apache Flink® documentation for a detailed description of the JDBC connector’s features and a full list of supported configuration options.

Note

Ververica Platform supports MariaDB, MySQL 5.5.3+, and PostgreSQL 8.2+ out of the box. You can add support for any other DBMS by adding a JAR file with a matching JDBC driver to the connector, as described in Customizing Packaged Connectors.

Elasticsearch® 6 and 7+

Ververica Platform includes two sink connectors to write tables to Elasticsearch® indexes. Because Elasticsearch 7 introduced a new client API, we provide the connectors elasticsearch-6 and elasticsearch-7, which support Elasticsearch 6 and Elasticsearch 7 (including subsequent versions), respectively.

-- create Orders table backed by Elasticsearch 6
CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',
    'hosts' = 'http://es_host:9200',
    'index' = 'orders',
    'document-type' = 'order'
)
-- create Orders table backed by Elasticsearch 7+
CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://es_host:9200',
    'index' = 'orders'
)

See the official Apache Flink® documentation for a detailed description of the connector’s features and a full list of supported configuration options.

Data Generator

The data generator connector allows developers to create a table backed by an in-memory data generator, which is useful for mocking out data sources during the early development of a query. When used in conjunction with computed columns, this source allows for flexible generation of complex types.

CREATE TABLE Orders (
    order_id BIGINT,
    price        DECIMAL(32,2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen'
)
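
The generator can be tuned per column. The following sketch (rates, bounds, and column names are arbitrary examples) limits the output rate, generates a sequence for the key column, bounds the random quantity, and derives further columns via computed columns.

CREATE TABLE TunedMockOrders (
    order_id   BIGINT,
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,   -- computed from the generated columns
    order_time AS LOCALTIMESTAMP      -- computed timestamp column
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.order_id.kind' = 'sequence',
    'fields.order_id.start' = '1',
    'fields.order_id.end' = '1000000',
    'fields.quantity.min' = '1',
    'fields.quantity.max' = '10'
)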

Often, data generators are defined based on physical tables using a LIKE clause.

CREATE TABLE Orders (
    order_id BIGINT,
    price        DECIMAL(32,2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

CREATE TABLE MockOrders WITH (
    'connector' = 'datagen'
) LIKE Orders (EXCLUDING ALL)

See the official Apache Flink® documentation for a full list of supported configurations.

Note

Please note that since version 2.3.0, Ververica Platform ships a version of the datagen connector that already includes FLINK-18735, which is why we are linking to the snapshot documentation of Apache Flink® above instead of the documentation of Apache Flink® 1.11. With FLINK-18735, the datagen connector supports basically all primitive data types as well as composite types like ROW and ARRAY.

BlackHole

The blackhole connector allows developers to create a table that swallows all input records. Consider it the /dev/null of Flink SQL.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2)
) WITH (
    'connector' = 'blackhole'
)

Often, blackhole tables are defined based on physical tables using a LIKE clause.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

CREATE TABLE BlackHoleOrders
WITH (
    'connector' = 'blackhole'
) LIKE Orders (EXCLUDING ALL)

Supported Formats

Apache Avro®

The Avro format allows reading and writing Apache Avro® data based on an Avro schema.

{
   "type": "record",
   "name": "Orders",
   "fields" : [
      {"name": "order_id", "type": "long"},
      {"name": "order_time", "type": {"type": "long", "logicalType": "timestamp-millis"}},
      {"name": "price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 32, "scale": 2}},
      {"name": "quantity", "type": "int"},
      {"name": "buyer", "type": {
         "type": "record",
         "name": "buyer",
         "fields": [
            {"name": "first_name", "type": "string"},
            {"name": "last_name", "type": "string"},
            {"name": "title", "type": "string"}
         ]
      }}
   ]
}

The schema is derived from the table schema, so the above example schema would correspond to the below table definition.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    buyer      ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'avro'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Apache Avro® (Confluent®)

The Avro-Confluent format allows reading and writing Apache Avro® records in line with the io.confluent.kafka.serializers.KafkaAvro(De)Serializer.

When reading (deserializing) a record with this format, the Avro writer schema is fetched from the configured Confluent Schema Registry based on the schema version id encoded in the record, while the reader schema is inferred from the table schema.

When writing (serializing) a record with this format, the Avro schema is inferred from the table schema and registered in the configured Confluent Schema Registry under the subject given in avro-confluent.schema-registry.subject.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    buyer      ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081'
)
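
When such a table is used as a sink, the registration subject can be set explicitly. A sketch based on the table above (the subject name is an example; the LIKE clause copies the schema and merges the additional option into the existing ones):

CREATE TABLE OrdersWithSubject WITH (
    'avro-confluent.schema-registry.subject' = 'orders-value'
) LIKE Orders (INCLUDING ALL OVERWRITING OPTIONS)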

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Note

Please note that this format has not been released in Apache Flink® yet. It will be released in Apache Flink® 1.12.0, which is why we are linking to the snapshot documentation of Apache Flink® above.

CSV

The CSV format allows reading and writing CSV data based on a CSV schema.

10,"2020-12-30 12:13:14.123",2.10,10

The schema is derived from the table schema, so the above example record would correspond to the below table definition.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'csv'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

JSON

The JSON format allows reading and writing JSON data based on a JSON schema.

{
   "order_id": 10,
   "order_time": "2020-12-30 12:13:14.123",
   "price": 2.10,
   "quantity": 10,
   "buyer": {
      "first_name": "Elias",
      "last_name": "Williamson",
      "title": "Ms."
   }
}

The schema is derived from the table schema, so the above example record would correspond to the below table definition.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    buyer      ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'json'
)

See the official Apache Flink® documentation for a full list of properties and data type mappings.

Debezium-JSON

Debezium is a Change Data Capture (CDC) tool that streams changes in real-time from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Apache Kafka®. It provides a unified format schema for changelogs and supports serializing messages using JSON.

Apache Flink® supports interpreting Debezium INSERT/UPDATE/DELETE messages. Leveraging this feature is useful in many cases, such as:

  • Synchronizing incremental data from databases to other systems
  • Auditing logs
  • Real-time materialized views on databases

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'debezium-json'
)
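
For example, a continuous aggregation over the changelog-backed table maintains a real-time materialized view that stays in sync with the source database. The following query is a sketch based on the table above.

-- order count and total cost, updated as rows are inserted, updated, or deleted in the source database
SELECT COUNT(*) AS order_cnt, SUM(cost) AS total_cost
FROM Orders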

See the official Apache Flink® documentation for a full list of properties. The data type mappings are the same as those for the JSON format.

Canal-JSON

Canal is a Change Data Capture (CDC) tool that can stream changes from MySQL into other systems. It provides a unified format schema for changelogs and supports serializing messages using JSON.

Apache Flink® supports interpreting Canal INSERT/UPDATE/DELETE messages. Leveraging this feature is useful in many cases, such as:

  • Synchronizing incremental data from databases to other systems
  • Auditing logs
  • Real-time materialized views on databases

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    cost       AS price * quantity
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orderGroup',
    'format' = 'canal-json'
)

See the official Apache Flink® documentation for a full list of properties. The data type mappings are the same as those for the JSON format.

Apache ORC®

Apache ORC is a binary columnar storage format for structured data. Columnar formats can achieve significantly better scan performance for most analytical workloads due to lower IO costs and better compression.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/tables/my-table',
  'format' = 'orc'
)

See the official Apache Flink® documentation for a full list of configuration options and data type mappings.

Apache Parquet®

Apache Parquet is a binary columnar storage format for structured data. Columnar formats can achieve significantly better scan performance for most analytical workloads due to lower IO costs and better compression.

CREATE TABLE Orders (
    order_id   BIGINT,
    order_time TIMESTAMP(3),
    price      DECIMAL(32, 2),
    quantity   INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/tables/my-table',
  'format' = 'parquet'
)

See the official Apache Flink® documentation for a full list of configuration options and data type mappings.

Custom Connectors and Formats

Ververica Platform comes with support for several storage systems and formats. However, it is also possible to add a custom connector or format to the platform.

A connector or format consists of:

  • A YAML metadata file that provides the platform with structured information about the connector (or format).
  • The JAR files that include all dependencies which are required to translate and execute a query. When a query is executed, the JAR files of all connectors and formats used by the query are uploaded to Universal Blob Storage and downloaded by the Flink cluster.

These files are located in the Ververica Platform Gateway Docker image in a dedicated directory for each connector (or format) under /vvp/sql/opt/connectors (or /vvp/sql/opt/formats). For example, the /vvp/sql/opt/connectors directory of the Ververica Platform Gateway Docker image looks as follows.

$ tree /vvp/sql/opt/connectors
/vvp/sql/opt/connectors/
├── blackhole
│   └── connector-meta.yaml
├── datagen
│   └── connector-meta.yaml
├── elasticsearch-6
│   ├── connector-meta.yaml
│   └── flink-sql-connector-elasticsearch6_2.12-1.11.2-stream2.jar
├── elasticsearch-7
│   ├── connector-meta.yaml
│   └── flink-sql-connector-elasticsearch7_2.12-1.11.2-stream2.jar
├── filesystem
│   ├── connector-meta.yaml
│   └── flink-connector-filesystem_2.12-1.11.2-stream2.jar
├── jdbc
│   ├── connector-meta.yaml
│   ├── flink-connector-jdbc_2.12-1.11.2-stream2.jar
│   ├── mariadb-java-client-2.6.2.jar
│   └── postgresql-42.2.16.jar
├── kafka
│   ├── connector-meta.yaml
│   ├── flink-connector-kafka_2.12-1.11.2-stream2.jar
│   ├── flink-connector-kafka-base_2.12-1.11.2-stream2.jar
│   └── kafka-clients-2.4.1.jar
└── print
    └── connector-meta.yaml

You can add a custom connector or format by patching the Ververica Platform Gateway Docker image. To add a new connector or format, you need to create a new directory under /vvp/sql/opt/connectors or /vvp/sql/opt/formats and add the metadata file and the JAR files with all required dependencies.

Since a connector is completely defined by its metadata file and JAR files, you can also disable or override a connector by removing the respective folder or replacing its content.

In the following, we describe the metadata files, point to the relevant Apache Flink® documentation about implementing custom SQL connectors, and show an example Dockerfile to patch the Gateway Docker image.

Connector Metadata File

The metadata file of a connector must be named connector-meta.yaml and has the following layout:

connector:
  type:       # the name of the connector type, such as "jdbc" for the JDBC connector. Must be the same as the folder name.
  packaged:   # a flag to indicate a packaged or custom connector.
  source:     # a flag to indicate that the connector can be used as a source connector.
  sink:       # a flag to indicate that the connector can be used as a sink connector.
  lookup:     # a flag to indicate that the connector can be used as a lookup source connector.
  properties: # a list of configuration options. This information is used to improve the user experience when creating tables in the UI.
    - key:          # the key of the option, such as 'connector' for the connector type.
      required:     # a flag to indicate that this is a required configuration option.
      defaultValue: # the default value of the configuration option.
      description:  # a free text description of the configuration option.
    - ...
  requiresFormat:   # a flag to indicate whether the connector requires a format or not.
  supportedFormats: # a list of all formats, such as 'json', that are supported by the connector.
    - ...

You can have a look at the metadata files of the packaged connectors as examples of how they are specified.

Format Metadata File

The metadata file of a format must be named format-meta.yaml and has the following layout:

format:
  type:       # the name of the format type, such as "csv" for the CSV format. Must be the same as the folder name.
  packaged:   # a flag to indicate a packaged or custom format.
  source:     # a flag to indicate that the format can be used as a source format to read rows.
  sink:       # a flag to indicate that the format can be used as a sink format to write rows.
  properties: # a list of configuration options. This information is used to ease the definition of formats in the UI.
    - key:          # the key of the option, such as 'format' for the format type.
      required:     # a flag to indicate that this is a required configuration option.
      defaultValue: # the default value of the configuration option.
      description:  # a free text description of the configuration option.

You can have a look at the metadata files of the packaged formats as examples of how they are specified.

Developing a Custom Connector or Format

The Apache Flink® documentation describes in detail how to implement a custom source connector, sink connector, or format for Flink SQL.

Note

Ververica Platform only supports connectors based on DynamicTableSource and DynamicTableSink, as described in the documentation linked above. The legacy interfaces for table connectors are not supported.

Patching the Gateway Docker Image

The following example shows the Dockerfile to add a new connector called my-connector to the VVP Gateway Docker image.

FROM registry.ververica.com/v2.3/vvp-gateway:2.3.0
COPY connector-meta.yaml /vvp/sql/opt/connectors/my-connector/connector-meta.yaml
COPY my-connector-0.1.jar /vvp/sql/opt/connectors/my-connector/my-connector-0.1.jar

Customizing Packaged Connectors

Some connectors support customizing their behavior by adding the implementation of a public interface to their classpath and referencing this class in a configuration option. For example, the JDBC connector features the 'driver' property to specify the JDBC driver class.

Customizing a connector or format works similarly to adding a custom connector or format. You need to patch the Ververica Platform Gateway Docker image and add a JAR file that contains your custom classes to the directory of the connector or format you would like to customize.