Connectors
Flink SQL reads data from and writes data to external storage systems, such as Apache Kafka® or a file system. Depending on the external system, the data can be encoded in different formats, such as Apache Avro® or JSON. Flink uses connectors to communicate with the storage systems and to encode and decode table data in different formats.
Each table that is read or written with Flink SQL requires a connector specification. The connector of a table is specified and configured in the DDL statement that defines the table.
Ververica Platform comes with a selection of packaged connectors and formats. Custom connectors and formats can be registered in the Platform to enable reading data from or writing data to systems that are not supported by the packaged connectors and formats.
This page describes the packaged connectors and formats that come with Ververica Platform, explains how to register custom connectors and formats, and shows how to manage packaged connectors and formats.
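As a minimal sketch of such a DDL-based connector specification (the table name, topic, and server address are placeholders), a connector and its format are configured entirely through 'key' = 'value' pairs in the WITH clause:
CREATE TABLE ExampleTable (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',                              -- the connector backing the table
  'topic' = 'example-topic',                          -- connector-specific option (placeholder)
  'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder broker address
  'format' = 'json'                                   -- how rows are encoded in the external system
)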
Connector Types
Flink SQL distinguishes four types of connectors.
Bounded Source: A bounded source connector reads table updates from a bounded data set. Once all updates are read and forwarded, the table backed by the connector becomes static and does not change anymore.
Unbounded Source: An unbounded source connector reads table updates from an infinite data stream. Tables backed by an unbounded source connector are continuously updated until a query is terminated.
Lookup Source: A lookup source connector is always used in combination with a lookup join. The connector performs a key lookup in the data backed by the external system and returns the result.
Sink: A sink connector writes table updates to an external system.
Packaged Connectors
Name | Usage | Supported Formats |
---|---|---|
Apache Kafka® | Unbounded Source / Sink | Apache Avro®, Apache Avro® for Confluent® Schema Registry, CSV, JSON, Debezium-Avro® for Confluent® Schema Registry, Debezium-JSON, Canal-JSON, Raw |
Upsert Apache Kafka® | Unbounded Source / Sink | Apache Avro®, Apache Avro® for Confluent® Schema Registry, CSV, JSON, Raw |
Amazon Kinesis | Unbounded Source / Sink | JSON, CSV, Apache Avro®, Raw |
File System | Sink / Source | Apache Avro®, CSV, JSON, Apache ORC®, Apache Parquet®, Debezium-JSON, Canal-JSON, Raw |
JDBC | Bounded Source / Sink / Lookup | Not Applicable |
Elasticsearch® 6 | Sink | JSON |
Elasticsearch® 7 | Sink | JSON |
Data Generator | Unbounded Source | Not Applicable |
Print | Sink | Not Applicable
Blackhole | Sink | Not Applicable |
Apache Kafka®
The Kafka connector allows for reading and writing data to and from Apache Kafka® topics.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)
See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.
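Beyond connection options, the Kafka connector can expose record metadata as table columns. The following is a hedged sketch, assuming the Kafka connector version shipped with the platform supports the metadata columns documented by Apache Flink®; the column names on the left are placeholders:
CREATE TABLE OrdersWithMetadata (
  order_id BIGINT,
  price DECIMAL(32, 2),
  record_topic STRING METADATA FROM 'topic' VIRTUAL,       -- topic the record was read from
  record_partition INT METADATA FROM 'partition' VIRTUAL,  -- partition of the record
  record_offset BIGINT METADATA FROM 'offset' VIRTUAL      -- offset of the record
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
)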
Upsert Kafka®
The Upsert Kafka connector allows for reading and writing data to and from compacted Apache Kafka® topics.
A table backed by the upsert-kafka connector must define a PRIMARY KEY. The connector uses the table's primary key as the key for the Kafka topic on which it performs upsert writes.
CREATE TABLE CustomerOrderStats (
cust_id BIGINT,
order_cnt BIGINT,
order_price_sum DECIMAL(32,2),
last_order_time TIMESTAMP(3),
PRIMARY KEY (cust_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'customer-order-stats',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'customer-order-updater',
'key.format' = 'avro',
'value.format' = 'avro'
)
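As a sketch of how such a table might be populated, assuming an append-only Orders table with cust_id, price, and order_time columns, an aggregating INSERT produces an updating result that the connector writes as upserts keyed by cust_id:
INSERT INTO CustomerOrderStats
SELECT
  cust_id,
  COUNT(*) AS order_cnt,
  CAST(SUM(price) AS DECIMAL(32, 2)) AS order_price_sum,  -- cast to match the sink column type
  MAX(order_time) AS last_order_time
FROM Orders
GROUP BY cust_id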
See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.
Kinesis
The Kinesis connector allows reading from and writing to Amazon Kinesis Data Streams (KDS).
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
)
PARTITIONED BY (order_id)
WITH (
'connector' = 'kinesis',
'stream' = 'orders',
'aws.region' = 'us-east-2',
'scan.stream.initpos' = 'LATEST',
'format' = 'avro'
);
See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns.
File System
The file system sink connector writes table rows to files in a supported and configured file system, such as HDFS or S3. The connector supports row-encoded and bulk-encoded formats. Row-encoded formats such as CSV and JSON write each row individually to a file. Bulk-encoded formats collect a batch of rows in memory and organize them in a storage- and scan-efficient format before writing the data to a file. Examples of bulk-encoded formats are Apache Avro®, Apache ORC®, and Apache Parquet®.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'filesystem',
'path' = 's3a://my-bucket/tables/my-table',
'format' = 'csv'
)
The file system sink connector supports rolling files and partitioned tables. You can also configure commit policies to signal downstream systems when a partition has been completed. The section on Customizing a Packaged Connector explains how to add a custom PartitionCommitPolicy to the connector.
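The following is a hedged sketch of a partitioned sink table that rolls files and marks completed partitions with a _SUCCESS file; the path, partition layout, and thresholds are placeholders, and the option names follow the Apache Flink® file system connector documentation:
CREATE TABLE PartitionedOrders (
  order_id BIGINT,
  price DECIMAL(32, 2),
  quantity INT,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/tables/partitioned-orders',
  'format' = 'parquet',
  'sink.rolling-policy.file-size' = '128MB',            -- roll files once they reach this size
  'sink.rolling-policy.rollover-interval' = '30 min',   -- or after this interval
  'sink.partition-commit.trigger' = 'process-time',     -- commit partitions based on processing time
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file'  -- write a _SUCCESS file on commit
)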
See the official Apache Flink® documentation for a detailed description of these features and a full list of supported configuration options.
Access credentials for the file system can be passed via the Flink configuration of the deployment.
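For example, for an S3 bucket, credentials might be provided through Flink's S3 file system options. The following is a hedged sketch; the exact configuration keys depend on the file system in use, and the placement of flinkConfiguration depends on whether it is set on a Deployment or a Session Cluster resource:
spec:
  flinkConfiguration:
    s3.access-key: <your-access-key>   # placeholder credentials for Flink's S3 file systems
    s3.secret-key: <your-secret-key>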
When reading or writing tables on HDFS, the Apache Hadoop® dependencies need to be linked to the classpath of the Session Cluster or Deployment via:
spec:
  kubernetes:
    pods:
      envVars:
        - name: VVP_EXTRA_CLASSPATH
          value: /flink/opt/flink-shaded-hadoop/*
JDBC
The JDBC connector reads rows from or writes rows to a table in a JDBC database. The source connector operates in bounded mode. That means a table is read when a query is started and is never updated while the query is running. The sink connector continuously writes table updates to a table.
The database connection for each query is configured in the WITH clause of the CREATE TABLE statement for the query. The configuration is defined by simple 'key' = 'value' pairs.
For example, the following configuration specifies the JDBC connector, the MariaDB driver, and a MariaDB instance URL:
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'driver' = 'org.mariadb.jdbc.Driver',
'url' = 'jdbc:mysql://mysqlhost:3306/mydatabase',
'table-name' = 'orders'
)
Out of the box, the JDBC connector in Ververica Platform supports MariaDB, MySQL 5.5.3+, PostgreSQL 8.2+, Apache Derby, Microsoft SQL Server (MSSQL), and Oracle. All required drivers are included in the platform image, so no additional configuration is needed.
Name | Driver String |
---|---|
MariaDB | org.mariadb.jdbc.Driver |
MySQL 5.5.3+ | org.mariadb.jdbc.Driver |
PostgreSQL 8.2+ | org.postgresql.Driver |
Apache Derby | org.apache.derby.jdbc.ClientDriver |
MSSQL | com.microsoft.sqlserver.jdbc.SQLServerDriver |
Oracle | oracle.jdbc.OracleDriver |
The JDBC connector also supports key lookups and can therefore be used for lookup joins to enrich tables with data stored in a JDBC table.
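As a sketch of such a lookup join, assuming a streaming Orders table with a processing-time attribute proc_time (declared as proc_time AS PROCTIME()) and a JDBC-backed Customers table keyed by cust_id:
SELECT
  o.order_id,
  o.price,
  c.customer_name
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.cust_id = c.cust_id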
To use the JDBC connector as a lookup table source, you must ensure that the lookup queries against the table can be efficiently processed, for example by creating an appropriate index. Add an 'index' = '<index-name>' pair to the database configuration in the WITH clause, for example:
CREATE TABLE Orders (
...
) WITH (
'connector' = 'jdbc',
...
'index' = 'orders'
)
For a detailed description of the JDBC connector, see the connector features and options pages of the official Apache Flink® documentation.
You can add a JDBC driver for any other DBMS by adding a JAR file with a matching JDBC driver to the connector as described in Customizing a Packaged Connector.
Elasticsearch® 6 and 7+
Ververica Platform includes two sink connectors to write tables to Elasticsearch® indexes.
Because Elasticsearch 7 introduced a new client API, we provide two connectors, elasticsearch-6 and elasticsearch-7, which support Elasticsearch 6 and Elasticsearch 7 (and later versions), respectively.
-- create Orders table backed by Elasticsearch 6
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://es_host:9200',
'index' = 'orders',
'document-type' = 'order'
)
-- create Orders table backed by Elasticsearch 7+
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://es_host:9200',
'index' = 'orders'
)
See the official Apache Flink® documentation for a detailed description of the connector's features and a full list of supported configuration options.
Data Generator
The data generator connector allows developers to create a table backed by an in-memory data generator. It allows for mocking out data sources during the early development of a query. When used in conjunction with computed columns, this source allows for flexible generation of complex types.
CREATE TABLE Orders (
order_id BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen'
)
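The generator can also be configured per field, and computed columns can derive additional values from the generated ones. The following is a hedged sketch; the table, value ranges, and rate are illustrative, and the option names follow the Apache Flink® datagen documentation:
CREATE TABLE MockOrdersWithOptions (
  order_id BIGINT,
  price DECIMAL(32, 2),
  quantity INT,
  order_time AS LOCALTIMESTAMP,   -- computed column instead of a generated field
  cost AS price * quantity        -- computed column derived from generated fields
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',            -- throttle the generator
  'fields.order_id.min' = '1',         -- bounds for the random order_id values
  'fields.order_id.max' = '1000000',
  'fields.price.min' = '1.00',
  'fields.price.max' = '999.99',
  'fields.quantity.min' = '1',
  'fields.quantity.max' = '10'
)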
Often, data generators are defined based on physical tables using a LIKE clause.
CREATE TABLE Orders (
order_id BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)
CREATE TABLE MockOrders WITH (
'connector' = 'datagen'
) LIKE Orders (EXCLUDING ALL)
See the official Apache Flink® documentation for a full list of supported configurations.
Note that since version 2.3.0, Ververica Platform ships a version of the datagen connector that includes FLINK-18735, which is why we link to the snapshot documentation of Apache Flink® above rather than the documentation of Apache Flink® 1.11. With FLINK-18735, the datagen connector supports essentially all primitive data types as well as composite types like ROW and ARRAY.
Print Sink
The print connector allows developers to create a table that prints all rows to the standard output of the executing TaskManager(s).
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2)
) WITH (
'connector' = 'print'
)
Often, print tables are defined based on physical tables using a LIKE clause.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)
CREATE TABLE PrintOrders
WITH (
'connector' = 'print'
) LIKE Orders (EXCLUDING ALL)
See the official Apache Flink® documentation for a full list of supported configurations.
BlackHole
The blackhole connector allows developers to create a table that swallows all input records. Consider it the /dev/null of Flink SQL.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2)
) WITH (
'connector' = 'blackhole'
)
Often, blackhole tables are defined based on physical tables using a LIKE clause.
CREATE TABLE Orders (
order_id BIGINT,
order_time TIMESTAMP(3),
price DECIMAL(32, 2),
quantity INT,
cost AS price * quantity,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'orderGroup',
'format' = 'csv'
)
CREATE TABLE BlackHoleOrders
WITH (
'connector' = 'blackhole'
) LIKE Orders (EXCLUDING ALL)