MongoDB

The MongoDB connector enables reading from, writing to, and joining MongoDB collections directly from Flink SQL. It supports batch snapshots, CDC streaming, dimension (lookup join) tables, and result (sink) tables. The connector requires MongoDB 4.0 or later.

Features

| Item | Description |
| --- | --- |
| Table types | Source table (batch snapshot or CDC streaming), dimension table (lookup join), and result table (sink) |
| Running mode | Streaming mode |
| Metrics (source / CDC) | numBytesIn, numBytesInPerSecond, numRecordsIn, numRecordsInPerSecond, numRecordsInErrors, currentFetchEventTimeLag, currentEmitEventTimeLag, watermarkLag, sourceIdleTime |
| Metrics (dimension / result) | None |
| API types | DataStream API and SQL API |
| Data update / deletion in result table | Supported (requires primary key) |

Source Table Types

The MongoDB connector supports two distinct source table modes:

| Mode | connector value | Behavior |
| --- | --- | --- |
| Regular (batch) source | mongodb | Reads a bounded snapshot of a collection and finishes. No change tracking. |
| CDC (streaming) source | mongodb-cdc | Reads a full snapshot, then continuously captures changes through the Change Stream API. Produces an unbounded changelog stream. |

Use mongodb for dimension tables and result tables.

Full Changelog Mode

Full changelog mode applies to CDC sources (mongodb-cdc) only and requires MongoDB 6.0 or later with pre-images and post-images enabled on the collection.

By default, the MongoDB CDC connector operates in upsert mode: Change Stream events for updates contain only the post-update document state. The connector cannot emit UPDATE_BEFORE rows, so downstream operators that need the previous row value must maintain state internally.

When pre-images and post-images are enabled on a collection and scan.full-changelog = true is set, the connector emits a complete changelog stream containing all four Flink row kinds: INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE.

Prerequisites

  1. MongoDB 6.0 or later.
  2. Pre-images and post-images enabled on the collection. Enable them on an existing collection:

     ```javascript
     db.runCommand({
       collMod: "products",
       changeStreamPreAndPostImages: { enabled: true }
     })
     ```

     Or specify the option when creating a new collection:

     ```javascript
     db.createCollection("products", {
       changeStreamPreAndPostImages: { enabled: true }
     })
     ```
  3. A replica set or sharded cluster. The Change Stream API does not support standalone instances.

Configuration

Set scan.full-changelog to true in the WITH clause:

```sql
CREATE TEMPORARY TABLE mongo_full_cdc (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'scan.full-changelog' = 'true'
);
```

How It Works

When scan.full-changelog is enabled:

  • Insert events: the connector emits an INSERT row with the new document.
  • Update and replace events: the connector emits an UPDATE_BEFORE row containing the pre-image (document state before the change), followed by an UPDATE_AFTER row containing the post-image (document state after the change).
  • Delete events: the connector emits a DELETE row containing the pre-image of the deleted document.

This eliminates the need for Flink operators to maintain state for deriving old values, resulting in lower state usage and more accurate retraction-based computations.

When to Use Full Changelog Mode

  • Aggregations with retractions: GROUP BY queries, windowed aggregations, and other operators that retract old values benefit from receiving explicit UPDATE_BEFORE rows.
  • Changelog-based sinks: sinks that require before-images for updates or deletes.
  • Reduced state overhead: because MongoDB provides the old row, Flink does not need to keep a copy of the last-seen row in state.
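As an illustration of the first case, a continuous aggregation over the mongo_full_cdc table from the Configuration section can consume the explicit UPDATE_BEFORE rows directly (a sketch, not a complete job):

```sql
-- Continuous aggregation over the full-changelog CDC source defined above.
-- The explicit UPDATE_BEFORE rows let Flink retract an old weight from the
-- running sum without storing a copy of the last-seen row for each key.
SELECT name, SUM(weight) AS total_weight
FROM mongo_full_cdc
GROUP BY name;
```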

Limitations

  • This feature is experimental.
  • Pre-images and post-images increase storage and I/O on the MongoDB server. Evaluate the performance impact before enabling on high-throughput collections.
  • If the collection does not have pre-images and post-images enabled and scan.full-changelog is true, the connector fails at startup.

Prerequisites

CDC Source

  • The MongoDB CDC connector reads data from self-managed MongoDB databases.
  • The replica set feature must be enabled on the monitored database. For more information, see Replication.
  • If MongoDB authentication is enabled, the user must have the following permissions: splitVector, listDatabases, listCollections, collStats, find, changeStream, and read access to the config.collections and config.chunks collections.
  • Requires a VERA engine compatible with Flink 1.17 or later.

Dimension Table and Result Table

  • The MongoDB database and collection must already exist.
  • Requires a VERA engine compatible with Flink 1.17 or later (dimension tables).

Limitations

  • If no primary key is defined on a result table, only INSERT is supported. Updates and deletes are not supported.
  • Exactly-once delivery is not supported for result tables; at-least-once is the strongest available guarantee.
  • The MongoDB CDC connector and dimension tables require a VERA engine compatible with Flink 1.17 or later.
  • Setting scan.startup.mode = timestamp requires MongoDB 4.0 or later.
  • Full changelog mode requires MongoDB 6.0 or later.

Syntax

```sql
CREATE TABLE tableName (
  `_id` STRING,
  -- additional columns
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = '<database>',
  'collection' = '<collection>'
);
```
Note: When you create a MongoDB CDC source table, you must declare the _id STRING column and specify it as the primary key.

Options

Common Options

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | String | Yes | (none) | mongodb for regular source tables, dimension tables, and result tables; mongodb-cdc for CDC source tables. |
| uri | String | No | (none) | Full MongoDB connection URI. If set, overrides scheme, hosts, username, password, and connection.options. |
| hosts | String | No | (none) | Hostname of the MongoDB instance. Separate multiple hosts with commas. Either uri or hosts must be set. |
| scheme | String | No | mongodb | Connection protocol. Valid values: mongodb, mongodb+srv. |
| username | String | No | (none) | Required if MongoDB authentication is enabled. |
| password | String | No | (none) | Required if MongoDB authentication is enabled. |
| database | String | No | (none) | MongoDB database name. Supports regular expressions for CDC sources. If omitted, all databases are monitored. |
| collection | String | No | (none) | MongoDB collection name. Supports regular expressions. For collection names that contain regex special characters, use a fully qualified database.collection name. |
| connection.options | String | No | (none) | Additional connection options as key=value pairs separated by &, for example connectTimeoutMS=12000&socketTimeoutMS=13000. |
Note: To keep credentials out of SQL statements, use secret references: 'password' = '${secret_values.password}'. For more information, see Secret Values.
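For illustration, the same connection can also be expressed through a single uri option, which then overrides scheme, hosts, username, password, and connection.options (placeholder host and credentials as in the other examples):

```sql
CREATE TEMPORARY TABLE mongo_source_uri (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  -- The uri carries host, credentials, and connection options in one value.
  'uri' = 'mongodb://<username>:<password>@<host>:27017/?connectTimeoutMS=12000',
  'database' = 'myDatabase',
  'collection' = 'products'
);
```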

CDC Source Options

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| scan.startup.mode | String | No | initial | Startup mode. initial reads a full snapshot then streams changes. latest-offset streams changes from the current offset. timestamp streams changes from a specific point in time. |
| scan.startup.timestamp-millis | Long | Yes, when scan.startup.mode = timestamp | (none) | UNIX epoch milliseconds for the timestamp startup mode. |
| initial.snapshotting.queue.size | Integer | No | 10240 | Maximum queue size during the initial snapshot phase. |
| batch.size | Integer | No | 1024 | Cursor batch processing size. |
| poll.max.batch.size | Integer | No | 1024 | Maximum change documents processed per batch during streaming. |
| poll.await.time.ms | Integer | No | 1000 | Polling interval in milliseconds. |
| heartbeat.interval.ms | Integer | No | 0 | Heartbeat interval in milliseconds. A value of 0 disables heartbeats. Configure this for infrequently updated collections. |
| scan.incremental.snapshot.enabled | Boolean | No | false | Enables parallel reading during the initial snapshot phase. This is an experimental feature. |
| scan.incremental.snapshot.chunk.size.mb | Integer | No | 64 | Shard size in MB when parallel snapshot reading is enabled. |
| scan.full-changelog | Boolean | No | false | Enables full changelog event streams. Requires MongoDB 6.0 or later with changeStreamPreAndPostImages enabled on the collection. This is an experimental feature. |
| scan.flatten-nested-columns.enabled | Boolean | No | false | Reads nested BSON fields as dot-separated names, for example nested.col. Requires a VERA engine compatible with Flink 1.17 or later. |
| scan.primitive-as-string | Boolean | No | false | Infers all primitive BSON types as STRING. Requires a VERA engine compatible with Flink 1.17 or later. |
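As a sketch, starting the stream from a point in time combines scan.startup.mode and scan.startup.timestamp-millis. The table reuses the placeholders from the syntax example; the timestamp value here is an arbitrary illustration:

```sql
CREATE TEMPORARY TABLE mongo_cdc_from_ts (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  -- Stream changes recorded at or after this epoch-millisecond timestamp.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1700000000000'
);
```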

Dimension Table Options

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| lookup.cache | String | No | NONE | Cache policy. NONE disables caching. PARTIAL caches looked-up records. |
| lookup.max-retries | Integer | No | 3 | Maximum retries on lookup failure. |
| lookup.retry.interval | Duration | No | 1s | Retry interval on lookup failure. |
| lookup.partial-cache.expire-after-access | Duration | No | (none) | Cache TTL after last access. Requires lookup.cache = PARTIAL. |
| lookup.partial-cache.expire-after-write | Duration | No | (none) | Cache TTL after write. Requires lookup.cache = PARTIAL. |
| lookup.partial-cache.max-rows | Long | No | (none) | Maximum number of cached rows. Requires lookup.cache = PARTIAL. |
| lookup.partial-cache.cache-missing-key | Boolean | No | true | Caches empty results for missing keys. Requires lookup.cache = PARTIAL. |
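Putting the PARTIAL cache options together, a dimension table with a bounded, time-limited cache might look like the following sketch (cache sizes and TTL are illustrative choices, not recommendations):

```sql
CREATE TEMPORARY TABLE mongo_dim_cached (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  -- Cache looked-up rows: at most 10000 entries, evicted 10 minutes after write.
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '10min'
);
```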

Result Table Options

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| sink.buffer-flush.max-rows | Integer | No | 1000 | Maximum records written per flush. |
| sink.buffer-flush.interval | Duration | No | 1s | Flush interval. |
| sink.delivery-guarantee | String | No | at-least-once | Write semantics. Valid values: none, at-least-once. Exactly-once is not supported. |
| sink.max-retries | Integer | No | 3 | Maximum retries on write failure. |
| sink.retry.interval | Duration | No | 1s | Retry interval on write failure. |
| sink.parallelism | Integer | No | (none) | Sink parallelism. |

Data Type Mappings

CDC Source Table

| BSON Data Type | Flink SQL Data Type |
| --- | --- |
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL(p, s) |
| Boolean | BOOLEAN |
| Date / Timestamp | DATE |
| Date / Timestamp | TIME |
| DateTime | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
| String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |
| DBPointer | ROW<$ref STRING, $id STRING> |
| GeoJSON Point | ROW<type STRING, coordinates ARRAY<DOUBLE>> |
| GeoJSON Line | ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>> |
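To illustrate the composite mappings, a hypothetical collection containing an embedded GeoJSON Point and a nested object could be declared as follows (the field names and collection are assumptions for this sketch):

```sql
CREATE TEMPORARY TABLE places_cdc_source (
  `_id` STRING,
  -- GeoJSON Point maps to ROW<type STRING, coordinates ARRAY<DOUBLE>>.
  location ROW<`type` STRING, coordinates ARRAY<DOUBLE>>,
  -- An embedded BSON Object maps to a ROW with its fields as columns.
  spec ROW<color STRING, size INT>,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'places'
);
```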

Dimension Table and Result Table

| BSON Data Type | Flink SQL Data Type |
| --- | --- |
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Boolean | BOOLEAN |
| DateTime | TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP_LTZ(0) |
| String | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |

SQL Examples

Regular Source to Print Sink

```sql
CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products'
);

SELECT * FROM mongo_source;
```

CDC Source to Print Sink

```sql
CREATE TEMPORARY TABLE mongo_cdc_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products'
);

CREATE TEMPORARY TABLE products_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO products_sink
SELECT _id, name, weight
FROM mongo_cdc_source;
```

Datagen to MongoDB Result Table

```sql
CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s',
  'sink.delivery-guarantee' = 'at-least-once',
  'sink.max-retries' = '5'
);

INSERT INTO mongo_sink SELECT * FROM datagen_source;
```

Lookup Join

```sql
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a INT,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products'
);

CREATE TEMPORARY TABLE enriched_sink (
  id STRING,
  a INT,
  b BIGINT,
  name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO enriched_sink
SELECT T.id, T.a, T.b, H.name
FROM datagen_source AS T
JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.id = H._id;
```

Full Changelog CDC Source to Print Sink

```sql
CREATE TEMPORARY TABLE mongo_full_cdc (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '<host>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'database' = 'myDatabase',
  'collection' = 'products',
  'scan.full-changelog' = 'true'
);

CREATE TEMPORARY TABLE products_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO products_sink
SELECT _id, name, weight
FROM mongo_full_cdc;
```

DataStream API

CDC Source (MongoDBSource)

Build a MongoDBSource for CDC streaming:

```java
MongoDBSource.builder()
    .hosts("<host>:27017")
    .username("<username>")
    .password("<password>")
    .databaseList("myDatabase")
    .collectionList("products")
    .startupOptions(StartupOptions.initial())
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
```
Note: To enable the incremental snapshot feature, use MongoDBSource#builder() from the com.ververica.cdc.connectors.mongodb.source package. Without incremental snapshots, use MongoDBSource#builder() from com.ververica.cdc.connectors.mongodb.

MongoDBSource Parameters

| Parameter | Description |
| --- | --- |
| hosts | Hostname of the MongoDB database. |
| username | MongoDB username. Not required if authentication is disabled. |
| password | MongoDB password. Not required if authentication is disabled. |
| databaseList | Database names to monitor. Supports regex. Use .* to match all databases. |
| collectionList | Collection names to monitor. Supports regex. |
| startupOptions | StartupOptions.initial(), StartupOptions.latestOffset(), or StartupOptions.timestamp(millis). |
| deserializer | MongoDBConnectorDeserializationSchema (upsert to RowData), MongoDBConnectorFullChangelogDeserializationSchema (full changelog to RowData), or JsonDebeziumDeserializationSchema (JSON string). |

Bounded Source (MongoSource)

Build a MongoSource for bounded batch reads:

```java
MongoSource.<RowData>builder()
    .setUri("mongodb://<username>:<password>@<host>:27017")
    .setDatabase("myDatabase")
    .setCollection("products")
    .setDeserializationSchema(new MongoDeserializationSchema<RowData>() {
        @Override
        public RowData deserialize(BsonDocument document) {
            // Convert BsonDocument to RowData
        }

        @Override
        public TypeInformation<RowData> getProducedType() {
            return Types.ROW_NAMED(...);
        }
    })
    .build();
```

Sink (MongoSink)

Build a MongoSink for writing data with the DataStream API:

```java
MongoSink.<RowData>builder()
    .setUri("mongodb://<username>:<password>@<host>:27017")
    .setDatabase("myDatabase")
    .setCollection("products")
    .setBatchSize(1000)
    .setBatchIntervalMs(1000)
    .setMaxRetries(3)
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .setSerializationSchema(
        new MongoSerializationSchema<RowData>() {
            @Override
            public BsonDocument serialize(RowData element) {
                // Convert RowData to BsonDocument
            }
        })
    .build();
```

MongoSink Parameters

| Parameter | Description |
| --- | --- |
| setUri | Full MongoDB connection URI. |
| setDatabase | Target database name. |
| setCollection | Target collection name. |
| setBatchSize | Maximum number of documents per write batch. |
| setBatchIntervalMs | Flush interval in milliseconds. |
| setMaxRetries | Maximum retries on write failure. |
| setDeliveryGuarantee | DeliveryGuarantee.AT_LEAST_ONCE or DeliveryGuarantee.NONE. |
| setSerializationSchema | A MongoSerializationSchema implementation that converts records to BsonDocument. |