Self-Managed (not available for BYOC)

Connectors


Flink reads data from and writes data to external storage systems, for example, Apache Kafka® or a file system. Depending on the external system, the data can be encoded in different formats, such as Apache Avro® or JSON.
Flink uses connectors to communicate with the storage systems and to encode and decode table data in different formats.

Each table that is read or written with Flink SQL requires a connector specification. The connector of a table is specified and configured in the DDL statement that defines the table.

Ververica Platform comes with a selection of packaged connectors and formats.
Custom connectors and formats can be registered in the Platform to enable reading data from or writing data to systems that are not supported by the packaged connectors and formats.

This page discusses all packaged connectors and formats that come with Ververica Platform, explains how to register custom connectors and formats, and shows how to manage packaged connectors and formats.

SQL, Table API, and DataStream API

Flink supports three main programming abstractions (or "APIs"):

  1. SQL – A fully declarative language for querying streams and tables, similar to standard SQL syntax.
  2. Table API – A relational but more programmatic API that uses a fluent DSL (Domain-Specific Language) in Java/Scala or Python. Internally, it operates on the same runtime concepts as SQL.
  3. DataStream API – A lower-level stream processing API in Java/Scala or Python that offers full control over event time, state, and custom transformations.

SQL

  • Fully declarative: you write standard SQL statements (e.g., CREATE TABLE, INSERT INTO, SELECT) that Flink translates into an execution plan.
  • Best for teams that want to leverage existing SQL skills to handle streaming and batch data.
  • Many built-in optimizations and easy integration with standard relational concepts.
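To make the three statement types concrete, here is a minimal sketch of a complete SQL pipeline. It assumes the packaged Data Generator (datagen) and Print connectors listed below, with option names as documented for open-source Flink; the table names are hypothetical.

```sql
-- Source: the datagen connector produces random rows until the query is stopped.
CREATE TEMPORARY TABLE RandomOrders (
  order_id BIGINT,
  price    DECIMAL(32, 2),
  quantity INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.quantity.min' = '1',
  'fields.quantity.max' = '20'
);

-- Sink: the print connector writes every row it receives to the task manager's standard output.
CREATE TEMPORARY TABLE LargeOrders (
  order_id BIGINT,
  price    DECIMAL(32, 2),
  quantity INT
) WITH (
  'connector' = 'print'
);

-- Continuous query: Flink translates the SELECT into a streaming job that runs until it is stopped.
INSERT INTO LargeOrders
SELECT order_id, price, quantity
FROM RandomOrders
WHERE quantity >= 10;
```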

Table API

  • Relational but fluent: uses methods like .select(...), .where(...), etc. in Java/Scala or Python code, rather than plain SQL strings.
  • Shares the same runtime and optimizer as SQL. Table API and SQL can be mixed in one program: TableEnvironment.sqlQuery(...) returns a Table, and a Table can be registered as a view and queried with SQL.
  • Slightly more flexible than SQL because queries are composed in the host language, so expressions and UDF calls can be built programmatically, though the API remains mostly relational in style.

DataStream API

  • Imperative stream processing in Java/Scala or Python.
  • Exposes time and state primitives for low-level control, making it suitable for event-driven use cases and custom transformations.
  • Maximum flexibility (record-by-record processing, custom timers, etc.) but more verbose than the Table API or SQL.

Connector Types

Flink SQL distinguishes four types of connectors:

  • Bounded Source – A bounded source connector reads table updates from a bounded data set. Once all updates have been read and forwarded, the table backed by the connector becomes static and no longer changes.
  • Unbounded Source – An unbounded source connector reads table updates from an infinite data stream. Tables backed by an unbounded source connector are continuously updated until the query is terminated.
  • Lookup Source – A lookup source connector is always used in combination with a lookup join. The connector performs a key lookup in the external system and returns the matching rows.
  • Sink – A sink connector writes table updates to an external system.
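Whether a source is bounded can depend on how the connector is configured rather than on the query. As a sketch, assuming the datagen option 'number-of-rows' behaves as in open-source Flink (where setting it makes the source bounded), the two tables below differ only in that option; the blackhole sink simply discards its input. All table names are hypothetical.

```sql
-- Unbounded: without 'number-of-rows', datagen emits rows until the query is stopped.
CREATE TEMPORARY TABLE EndlessOrders (
  order_id BIGINT,
  price    DECIMAL(32, 2)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100'
);

-- Bounded: datagen emits exactly 1000 rows, then the source finishes.
CREATE TEMPORARY TABLE FiniteOrders (
  order_id BIGINT,
  price    DECIMAL(32, 2)
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '1000'
);

-- Sink: blackhole accepts and discards all rows (useful for throughput tests).
CREATE TEMPORARY TABLE DiscardedOrders (
  order_id BIGINT,
  price    DECIMAL(32, 2)
) WITH (
  'connector' = 'blackhole'
);

-- This job finishes once all 1000 rows have been consumed;
-- reading from EndlessOrders instead would keep it running.
INSERT INTO DiscardedOrders
SELECT order_id, price FROM FiniteOrders;
```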

Packaged Connectors

| Name | Usage | Supported Formats |
|---|---|---|
| Apache Kafka® | Unbounded Source / Sink | Apache Avro®, Apache Avro® for Confluent® Schema Registry, CSV, JSON, Debezium-Avro® for Confluent® Schema Registry, Debezium-JSON, Canal-JSON, Raw |
| Upsert Apache Kafka® | Unbounded Source / Sink | Apache Avro®, Apache Avro® for Confluent® Schema Registry, CSV, JSON, Raw |
| Amazon Kinesis | Sink / Source | JSON, CSV, Apache Avro®, Raw |
| File System | Sink / Source | Apache Avro®, CSV, JSON, Apache ORC®, Apache Parquet®, Debezium-JSON, Canal-JSON, Raw |
| JDBC | Bounded Source / Sink / Lookup | Not Applicable |
| Elasticsearch® 6 | Sink | JSON |
| Elasticsearch® 7 | Sink | JSON |
| Data Generator | Unbounded Source | Not Applicable |
| Print | Sink | Not Applicable |
| Blackhole | Sink | Not Applicable |
| Snowflake | Sink | Not Applicable |
| BigQuery | Sink / Source | Not Applicable |

Apache Kafka®

The Kafka connector reads data from and writes data to Apache Kafka® topics.

SQL
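CREATE TABLE Orders (
  order_id   BIGINT,
  order_time TIMESTAMP(3),
  price      DECIMAL(32, 2),
  quantity   INT,
  cost       AS price * quantity
  -- No PRIMARY KEY: the plain 'kafka' connector rejects primary keys with insert-only
  -- formats such as CSV. Use the upsert-kafka connector for keyed upserts.
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  -- Start from the beginning of the topic instead of relying on committed group offsets.
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)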

See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns (https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/kafka/#available-metadata).
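As a sketch of how metadata columns look in practice, the hypothetical table below exposes the Kafka record timestamp, partition, and offset next to the payload and uses the record timestamp as event time. The metadata keys ('timestamp', 'partition', 'offset') are taken from the open-source Flink Kafka connector; verify them against the linked documentation for your Flink version.

```sql
CREATE TABLE OrdersWithMetadata (
  order_id    BIGINT,
  price       DECIMAL(32, 2),
  quantity    INT,
  -- Kafka record timestamp; readable and writable.
  record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  -- Read-only metadata is declared VIRTUAL so it is skipped when writing.
  -- PARTITION and OFFSET are reserved words and must be quoted with backticks.
  `partition` INT METADATA VIRTUAL,
  `offset`    BIGINT METADATA VIRTUAL,
  -- Event time with up to 5 seconds of tolerated out-of-orderness.
  WATERMARK FOR record_time AS record_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orderGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)
```

The CSV format only encodes and decodes the physical columns (order_id, price, quantity); the metadata columns are filled from the Kafka record itself.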

Upsert Kafka®

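The Upsert Kafka connector reads data from and writes data to compacted Apache Kafka® topics. A table backed by the upsert-kafka connector must define a PRIMARY KEY, which the connector uses as the Kafka message key. As a source, it interprets each record as an upsert of the row with that key and a record with a null value as a deletion. As a sink, it writes inserts and updates as regular records and deletions as tombstones (records with a null value).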

SQL
CREATE TABLE CustomerOrderStats (
  cust_id         BIGINT,
  order_cnt       BIGINT,
  order_price_sum DECIMAL(32, 2),
  last_order_time TIMESTAMP(3),
  PRIMARY KEY (cust_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customer-order-stats',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'customer-order-updater',
  'key.format' = 'avro',
  'value.format' = 'avro'
)

See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns (https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/kafka/#available-metadata).
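A typical writer for the CustomerOrderStats table is a continuous GROUP BY aggregation, whose result is an updating stream keyed by cust_id. The sketch below assumes a hypothetical OrderEvents source table (generated with datagen here) that carries a cust_id column. Each time a customer's aggregate changes, upsert-kafka writes a new record with cust_id as the Kafka key, and log compaction retains at least the latest value per key.

```sql
-- Hypothetical source table with a customer key.
CREATE TEMPORARY TABLE OrderEvents (
  cust_id    BIGINT,
  order_time TIMESTAMP(3),
  price      DECIMAL(32, 2)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.cust_id.min' = '1',
  'fields.cust_id.max' = '100'
);

-- The GROUP BY produces updates per cust_id, matching the
-- PRIMARY KEY (cust_id) of the upsert-kafka sink table.
INSERT INTO CustomerOrderStats
SELECT
  cust_id,
  COUNT(*)                           AS order_cnt,
  CAST(SUM(price) AS DECIMAL(32, 2)) AS order_price_sum,
  MAX(order_time)                    AS last_order_time
FROM OrderEvents
GROUP BY cust_id;
```

The explicit CAST keeps the sum at the sink column's DECIMAL(32, 2) precision, since SUM over a DECIMAL widens the result type.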

Kinesis

The Kinesis connector allows reading from and writing to Amazon Kinesis Data Streams (KDS).

SQL
CREATE TABLE Orders (
  order_id   BIGINT,
  order_time TIMESTAMP(3),
  price      DECIMAL(32, 2),
  quantity   INT,
  PRIMARY KEY (order_id) NOT ENFORCED
)
PARTITIONED BY (order_id)
WITH (
  'connector' = 'kinesis',
  'stream' = 'orders',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'avro'
);

See the official Apache Flink® documentation for a full list of supported configuration options and metadata columns (https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/kinesis/#available-metadata).
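The Kinesis connector exposes per-record metadata in the same way. The sketch below, a hypothetical variant of the Orders table, reads the approximate arrival time, shard ID, and sequence number of each record. The metadata keys ('timestamp', 'shard-id', 'sequence-number') follow the open-source Flink Kinesis connector documentation linked above; they are read-only, hence VIRTUAL.

```sql
CREATE TABLE OrdersFromKinesis (
  order_id        BIGINT,
  order_time      TIMESTAMP(3),
  price           DECIMAL(32, 2),
  quantity        INT,
  -- Approximate time at which Kinesis received the record.
  arrival_time    TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp' VIRTUAL,
  -- Shard that holds the record, and the record's position within that shard.
  shard_id        VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
  sequence_number VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'orders',
  'aws.region' = 'us-east-2',
  -- Start from the oldest record still retained in the stream.
  'scan.stream.initpos' = 'TRIM_HORIZON',
  'format' = 'avro'
);
```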

File System

The file system sink connector writes table rows to files in a supported and configured file system, such as HDFS or S3. The connector supports row-encoded and bulk-encoded formats. Row-encoded formats, such as CSV and JSON, write each row individually to a file. Bulk-encoded formats collect a batch of rows in memory and organize them in a storage- and scan-efficient layout before writing the data to a file. Examples of bulk-encoded formats are Apache Avro®, Apache ORC®, and Apache Parquet®.

SQL
CREATE TABLE Orders (
  order_id   BIGINT,
  order_time TIMESTAMP(3),
  price      DECIMAL(32, 2),
  quantity   INT,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/tables/my-table',
  'format' = 'csv'
)

The file system sink connector supports rolling files and partitioned tables. You can also configure partition commit policies to signal downstream systems when a partition has been completed. The section Customizing a Packaged Connector explains how to add a custom PartitionCommitPolicy to the connector.

See the official Apache Flink® documentation for a detailed description of these features and a full list of supported configuration options.
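As a sketch of rolling files, partitioning, and partition commit working together, the table below partitions orders by day and writes a _SUCCESS marker file into each partition directory once the partition is committed. The option names are those of the open-source Flink filesystem connector; the table name, path, and the dt partition column are hypothetical. In streaming mode, part files are only finalized on checkpoints, so checkpointing must be enabled for written data to become visible.

```sql
CREATE TABLE OrdersByDay (
  order_id   BIGINT,
  order_time TIMESTAMP(3),
  price      DECIMAL(32, 2),
  quantity   INT,
  dt         STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://my-bucket/tables/orders-by-day',
  'format' = 'csv',
  -- Roll to a new part file at 128 MB or after 15 minutes, whichever comes first.
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '15 min',
  -- Commit a partition once the system time passes its creation time plus one hour.
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '1 h',
  -- On commit, write a _SUCCESS file into the partition directory.
  'sink.partition-commit.policy.kind' = 'success-file'
);

-- Derive the partition value (e.g. '2024-01-31', stored under .../dt=2024-01-31/)
-- from the order timestamp; assumes an Orders source table like the ones above.
INSERT INTO OrdersByDay
SELECT order_id, order_time, price, quantity, DATE_FORMAT(order_time, 'yyyy-MM-dd')
FROM Orders;
```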

JDBC

The JDBC connector reads rows from or writes rows to a table in a JDBC database. The source connector operates in bounded mode. That means a table is read when a query is started and is never updated while the query is running. The sink connector continuously writes table updates to a table.

The database connection is configured in the WITH clause of the table's CREATE TABLE statement as simple 'key' = 'value' pairs.

For example, the following configuration specifies the JDBC connector, the MariaDB driver, and a MariaDB instance URL:

SQL
CREATE TABLE Orders (
  order_id   BIGINT,
  order_time TIMESTAMP(3),
  price      DECIMAL(32, 2),
  quantity   INT,
  cost       AS price * quantity,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'org.mariadb.jdbc.Driver',
  'url' = 'jdbc:mysql://mysqlhost:3306/mydatabase',
  'table-name' = 'orders'
)

Out of the box, the JDBC connector in Ververica Platform supports MariaDB, MySQL 5.5.3+, PostgreSQL 8.2+, Apache Derby, Microsoft SQL Server (MSSQL), and Oracle. All required drivers are included in the platform image, so no additional setup is required.

| Name | Driver String |
|---|---|
| MariaDB | org.mariadb.jdbc.Driver |
| MySQL 5.5.3+ | org.mariadb.jdbc.Driver |
| PostgreSQL 8.2+ | org.postgresql.Driver |
| Apache Derby | org.apache.derby.jdbc.ClientDriver |
| Microsoft SQL Server (MSSQL) | com.microsoft.sqlserver.jdbc.SQLServerDriver |
| Oracle | oracle.jdbc.OracleDriver |
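For writes, the open-source Flink JDBC connector operates in upsert mode when the table declares a primary key and in append mode otherwise. The following sketch uses the PostgreSQL driver string from the table above to maintain per-order totals; the host, database, table names, and credentials are placeholders, and the buffer-flush settings are optional tuning.

```sql
CREATE TABLE OrderTotals (
  order_id BIGINT,
  total    DECIMAL(38, 2),
  -- With a primary key, a new result for the same order_id overwrites the existing row.
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'org.postgresql.Driver',
  'url' = 'jdbc:postgresql://postgreshost:5432/mydatabase',
  'table-name' = 'order_totals',
  'username' = 'flink_user',
  'password' = 'change-me',
  -- Flush buffered writes once 500 rows are buffered or every 2 seconds.
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s'
);

-- Assumes an Orders source table with order_id, price, and quantity columns.
INSERT INTO OrderTotals
SELECT order_id, CAST(SUM(price * quantity) AS DECIMAL(38, 2))
FROM Orders
GROUP BY order_id;
```

The order_totals table must already exist in PostgreSQL with a matching primary key, because upserts rely on the database enforcing uniqueness on that key.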

The JDBC connector also supports key lookups and can therefore be used for lookup joins to enrich tables with data stored in a JDBC table.
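A lookup join needs a processing-time attribute on the probe side and the FOR SYSTEM_TIME AS OF clause on the JDBC side. The sketch below assumes a hypothetical customers table in PostgreSQL and a hypothetical CustomerOrders stream generated with datagen; for each incoming order, Flink queries the database by cust_id. Repeated lookups can additionally be cached; see the lookup cache options in the Flink JDBC connector documentation.

```sql
-- Dimension table stored in the database; queried by key for each incoming order.
CREATE TABLE Customers (
  cust_id      BIGINT,
  cust_name    STRING,
  cust_country STRING,
  PRIMARY KEY (cust_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'org.postgresql.Driver',
  'url' = 'jdbc:postgresql://postgreshost:5432/mydatabase',
  'table-name' = 'customers'
);

-- Probe side: a stream of orders with a processing-time attribute.
CREATE TEMPORARY TABLE CustomerOrders (
  order_id  BIGINT,
  cust_id   BIGINT,
  price     DECIMAL(32, 2),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.cust_id.min' = '1',
  'fields.cust_id.max' = '100'
);

-- Lookup join: for each order, fetch the matching customer row as of processing time.
SELECT o.order_id, o.price, c.cust_name, c.cust_country
FROM CustomerOrders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.cust_id = c.cust_id;
```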
