MongoDB
The MongoDB connector enables reading from, writing to, and joining MongoDB collections directly from Flink SQL. It supports batch snapshots, CDC streaming, dimension (lookup join) tables, and result (sink) tables. The connector requires MongoDB 4.0 or later.
Features
Source Table Types
The MongoDB connector supports two distinct source table modes: a bounded snapshot source (connector mongodb) and a CDC streaming source (connector mongodb-cdc).
Use mongodb for dimension tables and result tables.
Full Changelog Mode
Full changelog mode applies to CDC sources (mongodb-cdc) only and requires MongoDB 6.0 or later with pre-images and post-images enabled on the collection.
By default, the MongoDB CDC connector operates in upsert mode: Change Stream events for updates contain only the post-update document state. The connector cannot emit UPDATE_BEFORE rows, so downstream operators that need the previous row value must maintain state internally.
When pre-images and post-images are enabled on a collection and scan.full-changelog = true is set, the connector emits a complete changelog stream containing all four Flink row kinds: INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE.
Prerequisites
- MongoDB 6.0 or later.
- Pre-images and post-images enabled on the collection. Enable them on an existing collection:
db.runCommand({
  collMod: "products",
  changeStreamPreAndPostImages: { enabled: true }
})
Or specify the option when creating a new collection:
db.createCollection("products", {
  changeStreamPreAndPostImages: { enabled: true }
})
- A replica set or sharded cluster. The Change Stream API does not support standalone instances.
Configuration
Set scan.full-changelog to true in the WITH clause:
CREATE TEMPORARY TABLE mongo_full_cdc (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'scan.full-changelog' = 'true'
);
How It Works
When scan.full-changelog is enabled:
- Insert events: the connector emits an INSERT row with the new document.
- Update and replace events: the connector emits an UPDATE_BEFORE row containing the pre-image (document state before the change), followed by an UPDATE_AFTER row containing the post-image (document state after the change).
- Delete events: the connector emits a DELETE row containing the pre-image of the deleted document.
This eliminates the need for Flink operators to maintain state for deriving old values, resulting in lower state usage and more accurate retraction-based computations.
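The event-to-row mapping described above can be sketched as a small lookup. This is an illustrative model only: ChangelogMapping, Op, and RowKind are hypothetical local names rather than connector classes, and the default-mode branch assumes that an update yields a single UPDATE_AFTER row, as the upsert description implies.

```java
import java.util.List;

// Hypothetical model of the mapping described above.
// Op and RowKind are local illustrative enums, not connector classes.
class ChangelogMapping {
    enum Op { INSERT, UPDATE, REPLACE, DELETE }
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Returns the row kinds emitted for one change event. */
    static List<RowKind> rowKinds(Op op, boolean fullChangelog) {
        switch (op) {
            case INSERT:
                return List.of(RowKind.INSERT);
            case DELETE:
                return List.of(RowKind.DELETE);
            default: // UPDATE and REPLACE are handled the same way
                return fullChangelog
                        ? List.of(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER)
                        : List.of(RowKind.UPDATE_AFTER); // assumption: upsert mode
        }
    }

    public static void main(String[] args) {
        for (Op op : Op.values()) {
            System.out.println(op + ": default=" + rowKinds(op, false)
                    + ", full-changelog=" + rowKinds(op, true));
        }
    }
}
```

The only difference between the two modes in this model is the update path, which is where the pre-image is needed.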
When to Use Full Changelog Mode
- Aggregations with retractions: GROUP BY queries, windowed aggregations, and other operators that retract old values benefit from receiving explicit UPDATE_BEFORE rows.
- Changelog-based sinks: sinks that require before-images for updates or deletes.
- Reduced state overhead: because MongoDB provides the old row, Flink does not need to keep a copy of the last-seen row in state.
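To make the state argument concrete, the following sketch maintains a running sum of weight in two ways. It is a simplified model, not how Flink implements its operators: RetractionSum, UpsertSum, and the sample values are hypothetical. With explicit before-rows the old value is subtracted directly; with an upsert stream the last-seen value per key has to be remembered.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only; this is not Flink's operator implementation.
class RetractionSum {
    /** Full changelog: the old value arrives in the row and is subtracted directly. */
    static long applyFull(long sum, String rowKind, long weight) {
        switch (rowKind) {
            case "INSERT":
            case "UPDATE_AFTER":
                return sum + weight;
            case "UPDATE_BEFORE":
            case "DELETE":
                return sum - weight;
            default:
                throw new IllegalArgumentException("Unknown row kind: " + rowKind);
        }
    }

    /** Upsert stream: the last-seen value per key is kept so it can be undone later. */
    static final class UpsertSum {
        private final Map<String, Long> lastSeen = new HashMap<>();
        private long sum;

        void upsert(String id, long weight) {
            Long old = lastSeen.put(id, weight);
            sum += weight - (old == null ? 0L : old);
        }

        void delete(String id) {
            Long old = lastSeen.remove(id);
            if (old != null) {
                sum -= old;
            }
        }

        long sum() { return sum; }
        int trackedKeys() { return lastSeen.size(); }
    }

    /** Replays four sample changes through the full-changelog path. */
    static long demoFull() {
        long sum = 0;
        sum = applyFull(sum, "INSERT", 5);        // document a, weight 5
        sum = applyFull(sum, "INSERT", 7);        // document b, weight 7
        sum = applyFull(sum, "UPDATE_BEFORE", 5); // a changes from 5 ...
        sum = applyFull(sum, "UPDATE_AFTER", 9);  // ... to 9
        sum = applyFull(sum, "DELETE", 7);        // b is deleted
        return sum;
    }

    /** Replays the same changes through the upsert path. */
    static long demoUpsert() {
        UpsertSum s = new UpsertSum();
        s.upsert("a", 5);
        s.upsert("b", 7);
        s.upsert("a", 9);
        s.delete("b");
        return s.sum();
    }

    public static void main(String[] args) {
        System.out.println("full changelog sum = " + demoFull());
        System.out.println("upsert sum = " + demoUpsert());
    }
}
```

Both paths reach the same result, but only the upsert path needs the per-key map, which grows with the number of distinct documents.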
Limitations
- This feature is experimental.
- Pre-images and post-images increase storage and I/O on the MongoDB server. Evaluate the performance impact before enabling on high-throughput collections.
- If the collection does not have pre-images and post-images enabled and scan.full-changelog is true, the connector fails at startup.
Prerequisites
CDC Source
- The MongoDB CDC connector reads data from self-managed MongoDB databases.
- The replica set feature must be enabled on the monitored database. For more information, see Replication.
- If MongoDB authentication is enabled, the user must have the following permissions: splitVector, listDatabases, listCollections, collStats, find, changeStream, and read access to the config.collections and config.chunks collections.
- Requires a VERA engine compatible with Flink 1.17 or later.
Dimension Table and Result Table
- The MongoDB database and collection must already exist.
- Requires a VERA engine compatible with Flink 1.17 or later (dimension tables).
Limitations
- If no primary key is defined on a result table, only INSERT is supported. Updates and deletes are not supported.
- Exactly-once delivery is not supported for result tables. At-least-once is the maximum.
- The MongoDB CDC connector and dimension tables require a VERA engine compatible with Flink 1.17 or later.
- Setting scan.startup.mode = timestamp requires MongoDB 4.0 or later.
- Full changelog mode requires MongoDB 6.0 or later.
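The interaction between the primary-key and at-least-once limitations can be illustrated with a small replay model. This is a sketch under assumptions, not connector code: ReplayDemo and its methods are hypothetical, and it assumes that a keyed result table overwrites the document with the same _id while a keyless one appends a new document on every write.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a result table receiving a replayed batch.
class ReplayDemo {
    /** With a primary key: each write replaces the document with the same _id. */
    static Map<String, String> upsertAll(List<String[]> writes) {
        Map<String, String> collection = new LinkedHashMap<>();
        for (String[] write : writes) {
            collection.put(write[0], write[1]);
        }
        return collection;
    }

    /** Without a primary key (assumption): each write appends a new document. */
    static List<String> insertAll(List<String[]> writes) {
        List<String> collection = new ArrayList<>();
        for (String[] write : writes) {
            collection.add(write[1]);
        }
        return collection;
    }

    /** A two-row batch that is sent twice, as can happen after a retry. */
    static List<String[]> batchWithReplay() {
        List<String[]> batch = List.of(
                new String[] {"1", "scooter"},
                new String[] {"2", "hammer"});
        List<String[]> writes = new ArrayList<>(batch);
        writes.addAll(batch); // at-least-once: the same batch is delivered again
        return writes;
    }

    public static void main(String[] args) {
        List<String[]> writes = batchWithReplay();
        System.out.println("documents with primary key: " + upsertAll(writes).size());
        System.out.println("documents without primary key: " + insertAll(writes).size());
    }
}
```

Under these assumptions, defining a primary key makes replays harmless, while an insert-only table can end up with duplicates.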
Syntax
CREATE TABLE tableName (
  _id STRING,
  -- additional columns
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = '<database>',
  'collection' = '<collection>'
);
When you create a MongoDB CDC source table, you must declare the _id STRING column and specify it as the primary key.
Options
Common Options
To keep credentials out of SQL statements, use secret references: 'password' = '${secret_values.password}'. For more information, see Secret Values.
CDC Source Options
Dimension Table Options
Result Table Options
Data Type Mappings
CDC Source Table
Dimension Table and Result Table
SQL Examples
Regular Source to Print Sink
CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products'
);

SELECT * FROM mongo_source;
CDC Source to Print Sink
CREATE TEMPORARY TABLE mongo_cdc_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products'
);

CREATE TEMPORARY TABLE products_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO products_sink
SELECT _id, name, weight
FROM mongo_cdc_source;
Datagen to MongoDB Result Table
CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s',
  'sink.delivery-guarantee' = 'at-least-once',
  'sink.max-retries' = '5'
);

INSERT INTO mongo_sink SELECT * FROM datagen_source;
Lookup Join
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a INT,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products'
);

CREATE TEMPORARY TABLE enriched_sink (
  id STRING,
  a INT,
  b BIGINT,
  name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO enriched_sink
SELECT T.id, T.a, T.b, H.name
FROM datagen_source AS T
JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.id = H._id;
Full Changelog CDC Source to Print Sink
CREATE TEMPORARY TABLE mongo_full_cdc (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'scan.full-changelog' = 'true'
);

CREATE TEMPORARY TABLE products_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO products_sink
SELECT _id, name, weight
FROM mongo_full_cdc;
DataStream API
CDC Source (MongoDBSource)
Build a MongoDBSource for CDC streaming:
MongoDBSource.builder()
    .hosts("<host>:27017")
    .username("<username>")
    .password("<password>")
    .databaseList("myDatabase")
    .collectionList("products")
    .startupOptions(StartupOptions.initial())
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
To enable the incremental snapshot feature, use MongoDBSource#builder() from the com.ververica.cdc.connectors.mongodb.source package. Without incremental snapshots, use MongoDBSource#builder() from com.ververica.cdc.connectors.mongodb.
MongoDBSource Parameters
Bounded Source (MongoSource)
Build a MongoSource for bounded batch reads:
MongoSource.<RowData>builder()
    .setUri("mongodb://<username>:<password>@<host>:27017")
    .setDatabase("myDatabase")
    .setCollection("products")
    .setDeserializationSchema(new MongoDeserializationSchema<RowData>() {
        @Override
        public RowData deserialize(BsonDocument document) {
            // Convert BsonDocument to RowData
        }
        @Override
        public TypeInformation<RowData> getProducedType() {
            return Types.ROW_NAMED(...);
        }
    })
    .build();
Sink (MongoSink)
Build a MongoSink for writing data with the DataStream API:
MongoSink.<RowData>builder()
    .setUri("mongodb://<username>:<password>@<host>:27017")
    .setDatabase("myDatabase")
    .setCollection("products")
    .setBatchSize(1000)
    .setBatchIntervalMs(1000)
    .setMaxRetries(3)
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .setSerializationSchema(
        new MongoSerializationSchema<RowData>() {
            @Override
            public BsonDocument serialize(RowData element) {
                // Convert RowData to BsonDocument
            }
        })
    .build();