Azure CosmosDB
The Azure CosmosDB connector enables writing data from Flink to Azure Cosmos DB using the API for NoSQL. It is a sink-only connector with at-least-once delivery semantics, built on the Flink AsyncSink framework and the Azure Cosmos DB Java SDK v4 async client.
Features
| Item | Description |
|---|---|
| Table type | Result table (sink only) |
| Running mode | Streaming mode |
| Data format | JSON (internal, not user-configurable) |
| Write semantics | At-least-once |
| Changelog support | INSERT, UPDATE, and DELETE, all written as UPSERT to Cosmos DB |
| Bulk writes | Supported |
| Auto-create | Supported (see cosmos.auto-create) |
The connector accepts INSERT, UPDATE, and DELETE changelog events. It converts Flink `RowData` to JSON and writes every event as an UPSERT operation to Cosmos DB, creating or replacing the document at the given `id`.
Prerequisites
- An Azure Cosmos DB account with the API for NoSQL
- A Cosmos DB database and container with a defined partition key, or use `cosmos.auto-create = true` to let the connector create them
- The account endpoint URL and access key
- A Ververica Platform 3.x deployment
Schema Requirements
The Flink table DDL must satisfy the following constraints, enforced at job startup:
- The table schema must include a column named exactly `id`.
- `id` must appear in the `PRIMARY KEY`.
- The `PRIMARY KEY` must be exactly `(id)` or `(id, <all partition key columns>)`. No extra columns are allowed.
Partition key derivation follows these rules:
- If `cosmos.auto-create = true`, the connector derives the partition key from the `PARTITIONED BY` clause. A `PARTITIONED BY` clause is required in this case.
- If `cosmos.auto-create = false` and the PRIMARY KEY includes columns beyond `id`, those columns become the partition key.
- If `cosmos.auto-create = false` and the PRIMARY KEY is only `(id)`, the connector fetches the partition key paths from the live container at DDL validation time.
Example with a single partition key column `pk`:

```sql
CREATE TABLE cosmos_sink (
  id STRING,
  pk STRING,
  val DOUBLE,
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myContainer'
);
```
Example when the container partition key is `/id`:

```sql
CREATE TABLE cosmos_sink (
  id STRING,
  val DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myContainer'
);
```
When PRIMARY KEY (id) NOT ENFORCED is used without additional partition key columns, the connector fetches the actual partition key paths from the container at DDL validation time. The container must exist and be reachable before the DDL executes.
Options
Required Options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| `connector` | Yes | (none) | String | Must be `cosmos`. |
| `cosmos.endpoint` | Yes | (none) | String | Azure Cosmos DB account endpoint URL, for example `https://<account>.documents.azure.com:443/`. |
| `cosmos.key` | Yes | (none) | String | Azure Cosmos DB account access key. |
| `cosmos.database` | Yes | (none) | String | Name of the Cosmos DB database. |
| `cosmos.container` | Yes | (none) | String | Name of the Cosmos DB container to write to. |
To keep credentials out of SQL statements, use secret references: `'cosmos.key' = '${secret_values.cosmos_key}'`. For more information, see Secret Values.
Optional Options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| `cosmos.preferred-regions` | No | (none) | String | Comma-separated list of preferred Azure regions for the client, for example `West Europe,North Europe`. |
| `cosmos.max-microbatch-concurrency` | No | 1 | Integer | Maximum concurrency for micro-batch bulk operations within a single flush. Higher values increase parallelism and resource usage. |
| `cosmos.async.bulk.request-timeout-ms` | No | (none) | Long | Per-request timeout in milliseconds. If a request exceeds this limit, the connector treats it as failed and retries it. |
| `cosmos.auto-create` | No | false | Boolean | When `true`, the connector creates the database and container if they do not exist. Requires a `PARTITIONED BY` clause. |
| `cosmos.fail.detector` | No | (built-in) | String | Factory identifier for a custom `CosmosRequestStatusDetectorFactory` SPI implementation. When not set, the built-in `DefaultCosmosRequestStatusDetector` is used. |
| `cosmos.retry.backoff.initial-delay-ms` | No | 500 | Long | Initial delay in milliseconds for the first connector-level retry attempt. |
| `cosmos.retry.backoff.max-delay-ms` | No | 500 | Long | Maximum delay in milliseconds between connector-level retry attempts. |
| `cosmos.retry.backoff.multiplier` | No | 1.0 | Double | Multiplicative factor applied between retry delays. A value of 1.0 produces a fixed delay; values greater than 1.0 produce exponential backoff. |
| `cosmos.retry.max-attempts` | No | 5 | Integer | Maximum number of connector-level retry attempts before the batch fails. |
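The optional tuning options can be combined in a single `WITH` clause. The values below are illustrative, not recommendations; tune them against your container's provisioned throughput:

```sql
CREATE TABLE cosmos_sink_tuned (
  id STRING,
  pk STRING,
  val DOUBLE,
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myContainer',
  -- illustrative tuning values
  'cosmos.max-microbatch-concurrency' = '4',
  'cosmos.async.bulk.request-timeout-ms' = '30000',
  'cosmos.retry.backoff.initial-delay-ms' = '500',
  'cosmos.retry.backoff.max-delay-ms' = '10000',
  'cosmos.retry.backoff.multiplier' = '2.0',
  'cosmos.retry.max-attempts' = '8'
);
```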
Auto-Create
When cosmos.auto-create = true, the connector creates the database and container at job startup if they do not already exist. A non-empty PARTITIONED BY clause is required so the connector can derive the partition key without a live server round-trip.
- If the database does not exist, the connector creates it.
- If the container does not exist, the connector creates it with the partition key derived from the `PARTITIONED BY` columns.
- If the container already exists, the connector validates that its partition key matches the `PARTITIONED BY` columns. A mismatch causes the job to fail at startup.
- A single-column `PARTITIONED BY` creates a standard partition key. Two or more columns create a hierarchical (multi-hash) partition key.
- Auto-create is idempotent and safe to run repeatedly.
```sql
CREATE TABLE cosmos_sink (
  id STRING,
  tenant STRING,
  val DOUBLE,
  PRIMARY KEY (id, tenant) NOT ENFORCED
) PARTITIONED BY (tenant)
WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myNewContainer',
  'cosmos.auto-create' = 'true'
);
```
SQL Examples
Basic Sink with Single Partition Key
```sql
CREATE TABLE cosmos_sink (
  id STRING,
  pk STRING,
  `value` DOUBLE,
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myContainer'
);
```
Insert Data from a Source
```sql
INSERT INTO cosmos_sink
SELECT id, pk, `value`
FROM source_table;
```
Write Data with Preferred Regions
```sql
CREATE TABLE cosmos_sink_regional (
  id STRING,
  pk STRING,
  event_time TIMESTAMP(3),
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myContainer',
  'cosmos.preferred-regions' = 'West Europe,North Europe'
);
```
End-to-End Example with Datagen Source
```sql
-- Create a datagen source for testing
CREATE TABLE gen_src (
  id STRING,
  pk STRING,
  `value` DOUBLE
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- Create the CosmosDB sink
CREATE TABLE cosmos_output (
  id STRING,
  pk STRING,
  `value` DOUBLE,
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'testDB',
  'cosmos.container' = 'testContainer'
);

-- Stream data from the source to CosmosDB
INSERT INTO cosmos_output
SELECT * FROM gen_src;
```
Fault Tolerance
Checkpointing
The connector integrates with Flink checkpointing through the AsyncSink framework. On checkpoint, the writer snapshots all buffered requests that have not yet been acknowledged. After a failure and recovery, the connector replays those requests to guarantee at-least-once delivery.
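Because replay happens on recovery from a checkpoint, at-least-once delivery only takes effect when checkpointing is enabled for the job. In the Flink SQL client this is a single setting; the interval below is an example value, not a recommendation:

```sql
SET 'execution.checkpointing.interval' = '30s';
```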
Retry and Throttling
Two independent retry layers handle failures and throttling responses.
SDK-level retry (not configurable)
The Azure Cosmos DB Java SDK v4 has a built-in throttling retry policy that intercepts HTTP 429 responses and retries them transparently before the response reaches the connector. This policy respects the `x-ms-retry-after-ms` header from Cosmos DB. The connector does not configure or override this policy.
Connector-level retry (configurable)
On top of the SDK layer, the connector applies its own exponential backoff retry. After a bulk batch completes, the connector classifies each operation response individually and re-submits retryable operations after a delay.
Backoff formula:
```
delay = min(initialDelay × multiplier^(attempt - 1), maxDelay)
```
With the defaults (initial-delay-ms = 500, max-delay-ms = 500, multiplier = 1.0), all retries use a fixed 500 ms delay.
If any operation exceeds cosmos.retry.max-attempts, the batch fails.
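As a worked example of the formula with non-default values `initial-delay-ms = 500`, `max-delay-ms = 10000`, and `multiplier = 2.0`, the delays double per attempt until they hit the cap:

```
attempt 1: min(500 × 2.0^0, 10000) =  500 ms
attempt 2: min(500 × 2.0^1, 10000) = 1000 ms
attempt 3: min(500 × 2.0^2, 10000) = 2000 ms
attempt 4: min(500 × 2.0^3, 10000) = 4000 ms
attempt 5: min(500 × 2.0^4, 10000) = 8000 ms
```

A sixth attempt would be capped at 10000 ms, but with the default `cosmos.retry.max-attempts = 5` the batch fails first.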
Two types of bulk failures can occur:
- Partial failure: some operations in a bulk batch succeed and others fail. The connector classifies each failed operation individually.
- Total failure: the entire bulk request fails, for example due to a network error. The connector retries the full batch.
Request Status Classification
The built-in DefaultCosmosRequestStatusDetector classifies HTTP status codes as follows:
| HTTP Status | Classification | Effect |
|---|---|---|
| 200–299 | SUCCESS | The operation succeeds. |
| 408 | RETRY | The connector re-queues the operation with backoff. |
| 429 | RETRY | The connector re-queues the operation with backoff. |
| 5xx | RETRY | The connector re-queues the operation with backoff. |
| 400 | FAIL | The job fails immediately. |
| 401 | FAIL | The job fails immediately. |
| 403 | FAIL | The job fails immediately. |
| 404 | FAIL | The job fails immediately. |
| 409 | FAIL | The job fails immediately. |
| All other codes | DROP | The connector discards the operation silently. |
To change classification behavior, implement CosmosRequestStatusDetectorFactory and reference it with the cosmos.fail.detector option.
Limitations
- Sink only: no source or CDC support. Reading from the CosmosDB Change Feed is not supported.
- API for NoSQL only: MongoDB, Cassandra, Gremlin, Table, and PostgreSQL Cosmos DB APIs are not supported.
- At-least-once only: exactly-once is not supported. Duplicate writes may occur during recovery. UPSERT semantics reduce duplicate document creation for INSERT and UPDATE events.
- JSON format only: the connector uses Flink's `RowDataToJsonConverters` internally. The format is not user-configurable.
- Physical columns only: metadata and computed columns are not supported in the DDL.
- `id` column required: the table schema must include a column named `id`.
- Strict PRIMARY KEY constraint: the PRIMARY KEY must be exactly `(id)` or `(id, <partition key columns>)`. Mismatches cause the job to fail at startup.
Get Started
Step 1: Set Up Your Cosmos DB Account
- Create an Azure Cosmos DB account with the API for NoSQL.
- Go to Settings > Keys and copy the endpoint URI and the primary key.
- Note the database and container name, or plan to use `cosmos.auto-create = true`.
- Ensure the Cosmos DB account allows inbound traffic from your Flink deployment. The All Networks setting works for testing. For production, configure a firewall rule or virtual network integration.
Step 2: Store Your Credentials
Store the Cosmos DB key as a secret value in Ververica Platform rather than embedding it in your SQL:
- Go to Security > Secret Values.
- Create a secret named `cosmos_key` with the value of your Cosmos DB primary key.
- Reference it in SQL as `'cosmos.key' = '${secret_values.cosmos_key}'`.
Step 3: Run a Flink SQL Job
Create a source and sink, then stream data into Cosmos DB:
```sql
CREATE TABLE gen_src (
  id STRING,
  pk STRING,
  amount DOUBLE
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5'
);

CREATE TABLE cosmos_output (
  id STRING,
  pk STRING,
  amount DOUBLE,
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<your-account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = '<your-database>',
  'cosmos.container' = '<your-container>'
);

INSERT INTO cosmos_output
SELECT * FROM gen_src;
```
Step 4: Validate
In the Azure Portal, open your Cosmos DB container in Data Explorer to confirm documents are being written.
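For example, a query like the following in Data Explorer lists the most recently written documents; `_ts` is the last-modified timestamp that Cosmos DB maintains on every document:

```sql
SELECT TOP 10 * FROM c ORDER BY c._ts DESC
```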