
Azure CosmosDB

The Azure CosmosDB connector enables writing data from Flink to Azure Cosmos DB using the API for NoSQL. It is a sink-only connector with at-least-once delivery semantics, built on the Flink AsyncSink framework and the Azure Cosmos DB Java SDK v4 async client.

Features

| Item | Description |
|---|---|
| Table type | Result table (sink only) |
| Running mode | Streaming mode |
| Data format | JSON (internal, not user-configurable) |
| Write semantics | At-least-once |
| Changelog support | INSERT, UPDATE, and DELETE, all written as UPSERT to Cosmos DB |
| Bulk writes | Supported |
| Auto-create | Supported (see cosmos.auto-create) |

The connector accepts INSERT, UPDATE, and DELETE changelog events. It converts Flink RowData to JSON and writes every event as an UPSERT operation to Cosmos DB, creating or replacing the document at the given id.
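This event handling can be summarized in a short sketch. The mapping below is illustrative Python, not connector code; the names `CHANGELOG_TO_COSMOS` and `to_operation` are hypothetical:

```python
# Illustrative sketch: every accepted changelog event kind is written to
# Cosmos DB as an UPSERT on the document id, per the connector's semantics.
CHANGELOG_TO_COSMOS = {
    "INSERT": "UPSERT",         # +I: create or replace the document
    "UPDATE_BEFORE": "UPSERT",  # -U
    "UPDATE_AFTER": "UPSERT",   # +U
    "DELETE": "UPSERT",         # -D: also written as an upsert
}

def to_operation(row_kind: str) -> str:
    """Return the Cosmos DB operation used for a given changelog event kind."""
    return CHANGELOG_TO_COSMOS[row_kind]
```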

Prerequisites

  • An Azure Cosmos DB account with the API for NoSQL
  • A Cosmos DB database and container with a defined partition key, or use cosmos.auto-create = true to let the connector create them
  • The account endpoint URL and access key
  • A Ververica Platform 3.x deployment

Schema Requirements

The Flink table DDL must satisfy the following constraints, enforced at job startup:

  1. The table schema must include a column named exactly id.
  2. id must appear in the PRIMARY KEY.
  3. The PRIMARY KEY must be exactly (id) or (id, <all partition key columns>). No extra columns are allowed.
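These constraints amount to a small set comparison, sketched below in illustrative Python (the function and parameter names are hypothetical, not connector API):

```python
def validate_primary_key(columns, primary_key, partition_key_columns):
    """Check the DDL constraints described above.

    columns: all column names in the table schema
    primary_key: the declared PRIMARY KEY columns, in order
    partition_key_columns: the container's partition key columns
    """
    if "id" not in columns:
        raise ValueError("table schema must include a column named 'id'")
    if "id" not in primary_key:
        raise ValueError("'id' must appear in the PRIMARY KEY")
    allowed = [["id"], ["id"] + list(partition_key_columns)]
    if list(primary_key) not in allowed:
        raise ValueError(
            "PRIMARY KEY must be exactly (id) or (id, <all partition key columns>)"
        )
```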

Partition key derivation follows these rules:

  • If cosmos.auto-create = true, the connector derives the partition key from the PARTITIONED BY clause. A PARTITIONED BY clause is required in this case.
  • If cosmos.auto-create = false and the PRIMARY KEY includes columns beyond id, those columns become the partition key.
  • If cosmos.auto-create = false and the PRIMARY KEY is only (id), the connector fetches the partition key paths from the live container at DDL validation time.
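The three rules form a small decision function, sketched here in illustrative Python; `fetch_container_partition_key` stands in for the live-container lookup and is not a real API:

```python
def derive_partition_key(auto_create, partitioned_by, primary_key,
                         fetch_container_partition_key):
    """Decide where the partition key comes from, per the rules above."""
    if auto_create:
        if not partitioned_by:
            raise ValueError("cosmos.auto-create = true requires a PARTITIONED BY clause")
        return list(partitioned_by)
    extra = [c for c in primary_key if c != "id"]
    if extra:
        # PRIMARY KEY columns beyond 'id' become the partition key
        return extra
    # PRIMARY KEY is only (id): consult the live container at DDL validation time
    return fetch_container_partition_key()
```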

Example with a single partition key column pk:

```sql
CREATE TABLE cosmos_sink (
  id STRING,
  pk STRING,
  val DOUBLE,
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myContainer'
);
```

Example when the container partition key is /id:

```sql
CREATE TABLE cosmos_sink (
  id STRING,
  val DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myContainer'
);
```

Note: When PRIMARY KEY (id) NOT ENFORCED is used without additional partition key columns, the connector fetches the actual partition key paths from the container at DDL validation time. The container must exist and be reachable before the DDL executes.

Options

Required Options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | Yes | (none) | String | Must be cosmos. |
| cosmos.endpoint | Yes | (none) | String | Azure Cosmos DB account endpoint URL, for example https://<account>.documents.azure.com:443/. |
| cosmos.key | Yes | (none) | String | Azure Cosmos DB account access key. |
| cosmos.database | Yes | (none) | String | Name of the Cosmos DB database. |
| cosmos.container | Yes | (none) | String | Name of the Cosmos DB container to write to. |

Note: To keep credentials out of SQL statements, use secret references: 'cosmos.key' = '${secret_values.cosmos_key}'. For more information, see Secret Values.

Optional Options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| cosmos.preferred-regions | No | (none) | String | Comma-separated list of preferred Azure regions for the client, for example West Europe,North Europe. |
| cosmos.max-microbatch-concurrency | No | 1 | Integer | Maximum concurrency for micro-batch bulk operations within a single flush. Higher values increase parallelism and resource usage. |
| cosmos.async.bulk.request-timeout-ms | No | (none) | Long | Per-request timeout in milliseconds. If a request exceeds this limit, the connector treats it as failed and retries it. |
| cosmos.auto-create | No | false | Boolean | When true, the connector creates the database and container if they do not exist. Requires a PARTITIONED BY clause. |
| cosmos.fail.detector | No | built-in | String | Factory identifier for a custom CosmosRequestStatusDetectorFactory SPI implementation. When not set, the built-in DefaultCosmosRequestStatusDetector is used. |
| cosmos.retry.backoff.initial-delay-ms | No | 500 | Long | Initial delay in milliseconds for the first connector-level retry attempt. |
| cosmos.retry.backoff.max-delay-ms | No | 500 | Long | Maximum delay in milliseconds between connector-level retry attempts. |
| cosmos.retry.backoff.multiplier | No | 1.0 | Double | Multiplicative factor applied between retry delays. A value of 1.0 produces a fixed delay. Values greater than 1.0 produce exponential backoff. |
| cosmos.retry.max-attempts | No | 5 | Integer | Maximum number of connector-level retry attempts before the batch fails. |

Auto-Create

When cosmos.auto-create = true, the connector creates the database and container at job startup if they do not already exist. A non-empty PARTITIONED BY clause is required so the connector can derive the partition key without a live server round-trip.

  • If the database does not exist, the connector creates it.
  • If the container does not exist, the connector creates it with the partition key derived from the PARTITIONED BY columns.
  • If the container already exists, the connector validates that its partition key matches the PARTITIONED BY columns. A mismatch causes the job to fail at startup.
  • A single-column PARTITIONED BY creates a standard partition key. Two or more columns create a hierarchical (multi-hash) partition key.
  • Auto-create is idempotent and safe to run repeatedly.

```sql
CREATE TABLE cosmos_sink (
  id STRING,
  tenant STRING,
  val DOUBLE,
  PRIMARY KEY (id, tenant) NOT ENFORCED
) PARTITIONED BY (tenant)
WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myNewContainer',
  'cosmos.auto-create' = 'true'
);
```
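The path-derivation rule above (one column gives a standard key, several give a hierarchical key) can be sketched in a few lines. This is illustrative Python, not the connector's implementation; `partition_key_paths` is a hypothetical helper:

```python
def partition_key_paths(partitioned_by):
    """Map PARTITIONED BY columns to Cosmos DB partition key paths.

    One column -> a standard single-path ("Hash") key; two or more ->
    a hierarchical ("MultiHash") key with one path per column.
    """
    if not partitioned_by:
        raise ValueError("auto-create requires a non-empty PARTITIONED BY clause")
    paths = ["/" + col for col in partitioned_by]
    kind = "Hash" if len(paths) == 1 else "MultiHash"
    return kind, paths
```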

SQL Examples

Basic Sink with Single Partition Key

```sql
CREATE TABLE cosmos_sink (
  id STRING,
  pk STRING,
  `value` DOUBLE,
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myContainer'
);
```

Note that `value` is a reserved keyword in Flink SQL and must be escaped with backticks.

Insert Data from a Source

```sql
INSERT INTO cosmos_sink
SELECT id, pk, `value`
FROM source_table;
```

Write Data with Preferred Regions

```sql
CREATE TABLE cosmos_sink_regional (
  id STRING,
  pk STRING,
  event_time TIMESTAMP(3),
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'myDatabase',
  'cosmos.container' = 'myContainer',
  'cosmos.preferred-regions' = 'West Europe,North Europe'
);
```

End-to-End Example with Datagen Source

```sql
-- Create a datagen source for testing
CREATE TABLE gen_src (
  id STRING,
  pk STRING,
  `value` DOUBLE
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- Create the CosmosDB sink
CREATE TABLE cosmos_output (
  id STRING,
  pk STRING,
  `value` DOUBLE,
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = 'testDB',
  'cosmos.container' = 'testContainer'
);

-- Stream data from the source to CosmosDB
INSERT INTO cosmos_output
SELECT * FROM gen_src;
```

Fault Tolerance

Checkpointing

The connector integrates with Flink checkpointing through the AsyncSink framework. On checkpoint, the writer snapshots all buffered requests that have not yet been acknowledged. After a failure and recovery, the connector replays those requests to guarantee at-least-once delivery.

Retry and Throttling

Two independent retry layers handle failures and throttling responses.

SDK-level retry (not configurable)

The Cosmos DB Java SDK v4 has a built-in throttling retry policy that intercepts HTTP 429 responses and retries them transparently before the response reaches the connector. This policy respects the x-ms-retry-after-ms header from Cosmos DB. The connector does not configure or override this policy.

Connector-level retry (configurable)

On top of the SDK layer, the connector applies its own exponential backoff retry. After a bulk batch completes, the connector classifies each operation response individually and re-submits retryable operations after a delay.

Backoff formula:

delay = min(initialDelay × multiplier^(attempt - 1), maxDelay)

With the defaults (initial-delay-ms = 500, max-delay-ms = 500, multiplier = 1.0), all retries use a fixed 500 ms delay.

If any operation exceeds cosmos.retry.max-attempts, the batch fails.
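The backoff formula and its defaults can be checked with a short sketch (illustrative Python, not the connector's implementation):

```python
def retry_delay_ms(attempt, initial_delay_ms=500, max_delay_ms=500, multiplier=1.0):
    """delay = min(initialDelay * multiplier^(attempt - 1), maxDelay)"""
    return min(initial_delay_ms * multiplier ** (attempt - 1), max_delay_ms)

# Defaults (500 / 500 / 1.0): every attempt waits a fixed 500 ms.
default_schedule = [retry_delay_ms(a) for a in range(1, 6)]

# Exponential example: initial 100 ms, multiplier 2.0, capped at 1000 ms.
exp_schedule = [retry_delay_ms(a, 100, 1000, 2.0) for a in range(1, 6)]
```

With the defaults the schedule is five flat 500 ms delays; with the exponential settings it doubles each attempt until the cap takes over.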

Two types of bulk failures can occur:

  • Partial failure: some operations in a bulk batch succeed and others fail. The connector classifies each failed operation individually.
  • Total failure: the entire bulk request fails, for example due to a network error. The connector retries the full batch.

Request Status Classification

The built-in DefaultCosmosRequestStatusDetector classifies HTTP status codes as follows:

| HTTP Status | Classification | Effect |
|---|---|---|
| 200–299 | SUCCESS | The operation succeeds. |
| 408 | RETRY | The connector re-queues the operation with backoff. |
| 429 | RETRY | The connector re-queues the operation with backoff. |
| 5xx | RETRY | The connector re-queues the operation with backoff. |
| 400 | FAIL | The job fails immediately. |
| 401 | FAIL | The job fails immediately. |
| 403 | FAIL | The job fails immediately. |
| 404 | FAIL | The job fails immediately. |
| 409 | FAIL | The job fails immediately. |
| All other codes | DROP | The connector discards the operation silently. |
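The classification can be expressed as a small function. This is a sketch of the documented behavior in illustrative Python, not the actual DefaultCosmosRequestStatusDetector code:

```python
def classify_status(http_status: int) -> str:
    """Classify an HTTP status code per the documented rules."""
    if 200 <= http_status <= 299:
        return "SUCCESS"
    if http_status in (408, 429) or 500 <= http_status <= 599:
        return "RETRY"
    if http_status in (400, 401, 403, 404, 409):
        return "FAIL"
    return "DROP"  # all other codes: the operation is silently discarded
```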

To change classification behavior, implement CosmosRequestStatusDetectorFactory and reference it with the cosmos.fail.detector option.

Limitations

  • Sink only: no source or CDC support. Reading from the CosmosDB Change Feed is not supported.
  • API for NoSQL only: MongoDB, Cassandra, Gremlin, Table, and PostgreSQL Cosmos DB APIs are not supported.
  • At-least-once only: exactly-once is not supported. Duplicate writes may occur during recovery. UPSERT semantics reduce duplicate document creation for INSERT and UPDATE events.
  • JSON format only: the connector uses Flink's RowDataToJsonConverters internally. The format is not user-configurable.
  • Physical columns only: metadata and computed columns are not supported in the DDL.
  • id column required: the table schema must include a column named id.
  • Strict PRIMARY KEY constraint: the PRIMARY KEY must be exactly (id) or (id, <partition key columns>). Mismatches cause the job to fail at startup.

Get Started

Step 1: Set Up Your Cosmos DB Account

  1. Create an Azure Cosmos DB account with the API for NoSQL.
  2. Go to Settings > Keys and copy the endpoint URI and the primary key.
  3. Note the database and container name, or plan to use cosmos.auto-create = true.
  4. Ensure the Cosmos DB account allows inbound traffic from your Flink deployment. The All Networks setting works for testing. For production, configure a firewall rule or virtual network integration.

Step 2: Store Your Credentials

Store the Cosmos DB key as a secret value in Ververica Platform rather than embedding it in your SQL:

  1. Go to Security > Secret Values.
  2. Create a secret named cosmos_key with the value of your Cosmos DB primary key.
  3. Reference it in SQL as 'cosmos.key' = '${secret_values.cosmos_key}'.

Step 3: Create and Run the SQL Job

Create a source and sink, then stream data into Cosmos DB:

```sql
CREATE TABLE gen_src (
  id STRING,
  pk STRING,
  amount DOUBLE
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5'
);

CREATE TABLE cosmos_output (
  id STRING,
  pk STRING,
  amount DOUBLE,
  PRIMARY KEY (id, pk) NOT ENFORCED
) WITH (
  'connector' = 'cosmos',
  'cosmos.endpoint' = 'https://<your-account>.documents.azure.com:443/',
  'cosmos.key' = '${secret_values.cosmos_key}',
  'cosmos.database' = '<your-database>',
  'cosmos.container' = '<your-container>'
);

INSERT INTO cosmos_output
SELECT * FROM gen_src;
```

Step 4: Validate

In the Azure Portal, open your Cosmos DB container in Data Explorer to confirm documents are being written.