MongoDB
The MongoDB connector enables reading from, writing to, and joining MongoDB collections directly from Flink SQL. It supports batch snapshots, CDC streaming, dimension (lookup join) tables, and result (sink) tables. The connector requires MongoDB 4.0 or later.
Features
| Item | Description |
|---|---|
| Table types | Source table (batch snapshot or CDC streaming), dimension table (lookup join), and result table (sink) |
| Running mode | Streaming mode |
| Metrics (source / CDC) | numBytesIn, numBytesInPerSecond, numRecordsIn, numRecordsInPerSecond, numRecordsInErrors, currentFetchEventTimeLag, currentEmitEventTimeLag, watermarkLag, sourceIdleTime |
| Metrics (dimension / result) | None |
| API types | DataStream API and SQL API |
| Data update / deletion in result table | Supported (requires primary key) |
Source Table Types
The MongoDB connector supports two distinct source table modes:
| Mode | connector value | Behavior |
|---|---|---|
| Regular (batch) source | mongodb | Reads a bounded snapshot of a collection and finishes. No change tracking. |
| CDC (streaming) source | mongodb-cdc | Reads a full snapshot, then continuously captures changes through the Change Stream API. Produces an unbounded changelog stream. |
Use mongodb for dimension tables and result tables.
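Here is a minimal sketch of the dimension-table case. It assumes a hypothetical orders stream produced by Flink's built-in datagen connector, which stands in for a real source, and an existing myDatabase.products collection. The statements look up each order's product document by _id with a processing-time lookup join. The table names orders and products_dim and their columns are illustrative only.

```sql
-- Hypothetical order stream; datagen stands in for a real source.
CREATE TEMPORARY TABLE orders (
  order_id STRING,
  product_id STRING,
  proc_time AS PROCTIME()  -- processing-time attribute required by the lookup join
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Dimension table backed by an existing MongoDB collection.
CREATE TEMPORARY TABLE products_dim (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products'
);

-- Lookup join: enrich each order with the product document whose _id matches.
SELECT o.order_id, p.name AS product_name
FROM orders AS o
JOIN products_dim FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.product_id = p.`_id`;
```

FOR SYSTEM_TIME AS OF on a processing-time attribute is the standard Flink SQL lookup-join syntax. Because datagen produces random strings, matches are unlikely. Replace orders with a real stream in practice.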
Full Changelog Mode
Full changelog mode applies to CDC sources (mongodb-cdc) only and requires MongoDB 6.0 or later with pre-images and post-images enabled on the collection.
By default, the MongoDB CDC connector operates in upsert mode: Change Stream events for updates contain only the post-update document state. The connector cannot emit UPDATE_BEFORE rows, so downstream operators that need the previous row value must maintain state internally.
When pre-images and post-images are enabled on a collection and scan.full-changelog = true is set, the connector emits a complete changelog stream containing all four Flink row kinds: INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE.
Prerequisites
- MongoDB 6.0 or later.
- Pre-images and post-images enabled on the collection. Enable them on an existing collection:
```javascript
db.runCommand({
  collMod: "products",
  changeStreamPreAndPostImages: { enabled: true }
})
```
Or specify the option when creating a new collection:
```javascript
db.createCollection("products", {
  changeStreamPreAndPostImages: { enabled: true }
})
```
- A replica set or sharded cluster. The Change Stream API does not support standalone instances.
Configuration
Set scan.full-changelog to true in the WITH clause:
```sql
CREATE TEMPORARY TABLE mongo_full_cdc (
  `_id` STRING,
  name STRING,
  -- Plain DECIMAL means DECIMAL(10, 0) in Flink SQL; declare a scale to keep fractional weights.
  weight DECIMAL(10, 3),
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'scan.full-changelog' = 'true'
);
```
How It Works
When scan.full-changelog is enabled:
- Insert events: the connector emits an INSERT row containing the new document.
- Update and replace events: the connector emits an UPDATE_BEFORE row containing the pre-image (the document state before the change), followed by an UPDATE_AFTER row containing the post-image (the document state after the change).
- Delete events: the connector emits a DELETE row containing the pre-image of the deleted document.
Because MongoDB supplies the old values, Flink operators no longer need to keep the previous version of each row in state to derive them. This lowers state usage, and retraction-based computations receive the exact previous values recorded by MongoDB.
When to Use Full Changelog Mode
- Aggregations with retractions: GROUP BY queries, windowed aggregations, and other operators that retract old values benefit from receiving explicit UPDATE_BEFORE rows.
- Changelog-based sinks: sinks that require before-images for updates or deletes.
- Reduced state overhead: because MongoDB provides the old row, Flink does not need to keep a copy of the last-seen row in state.
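The following sketch shows the aggregation case. It assumes the mongo_full_cdc table from the Configuration example above and uses Flink's built-in print connector as a throwaway sink. The table name products_per_name is hypothetical.

```sql
-- Print sink, used here only to inspect the resulting changelog.
CREATE TEMPORARY TABLE products_per_name (
  name STRING,
  product_count BIGINT
) WITH (
  'connector' = 'print'
);

-- GROUP BY over the full-changelog source. When a document's name changes,
-- the source emits UPDATE_BEFORE (old name) and UPDATE_AFTER (new name),
-- so the count for the old name is retracted and the count for the new name grows.
INSERT INTO products_per_name
SELECT name, COUNT(*) AS product_count
FROM mongo_full_cdc
GROUP BY name;
```

With scan.full-changelog enabled, the UPDATE_BEFORE rows come from MongoDB pre-images. As a result, the retraction for the old name does not depend on Flink having stored the previous row.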
Limitations
- This feature is experimental.
- Pre-images and post-images increase storage and I/O on the MongoDB server. Evaluate the performance impact before enabling on high-throughput collections.
- If scan.full-changelog is true and the collection does not have pre-images and post-images enabled, the connector fails at startup.
Prerequisites
CDC Source
- The MongoDB CDC connector reads data from self-managed MongoDB databases.
- The replica set feature must be enabled on the monitored database. For more information, see Replication.
- If MongoDB authentication is enabled, the user must have the following permissions: splitVector, listDatabases, listCollections, collStats, find, changeStream, and read access to the config.collections and config.chunks collections.
- Requires a VERA engine compatible with Flink 1.17 or later.
Dimension Table and Result Table
- The MongoDB database and collection must already exist.
- Dimension tables require a VERA engine compatible with Flink 1.17 or later.
Limitations
- If no primary key is defined on a result table, only INSERT is supported. Updates and deletes are not supported.
- Result tables support at-least-once delivery. Exactly-once delivery is not supported.
- The MongoDB CDC connector and dimension tables require a VERA engine compatible with Flink 1.17 or later.
- Setting scan.startup.mode = timestamp requires MongoDB 4.0 or later.
- Full changelog mode requires MongoDB 6.0 or later.
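The following sketch illustrates the primary-key requirement with a result table that receives an updating aggregate. It assumes the mongo_full_cdc source defined in the Full Changelog Mode section and a hypothetical product_counts collection that already exists in myDatabase. The GROUP BY result changes over time, so the sink needs a primary key. Without one, only INSERT rows could be written. Delivery remains at-least-once.

```sql
-- Result table keyed on _id so that updated counts can be applied.
-- The target collection (hypothetical: product_counts) must already exist.
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  product_count BIGINT,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'product_counts'
);

-- One document per product name; the name is used as the document _id.
INSERT INTO mongo_sink
SELECT name AS `_id`, COUNT(*) AS product_count
FROM mongo_full_cdc
GROUP BY name;
```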
Syntax
```sql
CREATE TABLE tableName (
  _id STRING,
  -- additional columns
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = '<database>',
  'collection' = '<collection>'
);
```
When you create a MongoDB CDC source table, you must declare the _id STRING column and specify it as the primary key.
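Here is a sketch of a CDC source table that follows this rule, using placeholder connection values. The table name mongo_cdc_source and the name and weight columns are illustrative. Because scan.full-changelog is not set, the table runs in the default upsert mode.

```sql
-- CDC source: _id must be declared as STRING and used as the primary key.
CREATE TEMPORARY TABLE mongo_cdc_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL(10, 3),
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = '<database>',
  'collection' = '<collection>'
);
```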
Options
Common Options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | (none) | Connector type. Set to mongodb for regular source tables, dimension tables, and result tables. Set to mongodb-cdc for CDC source tables. |
| uri | String | No | (none) | Full MongoDB connection URI. If set, it overrides scheme, hosts, username, password, and connection.options. |
| hosts | String | No | (none) | Hostname of the MongoDB instance, typically with a port, for example host:27017. Separate multiple hosts with commas. Either uri or hosts must be set. |
| scheme | String | No | mongodb | Connection protocol. Valid values: mongodb, mongodb+srv. |
| username | String | No | (none) | Required if MongoDB authentication is enabled. |
| password | String | No | (none) | Required if MongoDB authentication is enabled. |
| database | String | No | (none) | MongoDB database name. For CDC sources, regular expressions are supported, and if this option is omitted, all databases are monitored. |
| collection | String | No | (none) | MongoDB collection name. Supports regular expressions. If a collection name contains regular-expression special characters, use the fully qualified database.collection name. |
| connection.options | String | No | (none) | Additional connection options as key=value pairs separated by &, for example connectTimeoutMS=12000&socketTimeoutMS=13000. |
To keep credentials out of SQL statements, use secret references: 'password' = '${secret_values.password}'. For more information, see Secret Values.
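The two connection styles described in the options table can be sketched as follows. The host placeholders, the table names, and the secret name mongo_uri are assumptions for illustration. Because uri overrides scheme, hosts, username, password, and connection.options, the second table sets none of them.

```sql
-- Variant 1: hosts with separate credentials and extra driver options.
CREATE TEMPORARY TABLE products_by_hosts (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host1>:27017,<host2>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000'
);

-- Variant 2: a full connection URI kept in a secret.
-- mongo_uri is a hypothetical secret name whose value would look like
-- mongodb://<username>:<password>@<host>:27017/?connectTimeoutMS=12000
CREATE TEMPORARY TABLE products_by_uri (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = '${secret_values.mongo_uri}',
  'database' = 'myDatabase',
  'collection' = 'products'
);
```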