MongoDB
The MongoDB connector enables reading from, writing to, and joining MongoDB collections directly from Flink SQL. It supports batch snapshots, CDC streaming, dimension (lookup join) tables, and result (sink) tables. The connector requires MongoDB 4.0 or later.
Features
| Item | Description |
|---|---|
| Table types | Source table (batch snapshot or CDC streaming), dimension table (lookup join), and result table (sink) |
| Running mode | Streaming mode |
| Metrics (source / CDC) | numBytesIn, numBytesInPerSecond, numRecordsIn, numRecordsInPerSecond, numRecordsInErrors, currentFetchEventTimeLag, currentEmitEventTimeLag, watermarkLag, sourceIdleTime |
| Metrics (dimension / result) | None |
| API types | DataStream API and SQL API |
| Data update / deletion in result table | Supported (requires primary key) |
Source Table Types
The MongoDB connector supports two distinct source table modes:
| Mode | connector Value | Behavior |
|---|---|---|
| Regular (batch) source | mongodb | Reads a bounded snapshot of a collection and finishes. No change tracking. |
| CDC (streaming) source | mongodb-cdc | Reads a full snapshot, then continuously captures changes through the Change Stream API. Produces an unbounded changelog stream. |
Use the `mongodb` connector for dimension tables and result tables.
Full Changelog Mode
Full changelog mode applies to CDC sources (mongodb-cdc) only and requires MongoDB 6.0 or later with pre-images and post-images enabled on the collection.
By default, the MongoDB CDC connector operates in upsert mode: Change Stream events for updates contain only the post-update document state. The connector cannot emit UPDATE_BEFORE rows, so downstream operators that need the previous row value must maintain state internally.
When pre-images and post-images are enabled on a collection and scan.full-changelog = true is set, the connector emits a complete changelog stream containing all four Flink row kinds: INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE.
Prerequisites
- MongoDB 6.0 or later.
- Pre-images and post-images enabled on the collection. Enable them on an existing collection:
```javascript
db.runCommand({
  collMod: "products",
  changeStreamPreAndPostImages: { enabled: true }
})
```
Or specify the option when creating a new collection:
```javascript
db.createCollection("products", {
  changeStreamPreAndPostImages: { enabled: true }
})
```
- A replica set or sharded cluster. The Change Stream API does not support standalone instances.
Configuration
Set `scan.full-changelog` to `true` in the `WITH` clause:
```sql
CREATE TEMPORARY TABLE mongo_full_cdc (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'scan.full-changelog' = 'true'
);
```
How It Works
When scan.full-changelog is enabled:
- Insert events: the connector emits an `INSERT` row with the new document.
- Update and replace events: the connector emits an `UPDATE_BEFORE` row containing the pre-image (document state before the change), followed by an `UPDATE_AFTER` row containing the post-image (document state after the change).
- Delete events: the connector emits a `DELETE` row containing the pre-image of the deleted document.
This eliminates the need for Flink operators to maintain state for deriving old values, resulting in lower state usage and more accurate retraction-based computations.
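As an illustration (hypothetical document values; the exact rendering depends on the sink), an update that changes a product's `weight` from 3.5 to 4.0 arrives in full changelog mode as a retraction pair, whereas upsert mode would deliver only the post-image row:

```text
-U[100, scooter, 3.5]   -- UPDATE_BEFORE: pre-image of the document
+U[100, scooter, 4.0]   -- UPDATE_AFTER: post-image of the document
```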
When to Use Full Changelog Mode
- Aggregations with retractions: `GROUP BY` queries, windowed aggregations, and other operators that retract old values benefit from receiving explicit `UPDATE_BEFORE` rows.
- Changelog-based sinks: sinks that require before-images for updates or deletes.
- Reduced state overhead: because MongoDB provides the old row, Flink does not need to keep a copy of the last-seen row in state.
Limitations
- This feature is experimental.
- Pre-images and post-images increase storage and I/O on the MongoDB server. Evaluate the performance impact before enabling on high-throughput collections.
- If the collection does not have pre-images and post-images enabled and `scan.full-changelog` is `true`, the connector fails at startup.
Prerequisites
CDC Source
- The MongoDB CDC connector reads data from self-managed MongoDB databases.
- The replica set feature must be enabled on the monitored database. For more information, see Replication.
- If MongoDB authentication is enabled, the user must have the following permissions: `splitVector`, `listDatabases`, `listCollections`, `collStats`, `find`, `changeStream`, and read access to the `config.collections` and `config.chunks` collections.
- Requires a VERA engine compatible with Flink 1.17 or later.
Dimension Table and Result Table
- The MongoDB database and collection must already exist.
- Requires a VERA engine compatible with Flink 1.17 or later (dimension tables).
Limitations
- If no primary key is defined on a result table, only `INSERT` is supported. Updates and deletes are not supported.
- Exactly-once delivery is not supported for result tables; at-least-once is the strongest available guarantee.
- The MongoDB CDC connector and dimension tables require a VERA engine compatible with Flink 1.17 or later.
- Setting `scan.startup.mode` = `timestamp` requires MongoDB 4.0 or later.
- Full changelog mode requires MongoDB 6.0 or later.
Syntax
```sql
CREATE TABLE tableName (
  _id STRING,
  -- additional columns
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = '<database>',
  'collection' = '<collection>'
);
```
When you create a MongoDB CDC source table, you must declare the `_id` column as `STRING` and specify it as the primary key.
Options
Common Options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | (none) | `mongodb` for regular source, dimension, and result tables; `mongodb-cdc` for CDC source tables. |
| uri | String | No | (none) | Full MongoDB connection URI. If set, overrides `scheme`, `hosts`, `username`, `password`, and `connection.options`. |
| hosts | String | No | (none) | Hostname of the MongoDB instance. Separate multiple hosts with commas. Either `uri` or `hosts` must be set. |
| scheme | String | No | mongodb | Connection protocol. Valid values: `mongodb`, `mongodb+srv`. |
| username | String | No | (none) | Required if MongoDB authentication is enabled. |
| password | String | No | (none) | Required if MongoDB authentication is enabled. |
| database | String | No | (none) | MongoDB database name. Supports regular expressions for CDC sources. If omitted, all databases are monitored. |
| collection | String | No | (none) | MongoDB collection name. Supports regular expressions. For collection names that contain regex special characters, use a fully qualified `database.collection` name. |
| connection.options | String | No | (none) | Additional connection options as `key=value` pairs separated by `&`, for example `connectTimeoutMS=12000&socketTimeoutMS=13000`. |
To keep credentials out of SQL statements, use secret references: `'password' = '${secret_values.password}'`. For more information, see Secret Values.
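As a sketch, the individual connection options can also be collapsed into a single `uri` (placeholder host and credentials; note that `uri` overrides `scheme`, `hosts`, `username`, `password`, and `connection.options`):

```sql
CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://<username>:<password>@<host>:27017/?connectTimeoutMS=12000',
  'database' = 'myDatabase',
  'collection' = 'products'
);
```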
CDC Source Options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| scan.startup.mode | String | No | initial | Startup mode. `initial` reads a full snapshot and then streams changes. `latest-offset` streams changes from the current offset. `timestamp` streams changes from a specific point in time. |
| scan.startup.timestamp-millis | Long | Required when `scan.startup.mode` = `timestamp` | (none) | UNIX epoch milliseconds for the `timestamp` startup mode. |
| initial.snapshotting.queue.size | Integer | No | 10240 | Maximum queue size during the initial snapshot phase. |
| batch.size | Integer | No | 1024 | Cursor batch size. |
| poll.max.batch.size | Integer | No | 1024 | Maximum number of change documents processed per batch during streaming. |
| poll.await.time.ms | Integer | No | 1000 | Polling interval in milliseconds. |
| heartbeat.interval.ms | Integer | No | 0 | Heartbeat interval in milliseconds. A value of 0 disables heartbeats. Configure this for infrequently updated collections. |
| scan.incremental.snapshot.enabled | Boolean | No | false | Enables parallel reading during the initial snapshot phase. This is an experimental feature. |
| scan.incremental.snapshot.chunk.size.mb | Integer | No | 64 | Chunk size in MB when parallel snapshot reading is enabled. |
| scan.full-changelog | Boolean | No | false | Enables full changelog event streams. Requires MongoDB 6.0 or later with `changeStreamPreAndPostImages` enabled on the collection. This is an experimental feature. |
| scan.flatten-nested-columns.enabled | Boolean | No | false | Reads nested BSON fields as dot-separated names, for example `nested.col`. Requires a VERA engine compatible with Flink 1.17 or later. |
| scan.primitive-as-string | Boolean | No | false | Infers all primitive BSON types as STRING. Requires a VERA engine compatible with Flink 1.17 or later. |
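For example, to start streaming from a fixed point in time instead of taking a full snapshot, the startup options above can be combined as follows (table name and timestamp value are illustrative):

```sql
CREATE TEMPORARY TABLE mongo_cdc_from_ts (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1667232000000'
);
```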
Dimension Table Options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| lookup.cache | String | No | NONE | Cache policy. `NONE` disables caching. `PARTIAL` caches looked-up records. |
| lookup.max-retries | Integer | No | 3 | Maximum retries on lookup failure. |
| lookup.retry.interval | Duration | No | 1s | Retry interval on lookup failure. |
| lookup.partial-cache.expire-after-access | Duration | No | (none) | Cache TTL after last access. Requires `lookup.cache` = `PARTIAL`. |
| lookup.partial-cache.expire-after-write | Duration | No | (none) | Cache TTL after write. Requires `lookup.cache` = `PARTIAL`. |
| lookup.partial-cache.max-rows | Long | No | (none) | Maximum number of cached rows. Requires `lookup.cache` = `PARTIAL`. |
| lookup.partial-cache.cache-missing-key | Boolean | No | true | Caches empty results for missing keys. Requires `lookup.cache` = `PARTIAL`. |
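A dimension table with a bounded, time-limited cache might be declared as follows (a sketch; the cache sizes and TTL are illustrative values, not recommendations):

```sql
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '10min'
);
```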
Result Table Options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| sink.buffer-flush.max-rows | Integer | No | 1000 | Maximum records written per flush. |
| sink.buffer-flush.interval | Duration | No | 1s | Flush interval. |
| sink.delivery-guarantee | String | No | at-least-once | Write semantics. Valid values: `none`, `at-least-once`. Exactly-once is not supported. |
| sink.max-retries | Integer | No | 3 | Maximum retries on write failure. |
| sink.retry.interval | Duration | No | 1s | Retry interval on write failure. |
| sink.parallelism | Integer | No | (none) | Sink parallelism. |
Data Type Mappings
CDC Source Table
| BSON Data Type | Flink SQL Data Type |
|---|---|
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL(p, s) |
| Boolean | BOOLEAN |
| Date / Timestamp | DATE |
| Date / Timestamp | TIME |
| DateTime | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
| String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |
| DBPointer | ROW<$ref STRING, $id STRING> |
| GeoJSON Point | ROW<type STRING, coordinates ARRAY<DOUBLE>> |
| GeoJSON Line | ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>> |
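As an illustration of the `Object` and `Array` mappings, a document shaped like `{"_id": ..., "dims": {"w": 1.0, "h": 2.0}, "tags": ["a", "b"]}` (a hypothetical collection and schema) could be declared as:

```sql
CREATE TEMPORARY TABLE mongo_nested (
  `_id` STRING,
  dims ROW<w DOUBLE, h DOUBLE>,
  tags ARRAY<STRING>,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'measurements'
);
```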
Dimension Table and Result Table
| BSON Data Type | Flink SQL Data Type |
|---|---|
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Boolean | BOOLEAN |
| DateTime | TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP_LTZ(0) |
| String | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |
SQL Examples
Regular Source to Print Sink
```sql
CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products'
);

SELECT * FROM mongo_source;
```
CDC Source to Print Sink
```sql
CREATE TEMPORARY TABLE mongo_cdc_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products'
);

CREATE TEMPORARY TABLE products_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO products_sink
SELECT _id, name, weight
FROM mongo_cdc_source;
```
Datagen to MongoDB Result Table
```sql
CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s',
  'sink.delivery-guarantee' = 'at-least-once',
  'sink.max-retries' = '5'
);

INSERT INTO mongo_sink SELECT * FROM datagen_source;
```
Lookup Join
```sql
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a INT,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products'
);

CREATE TEMPORARY TABLE enriched_sink (
  id STRING,
  a INT,
  b BIGINT,
  name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO enriched_sink
SELECT T.id, T.a, T.b, H.name
FROM datagen_source AS T
JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.id = H._id;
```
Full Changelog CDC Source to Print Sink
```sql
CREATE TEMPORARY TABLE mongo_full_cdc (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'scan.full-changelog' = 'true'
);

CREATE TEMPORARY TABLE products_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO products_sink
SELECT _id, name, weight
FROM mongo_full_cdc;
```
DataStream API
CDC Source (MongoDBSource)
Build a MongoDBSource for CDC streaming:
```java
MongoDBSource.builder()
    .hosts("<host>:27017")
    .username("<username>")
    .password("<password>")
    .databaseList("myDatabase")
    .collectionList("products")
    .startupOptions(StartupOptions.initial())
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
```
To enable the incremental snapshot feature, use `MongoDBSource#builder()` from the `com.ververica.cdc.connectors.mongodb.source` package. Without incremental snapshots, use `MongoDBSource#builder()` from `com.ververica.cdc.connectors.mongodb`.
MongoDBSource Parameters
| Parameter | Description |
|---|---|
| hosts | Hostname of the MongoDB database. |
| username | MongoDB username. Not required if authentication is disabled. |
| password | MongoDB password. Not required if authentication is disabled. |
| databaseList | Database names to monitor. Supports regex. Use `.*` to match all databases. |
| collectionList | Collection names to monitor. Supports regex. |
| startupOptions | `StartupOptions.initial()`, `StartupOptions.latestOffset()`, or `StartupOptions.timestamp(millis)`. |
| deserializer | `MongoDBConnectorDeserializationSchema` (upsert to RowData), `MongoDBConnectorFullChangelogDeserializationSchema` (full changelog to RowData), or `JsonDebeziumDeserializationSchema` (JSON string). |
Bounded Source (MongoSource)
Build a MongoSource for bounded batch reads:
```java
MongoSource.<RowData>builder()
    .setUri("mongodb://<username>:<password>@<host>:27017")
    .setDatabase("myDatabase")
    .setCollection("products")
    .setDeserializationSchema(new MongoDeserializationSchema<RowData>() {
        @Override
        public RowData deserialize(BsonDocument document) {
            // Convert BsonDocument to RowData
        }

        @Override
        public TypeInformation<RowData> getProducedType() {
            return Types.ROW_NAMED(...);
        }
    })
    .build();
```
Sink (MongoSink)
Build a MongoSink for writing data with the DataStream API:
```java
MongoSink.<RowData>builder()
    .setUri("mongodb://<username>:<password>@<host>:27017")
    .setDatabase("myDatabase")
    .setCollection("products")
    .setBatchSize(1000)
    .setBatchIntervalMs(1000)
    .setMaxRetries(3)
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .setSerializationSchema(
        new MongoSerializationSchema<RowData>() {
            @Override
            public BsonDocument serialize(RowData element) {
                // Convert RowData to BsonDocument
            }
        })
    .build();
```
MongoSink Parameters
| Parameter | Description |
|---|---|
| setUri | Full MongoDB connection URI. |
| setDatabase | Target database name. |
| setCollection | Target collection name. |
| setBatchSize | Maximum number of documents per write batch. |
| setBatchIntervalMs | Flush interval in milliseconds. |
| setMaxRetries | Maximum retries on write failure. |
| setDeliveryGuarantee | `DeliveryGuarantee.AT_LEAST_ONCE` or `DeliveryGuarantee.NONE`. |
| setSerializationSchema | A `MongoSerializationSchema` implementation that converts records to `BsonDocument`. |