Connectors
Flink SQL reads data from and writes data to external storage systems, such as Apache Kafka® or a file system. Depending on the external system, the data can be encoded in different formats, such as Apache Avro® or JSON. Flink uses connectors to communicate with the storage systems and to encode and decode table data in different formats.
Each table that is read or written with Flink SQL requires a connector specification. The connector of a table is specified and configured in the DDL statement that defines the table.
Ververica Platform comes with a selection of packaged connectors and formats. Custom connectors and formats can be registered in the Platform to enable reading data from or writing data to systems that are not supported by the packaged connectors and formats.
This page discusses all packaged connectors and formats that come with Ververica Platform, explains how to register custom connectors and formats, and how to manage packaged connectors and formats.
Connector Types
Flink SQL distinguishes four types of connectors.
Bounded Source A bounded source connector reads table updates from a bounded data set. Once all updates are read and forwarded, the table backed by the connector becomes static and does not change anymore.
Unbounded Source An unbounded source connector reads table updates from an infinite data stream. Tables backed by an unbounded source connector are continuously updated until a query is terminated.
Lookup Source A lookup source connector is always used in combination with a lookup join. The connector performs a key lookup in the data stored in the external system and returns the result.
Sink A sink connector writes table updates to an external system.
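The role a connector plays follows from how its table is used in a query. As a minimal sketch, the following hypothetical pipeline combines an unbounded Kafka source with a file system sink (topic, server, and path values are placeholders):

```sql
-- unbounded source: continuously reads new events from a Kafka topic
CREATE TABLE Clicks (
    user_id BIGINT,
    url STRING,
    click_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'clicks',                          -- placeholder topic
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- sink: writes the query result to files
CREATE TABLE ClickArchive (
    user_id BIGINT,
    url STRING,
    click_time TIMESTAMP(3)
) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://my-bucket/click-archive',    -- placeholder path
    'format' = 'csv'
);

-- the INSERT INTO statement connects source and sink into a running query
INSERT INTO ClickArchive SELECT * FROM Clicks;
```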
Packaged Connectors
| Name | Usage | Supported Formats |
|---|---|---|
| Apache Kafka® | Unbounded Source / Sink | Apache Avro®, Apache Avro® for Confluent® Schema Registry, CSV, JSON, Debezium-Avro® for Confluent® Schema Registry, Debezium-JSON, Canal-JSON, Raw |
| Upsert Apache Kafka® | Unbounded Source / Sink | Apache Avro®, Apache Avro® for Confluent® Schema Registry, CSV, JSON, Raw |
| Amazon Kinesis | Source / Sink | JSON, CSV, Apache Avro®, Raw |
| File System | Source / Sink | Apache Avro®, CSV, JSON, Apache ORC®, Apache Parquet®, Debezium-JSON, Canal-JSON, Raw |
| JDBC | Bounded Source / Sink / Lookup | Not Applicable |
| Elasticsearch® 6 | Sink | JSON |
| Elasticsearch® 7 | Sink | JSON |
| Data Generator | Unbounded Source | Not Applicable |
| Print | Sink | Not Applicable |
| Blackhole | Sink | Not Applicable |
Apache Kafka®
The Kafka connector allows reading data from and writing data to Apache Kafka® topics.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)
See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.
Upsert Kafka®
The Upsert Kafka connector allows reading data from and writing data to compacted Apache Kafka® topics.
A table backed by the upsert-kafka connector must define a PRIMARY KEY.
The connector uses the table's primary key as key for the Kafka topic on which it performs upsert writes.
CREATE TABLE CustomerOrderStats (
cust_id BIGINT,
order_cnt BIGINT,
order_price_sum DECIMAL(32,2),
last_order_time TIMESTAMP(3),
PRIMARY KEY (cust_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'customer-order-stats',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'customer-order-updater',
'key.format' = 'avro',
'value.format' = 'avro'
)
See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.
Kinesis
The Kinesis connector allows reading from and writing to Amazon Kinesis Data Streams (KDS).
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
)
PARTITIONED BY (order_id)
WITH (
'connector' = 'kinesis',
'stream' = 'orders',
'aws.region' = 'us-east-2',
'scan.stream.initpos' = 'LATEST',
'format' = 'avro'
);
See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.
File System
The file system sink connector writes table rows to files in a supported and configured file system, such as HDFS or S3. The connector supports row-encoded and bulk-encoded formats. Row-encoded formats such as CSV and JSON write each row individually to a file. Bulk-encoded formats collect a batch of rows in memory and organize them in a storage- and scan-efficient format before writing the data to a file. Examples of bulk-encoded formats are Apache Avro®, Apache ORC®, and Apache Parquet®.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'filesystem',
'path' = 's3a://my-bucket/tables/my-table',
'format' = 'csv'
)
The file system sink connector supports rolling files and partitioned tables. You can also configure commit policies to signal downstream systems when a partition has been completed. The section on Customizing Packaged Connectors explains how to add a custom PartitionCommitPolicy to the connector.
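As a sketch of these features, the following hypothetical table partitions its output by day and hour and writes a _SUCCESS file once a partition is complete (table name, path, and the exact option values are assumptions for illustration):

```sql
CREATE TABLE PartitionedOrders (
    order_id BIGINT,
    price DECIMAL(32, 2),
    order_time TIMESTAMP(3),
    dt STRING,
    hr STRING
) PARTITIONED BY (dt, hr) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://my-bucket/tables/partitioned-orders',  -- placeholder path
    'format' = 'parquet',
    -- derive each partition's timestamp from the dt/hr partition values
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    -- commit a partition once the watermark passes its end
    'sink.partition-commit.trigger' = 'partition-time',
    -- write a _SUCCESS file to signal downstream systems
    'sink.partition-commit.policy.kind' = 'success-file'
);
```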
See the official Apache Flink® documentation for a detailed description of these features and a full list of supported configuration options.
Access credentials for the file system can be passed via the Flink configuration of the deployment.
When reading or writing tables on HDFS, the Apache Hadoop® dependencies need to be linked to the classpath of the Session Cluster or Deployment via
spec:
kubernetes:
pods:
envVars:
- name: VVP_EXTRA_CLASSPATH
value: /flink/opt/flink-shaded-hadoop/*
JDBC
The JDBC connector reads rows from or writes rows to a table in a JDBC database. The source connector operates in bounded mode. That means a table is read when a query is started and is never updated while the query is running. The sink connector continuously writes table updates to a table. The connector also supports key lookups and can therefore be used for lookup joins to enrich tables with data stored in a JDBC table.
If you want to use the JDBC connector as a lookup table source, you should ensure that the lookup queries against the table can be efficiently processed, for example by creating an appropriate index.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'driver' = 'org.mariadb.jdbc.Driver',
'url' = 'jdbc:mysql://mysqlhost:3306/mydatabase',
'table-name' = 'orders'
)
See the official Apache Flink® documentation for a detailed description of the JDBC connector's features and a full list of supported configuration options.
Ververica Platform supports MariaDB, MySQL 5.5.3+, and PostgreSQL 8.2+ out of the box. You can add a JDBC driver for any other DBMS by adding a JAR file with a matching JDBC driver to the connector, as described in Customizing Packaged Connectors.
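For illustration, a lookup join that enriches a stream with a JDBC-backed dimension table might look as follows. The table and column names, and the processing-time attribute proc_time required by the FOR SYSTEM_TIME AS OF clause, are hypothetical:

```sql
-- streaming table of orders with a processing-time attribute
CREATE TABLE OrderStream (
    order_id BIGINT,
    cust_id BIGINT,
    price DECIMAL(32, 2),
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- dimension table in a JDBC database, queried via key lookups
CREATE TABLE Customers (
    cust_id BIGINT,
    cust_name STRING,
    country STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysqlhost:3306/mydatabase',
    'table-name' = 'customers'
);

-- enrich each order with customer data via a lookup join
SELECT o.order_id, o.price, c.cust_name, c.country
FROM OrderStream AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.cust_id = c.cust_id;
```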
Elasticsearch® 6 and 7+
Ververica Platform includes two sink connectors to write tables to Elasticsearch® indexes.
Because Elasticsearch 7 introduced a new client API, we provide two connectors: elasticsearch-6, which supports Elasticsearch 6, and elasticsearch-7, which supports Elasticsearch 7 and subsequent versions.
-- create Orders table backed by Elasticsearch 6
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://es_host:9200',
'index' = 'orders',
'document-type' = 'order'
)
-- create Orders table backed by Elasticsearch 7+
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://es_host:9200',
'index' = 'orders'
)
See the official Apache Flink® documentation for a detailed description of the connector's features and a full list of supported configuration options.
Data Generator
The data generator connector allows developers to create a table backed by an in-memory data generator. It allows mocking out data sources during the early development of a query. When used in conjunction with computed columns, this source allows for flexible generation of complex types.
CREATE TABLE Orders (
order_id BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen'
)
Often, data generators are defined based on physical tables using a LIKE clause.
CREATE TABLE Orders (
order_id BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)
CREATE TABLE MockOrders WITH (
'connector' = 'datagen'
) LIKE Orders (EXCLUDING ALL)
See the official Apache Flink® documentation for a full list of supported configurations.
Please note that since version 2.3.0, Ververica Platform ships a version of the datagen connector that includes FLINK-18735. For this reason, the link above points to the snapshot documentation of Apache Flink® rather than the documentation of Apache Flink® 1.11.
With FLINK-18735, the datagen connector supports all primitive data types as well as composite types such as ROW and ARRAY.
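The generated values can be tuned per column. The following sketch uses the connector's per-field options to emit ten rows per second with sequential IDs and bounded random prices (the table name and the concrete option values are chosen for illustration):

```sql
CREATE TABLE MockOrders (
    order_id BIGINT,
    price DECIMAL(32, 2),
    order_time AS LOCALTIMESTAMP  -- computed column for a timestamp
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    -- generate order_id as a sequence instead of random values
    'fields.order_id.kind' = 'sequence',
    'fields.order_id.start' = '1',
    'fields.order_id.end' = '1000000',
    -- bound the randomly generated prices
    'fields.price.min' = '1.00',
    'fields.price.max' = '500.00'
);
```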
Print Sink
The print connector allows developers to create a table that prints all rows to the standard output of the executing TaskManager(s).
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2)
) WITH (
'connector' = 'print'
)
Often, print tables are defined based on physical tables using a LIKE clause.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)
CREATE TABLE PrintOrders
WITH (
'connector' = 'print'
) LIKE Orders (EXCLUDING ALL)
See the official Apache Flink® documentation for a full list of supported configurations.
BlackHole
The blackhole connector allows developers to create a table that swallows all input records.
Consider it the /dev/null of Flink SQL.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2)
) WITH (
'connector' = 'blackhole'
)
Often, blackhole tables are defined based on physical tables using a LIKE clause.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)
CREATE TABLE BlackHoleOrders
WITH (
'connector' = 'blackhole'
) LIKE Orders (EXCLUDING ALL)
Packaged Formats
Apache Avro®
The Avro format allows reading and writing Apache Avro® data based on an Avro schema.
{
    "type": "record",
    "name": "Orders",
    "fields" : [
        {"name": "order_id", "type": "long"},
        {"name": "order_time", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 32, "scale": 2}},
        {"name": "quantity", "type": "int"},
        {"name": "buyer", "type": {
            "type": "record",
            "name": "Buyer",
            "fields": [
                {"name": "first_name", "type": "string"},
                {"name": "last_name", "type": "string"},
                {"name": "title", "type": "string"}
            ]
        }}
    ]
}
The schema is derived from the table schema, so the above example schema would correspond to the below table definition.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
buyer ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'avro'
)
See the official Apache Flink® documentation for a full list of properties and data type mappings.
Apache Avro® (Confluent®)
The Avro format for Confluent® Schema Registry allows reading and writing Apache Avro® records in line with io.confluent.kafka.serializers.(De)Serializer.
When reading (deserializing) a record with this format, the Avro writer schema is fetched from the configured Confluent® Schema Registry based on the schema version ID encoded in the record, while the reader schema is inferred from the table schema.
When writing (serializing) a record with this format, the Avro schema is inferred from the table schema and registered in the configured Confluent® Schema Registry under the subject given in avro-confluent.schema-registry.subject.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
buyer ROW<first_name STRING, last_name STRING, title STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://localhost:8081'
)
See the official Apache Flink® documentation for a full list of properties and data type mappings.