
MongoDB


The MongoDB connector enables reading from, writing to, and joining MongoDB collections directly from Flink SQL. It supports batch snapshots, CDC streaming, dimension (lookup join) tables, and result (sink) tables. The connector requires MongoDB 4.0 or later.

Features

| Item | Description |
|---|---|
| Table types | Source table (batch snapshot or CDC streaming), dimension table (lookup join), and result table (sink) |
| Running mode | Streaming mode |
| Metrics (source / CDC) | numBytesIn, numBytesInPerSecond, numRecordsIn, numRecordsInPerSecond, numRecordsInErrors, currentFetchEventTimeLag, currentEmitEventTimeLag, watermarkLag, sourceIdleTime |
| Metrics (dimension / result) | None |
| API types | DataStream API and SQL API |
| Data update / deletion in result table | Supported (requires primary key) |

Source Table Types

The MongoDB connector supports two distinct source table modes:

| Mode | Connector value | Behavior |
|---|---|---|
| Regular (batch) source | mongodb | Reads a bounded snapshot of a collection and finishes. No change tracking. |
| CDC (streaming) source | mongodb-cdc | Reads a full snapshot, then continuously captures changes through the Change Stream API. Produces an unbounded changelog stream. |

Use mongodb for dimension tables and result tables.

Full Changelog Mode

Full changelog mode applies to CDC sources (mongodb-cdc) only and requires MongoDB 6.0 or later with pre-images and post-images enabled on the collection.

By default, the MongoDB CDC connector operates in upsert mode: Change Stream events for updates contain only the post-update document state. Because the connector cannot emit UPDATE_BEFORE rows, Flink must keep the last-seen row for each key in state to derive the previous values that downstream operators need.

When pre-images and post-images are enabled on a collection and scan.full-changelog = true is set, the connector emits a complete changelog stream containing all four Flink row kinds: INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE.

Prerequisites

  1. MongoDB 6.0 or later.
  2. Pre-images and post-images enabled on the collection. Enable them on an existing collection:
```javascript
db.runCommand({
  collMod: "products",
  changeStreamPreAndPostImages: { enabled: true }
})
```

Or specify the option when creating a new collection:

```javascript
db.createCollection("products", {
  changeStreamPreAndPostImages: { enabled: true }
})
```
  3. A replica set or sharded cluster. The Change Stream API does not support standalone instances.

Configuration

Set scan.full-changelog to true in the WITH clause:

```sql
CREATE TEMPORARY TABLE mongo_full_cdc (
  `_id`   STRING,
  name    STRING,
  weight  DECIMAL(10, 3),  -- plain DECIMAL means DECIMAL(10, 0) in Flink and drops fractional digits
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector'           = 'mongodb-cdc',
  'hosts'               = '<host>:27017',
  'username'            = '<username>',
  'password'            = '${secret_values.password}',
  'database'            = 'myDatabase',
  'collection'          = 'products',
  'scan.full-changelog' = 'true'
);
```

How It Works

When scan.full-changelog is enabled:

  • Insert events: the connector emits an INSERT row with the new document.
  • Update and replace events: the connector emits an UPDATE_BEFORE row containing the pre-image (document state before the change), followed by an UPDATE_AFTER row containing the post-image (document state after the change).
  • Delete events: the connector emits a DELETE row containing the pre-image of the deleted document.

Because MongoDB supplies the previous document state, Flink does not need to keep per-key state to derive old values. This reduces state size, and retraction-based operators receive the exact old values directly from the source.
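The event-to-row mapping described above can be sketched in plain Java. This is a minimal illustration, not the connector's code. The `ChangeEvent`, `ChangelogRow`, and `FullChangelogMapper` classes are hypothetical, and the local `RowKind` enum is a hypothetical stand-in named after the values of Flink's `org.apache.flink.types.RowKind`. Documents are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's RowKind values.
enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical, simplified Change Stream event: operation type plus the
// pre-image and post-image of the document (null when not present).
final class ChangeEvent {
    final String operationType;
    final String preImage;
    final String postImage;

    ChangeEvent(String operationType, String preImage, String postImage) {
        this.operationType = operationType;
        this.preImage = preImage;
        this.postImage = postImage;
    }
}

// One emitted changelog row: a row kind plus the document it carries.
final class ChangelogRow {
    final RowKind kind;
    final String document;

    ChangelogRow(RowKind kind, String document) {
        this.kind = kind;
        this.document = document;
    }

    @Override
    public String toString() {
        return kind + "(" + document + ")";
    }
}

final class FullChangelogMapper {
    // Maps one change event to the rows emitted in full changelog mode.
    static List<ChangelogRow> toRows(ChangeEvent event) {
        List<ChangelogRow> rows = new ArrayList<>();
        switch (event.operationType) {
            case "insert":
                rows.add(new ChangelogRow(RowKind.INSERT, event.postImage));
                break;
            case "update":
            case "replace":
                // Old state first (pre-image), then new state (post-image).
                rows.add(new ChangelogRow(RowKind.UPDATE_BEFORE, event.preImage));
                rows.add(new ChangelogRow(RowKind.UPDATE_AFTER, event.postImage));
                break;
            case "delete":
                // The deleted document's content comes from the pre-image.
                rows.add(new ChangelogRow(RowKind.DELETE, event.preImage));
                break;
            default:
                throw new IllegalArgumentException("Unsupported operation: " + event.operationType);
        }
        return rows;
    }
}
```

For an update that changes `{w:1}` to `{w:2}`, `toRows` returns `UPDATE_BEFORE({w:1})` followed by `UPDATE_AFTER({w:2})`.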

When to Use Full Changelog Mode

  • Aggregations with retractions: GROUP BY queries, windowed aggregations, and other operators that retract old values benefit from receiving explicit UPDATE_BEFORE rows.
  • Changelog-based sinks: sinks that require before-images for updates or deletes.
  • Reduced state overhead: because MongoDB provides the old row, Flink does not need to keep a copy of the last-seen row in state.
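The sketch below shows why explicit UPDATE_BEFORE rows help aggregations. It models a running `SUM(weight) ... GROUP BY name` that consumes a full changelog. It is a minimal sketch, not how Flink's aggregate operators are implemented. `WeightRow` and `RetractingSum` are hypothetical classes, and weights are simplified to `long`. The only state is a count and a sum per group, because every UPDATE_BEFORE and DELETE row carries the old values to subtract.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified changelog row: kind, grouping key, and value.
final class WeightRow {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    final Kind kind;
    final String name;
    final long weight;

    WeightRow(Kind kind, String name, long weight) {
        this.kind = kind;
        this.name = name;
        this.weight = weight;
    }
}

// Running SUM(weight) GROUP BY name over a full changelog.
// No copy of the last-seen row per _id is kept; retractions carry old values.
final class RetractingSum {
    // name -> {row count, weight sum}
    private final Map<String, long[]> groups = new HashMap<>();

    void apply(WeightRow row) {
        boolean accumulate = row.kind == WeightRow.Kind.INSERT
                || row.kind == WeightRow.Kind.UPDATE_AFTER;
        long sign = accumulate ? 1 : -1; // retract UPDATE_BEFORE and DELETE
        long[] acc = groups.computeIfAbsent(row.name, k -> new long[2]);
        acc[0] += sign;
        acc[1] += sign * row.weight;
        if (acc[0] == 0) {
            groups.remove(row.name); // the group's last row was retracted
        }
    }

    // Returns the current sum for a group, or null if the group is empty.
    Long sumFor(String name) {
        long[] acc = groups.get(name);
        return acc == null ? null : acc[1];
    }
}
```

Inserting weights 3 and 4 and then updating 3 to 5 leaves a sum of 9. Deleting both remaining rows removes the group.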

Limitations

  • This feature is experimental.
  • Pre-images and post-images increase storage and I/O on the MongoDB server. Evaluate the performance impact before enabling on high-throughput collections.
  • If the collection does not have pre-images and post-images enabled and scan.full-changelog is true, the connector fails at startup.

Prerequisites

CDC Source

  • The MongoDB CDC connector reads data from self-managed MongoDB databases.
  • The monitored MongoDB deployment must be a replica set or a sharded cluster, because Change Streams are not available on standalone instances. For more information, see Replication.
  • If MongoDB authentication is enabled, the user must have the following permissions: splitVector, listDatabases, listCollections, collStats, find, changeStream, and read access to the config.collections and config.chunks collections.
  • Requires a VERA engine compatible with Flink 1.17 or later.

Dimension Table and Result Table

  • The MongoDB database and collection must already exist.
  • Dimension tables require a VERA engine compatible with Flink 1.17 or later.

Limitations

  • If no primary key is defined on a result table, only INSERT is supported. Updates and deletes are not supported.
  • Exactly-once delivery is not supported for result tables. At-least-once is the maximum.
  • The MongoDB CDC connector and dimension tables require a VERA engine compatible with Flink 1.17 or later.
  • Setting scan.startup.mode = timestamp requires MongoDB 4.0 or later.
  • Full changelog mode requires MongoDB 6.0 or later.
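Combining at-least-once delivery with a primary key keeps result tables correct after a failover. Replayed writes for the same key overwrite each other instead of accumulating. Without a key, every record is a new insert, so replays create duplicates. The following is a minimal sketch of that reasoning. It assumes keyed writes behave as upserts and deletes by `_id`. `KeyedCollection` is a hypothetical in-memory stand-in for a collection, not connector code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a collection, used only to show why
// replayed writes are harmless with a primary key but not without one.
final class KeyedCollection {
    final Map<String, String> byId = new LinkedHashMap<>(); // table with a primary key
    final List<String> appended = new ArrayList<>();        // table without a primary key

    // With a primary key: a write replaces the document with the same _id.
    void upsert(String id, String doc) {
        byId.put(id, doc);
    }

    // With a primary key: a delete targets the document with the same _id.
    void delete(String id) {
        byId.remove(id);
    }

    // Without a primary key: every record becomes a new document.
    void insert(String doc) {
        appended.add(doc);
    }
}
```

Writing the same batch twice leaves one document per key in `byId`, but two copies of each record in `appended`.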

Syntax

```sql
CREATE TABLE tableName (
  _id STRING,
  -- additional columns
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector'  = 'mongodb',
  'hosts'      = '<host>:27017',
  'username'   = '<username>',
  'password'   = '${secret_values.password}',
  'database'   = '<database>',
  'collection' = '<collection>'
);
```

Options

Common Options

| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | (none) | mongodb for regular (batch) source, dimension, and result tables. mongodb-cdc for CDC source tables. |
| uri | String | No | (none) | Full MongoDB connection URI. If set, overrides scheme, hosts, username, password, and connection.options. |
| hosts | String | No | (none) | Hostname and port of the MongoDB server, for example <host>:27017. Separate multiple hosts with commas. Either uri or hosts must be set. |
| scheme | String | No | mongodb | Connection protocol. Valid values: mongodb, mongodb+srv. |
| username | String | No | (none) | Required if MongoDB authentication is enabled. |
| password | String | No | (none) | Required if MongoDB authentication is enabled. |
| database | String | No | (none) | MongoDB database name. For CDC sources, supports regular expressions, and if omitted, all databases are monitored. |
| collection | String | No | (none) | MongoDB collection name. For CDC sources, supports regular expressions. If a collection name contains regex special characters, use the fully qualified database.collection name. |
| connection.options | String | No | (none) | Additional connection options as key=value pairs separated by &, for example connectTimeoutMS=12000&socketTimeoutMS=13000. |
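If you prefer the single uri option, the separate scheme, hosts, username, password, and connection.options values can be combined into a standard MongoDB connection string of the form `scheme://user:password@hosts/?options`. The following is a minimal sketch under assumptions. `MongoUriSketch.build` is a hypothetical helper, not part of the connector. It percent-encodes the credentials because the MongoDB URI format requires characters such as `@`, `:`, and `/` in them to be escaped.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: assembles a connection string from separate options.
final class MongoUriSketch {
    static String build(String scheme, String hosts, String username,
                        String password, String connectionOptions) {
        StringBuilder uri = new StringBuilder(scheme).append("://");
        if (username != null && password != null) {
            // Credentials must be percent-encoded inside a MongoDB URI.
            uri.append(encode(username)).append(':').append(encode(password)).append('@');
        }
        uri.append(hosts).append('/');
        if (connectionOptions != null && !connectionOptions.isEmpty()) {
            uri.append('?').append(connectionOptions);
        }
        return uri.toString();
    }

    private static String encode(String value) {
        // URLEncoder uses form encoding, so turn '+' (a space) into "%20".
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }
}
```

For example, a password of `p@ss:w/rd` is written as `p%40ss%3Aw%2Frd` in the resulting URI.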

CDC Source Options

| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| scan.startup.mode | String | No | initial | Startup mode. initial reads a full snapshot, then streams changes. latest-offset streams changes from the current offset. timestamp streams changes from a specific point in time. |
| scan.startup.timestamp-millis | Long | Yes, if scan.startup.mode = timestamp | (none) | UNIX epoch milliseconds for the timestamp startup mode. |
| initial.snapshotting.queue.size | Integer | No | 10240 | Maximum queue size during the initial snapshot phase. |
| batch.size | Integer | No | 1024 | Cursor batch size. |
| poll.max.batch.size | Integer | No | 1024 | Maximum number of change documents processed per batch during streaming. |
| poll.await.time.ms | Integer | No | 1000 | Polling interval in milliseconds. |
| heartbeat.interval.ms | Integer | No | 0 | Heartbeat interval in milliseconds. A value of 0 disables heartbeats. Configure this for infrequently updated collections. |
| scan.incremental.snapshot.enabled | Boolean | No | false | Enables parallel reading during the initial snapshot phase. This is an experimental feature. |
| scan.incremental.snapshot.chunk.size.mb | Integer | No | 64 | Chunk size, in MB, used to split the collection when parallel snapshot reading is enabled. |
| scan.full-changelog | Boolean | No | false | Enables full changelog event streams. Requires MongoDB 6.0 or later with changeStreamPreAndPostImages enabled on the collection. This is an experimental feature. |
| scan.flatten-nested-columns.enabled | Boolean | No | false | Reads nested BSON fields as dot-separated names, for example nested.col. Requires a VERA engine compatible with Flink 1.17 or later. |
| scan.primitive-as-string | Boolean | No | false | Infers all primitive BSON types as STRING. Requires a VERA engine compatible with Flink 1.17 or later. |
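scan.startup.timestamp-millis expects UNIX epoch milliseconds, so a wall-clock start time has to be converted with its time zone taken into account. The following is a minimal sketch using `java.time`. `StartupTimestampSketch` is a hypothetical helper, not part of the connector.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper: converts a local date-time in a given time zone to
// the UNIX epoch milliseconds expected by scan.startup.timestamp-millis.
final class StartupTimestampSketch {
    static long toEpochMillis(String isoLocalDateTime, String zoneId) {
        return LocalDateTime.parse(isoLocalDateTime)
                .atZone(ZoneId.of(zoneId))
                .toInstant()
                .toEpochMilli();
    }
}
```

For example, `toEpochMillis("2024-01-01T00:00:00", "UTC")` returns `1704067200000`. You would pair that value with `'scan.startup.mode' = 'timestamp'` as `'scan.startup.timestamp-millis' = '1704067200000'`.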

Dimension Table Options

| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| lookup.cache | String | No | NONE | Cache policy. NONE disables caching. PARTIAL caches looked-up records. |
| lookup.max-retries | Integer | No | 3 | Maximum retries on lookup failure. |
| lookup.retry.interval | Duration | No | 1s | Retry interval on lookup failure. |
| lookup.partial-cache.expire-after-access | Duration | No | (none) | Cache TTL after last access. Requires lookup.cache = PARTIAL. |
| lookup.partial-cache.expire-after-write | Duration | No | (none) | Cache TTL after write. Requires lookup.cache = PARTIAL. |
| lookup.partial-cache.max-rows | Long | No | (none) | Maximum number of cached rows. Requires lookup.cache = PARTIAL. |
| lookup.partial-cache.cache-missing-key | Boolean | No | true | Caches empty results for missing keys. Requires lookup.cache = PARTIAL. |
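The following sketch shows how the PARTIAL cache options interact. It models expire-after-write, max-rows, and cache-missing-key. Expire-after-access is omitted to keep it short. This is a hypothetical model, not the connector's cache implementation. `PartialLookupCacheSketch` and the `backend` function are illustrative names. The current time is passed in explicitly so the behavior is deterministic.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical model of a PARTIAL lookup cache.
final class PartialLookupCacheSketch {
    // Cached result: an empty Optional records a key known to be missing.
    private static final class CachedValue {
        final Optional<String> row;
        final long writtenAtMillis;

        CachedValue(Optional<String> row, long writtenAtMillis) {
            this.row = row;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long expireAfterWriteMillis;
    private final boolean cacheMissingKey;
    private final LinkedHashMap<String, CachedValue> entries;
    int backendLookups = 0; // number of queries sent to the backing store

    PartialLookupCacheSketch(int maxRows, long expireAfterWriteMillis, boolean cacheMissingKey) {
        this.expireAfterWriteMillis = expireAfterWriteMillis;
        this.cacheMissingKey = cacheMissingKey;
        // Access-ordered map: evicts the least recently used entry once
        // the number of cached rows exceeds maxRows.
        this.entries = new LinkedHashMap<String, CachedValue>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, CachedValue> eldest) {
                return size() > maxRows;
            }
        };
    }

    Optional<String> lookup(String key, long nowMillis, Function<String, Optional<String>> backend) {
        CachedValue cached = entries.get(key);
        if (cached != null && nowMillis - cached.writtenAtMillis < expireAfterWriteMillis) {
            return cached.row; // hit, including a cached "missing" result
        }
        backendLookups++;
        Optional<String> fresh = backend.apply(key);
        if (fresh.isPresent() || cacheMissingKey) {
            entries.put(key, new CachedValue(fresh, nowMillis));
        } else {
            entries.remove(key); // misses are not remembered
        }
        return fresh;
    }
}
```

With cache-missing-key enabled, repeated probes for a key that does not exist in MongoDB hit the cache instead of querying the collection each time. That is usually what you want for sparse dimension data.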

Result Table Options

| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| sink.buffer-flush.max-rows | Integer | No | 1000 | Maximum records written per flush. |
| sink.buffer-flush.interval | Duration | No | 1s | Flush interval. |
| sink.delivery-guarantee | String | No | at-least-once | Write semantics. Valid values: none, at-least-once. Exactly-once is not supported. |
| sink.max-retries | Integer | No | 3 | Maximum retries on write failure. |
| sink.retry.interval | Duration | No | 1s | Retry interval on write failure. |
| sink.parallelism | Integer | No | (none) | Sink parallelism. |
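The two buffer-flush options work together. A flush happens when the buffer reaches max-rows, or when the interval has elapsed since the last flush, whichever comes first. The following is a minimal sketch of that interaction. `BufferedWriterSketch` is hypothetical and is not the connector's writer. Each flushed batch stands in for one bulk write, and time is passed in explicitly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of size- and time-based buffer flushing.
final class BufferedWriterSketch {
    private final int maxRows;
    private final long flushIntervalMillis;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMillis;
    // Each flushed batch stands in for one bulk write to MongoDB.
    final List<List<String>> flushedBatches = new ArrayList<>();

    BufferedWriterSketch(int maxRows, long flushIntervalMillis, long startMillis) {
        this.maxRows = maxRows;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = startMillis;
    }

    // Buffers a record and flushes as soon as the buffer reaches maxRows.
    void write(String record, long nowMillis) {
        buffer.add(record);
        if (buffer.size() >= maxRows) {
            flush(nowMillis);
        }
    }

    // Invoked periodically; flushes a non-empty buffer once the interval
    // since the last flush has elapsed.
    void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - lastFlushMillis >= flushIntervalMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        flushedBatches.add(new ArrayList<>(buffer));
        buffer.clear();
        lastFlushMillis = nowMillis;
    }
}
```

Consider max-rows = 3 and an interval of 2 seconds. Three quick writes trigger one flush. A single fourth record waits until the timer fires at least two seconds after that flush.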

Data Type Mappings

CDC Source Table

| BSON Data Type | Flink SQL Data Type |
|---|---|
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL(p, s) |
| Boolean | BOOLEAN |
| Date / Timestamp | DATE |
| Date / Timestamp | TIME |
| DateTime | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
| String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |
| DBPointer | ROW<$ref STRING, $id STRING> |
| GeoJSON Point | ROW<type STRING, coordinates ARRAY<DOUBLE>> |
| GeoJSON Line | ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>> |

Dimension Table and Result Table

| BSON Data Type | Flink SQL Data Type |
|---|---|
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Boolean | BOOLEAN |
| DateTime | TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP_LTZ(0) |
| String | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |
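The different precisions for DateTime and Timestamp follow from how BSON stores them. A BSON DateTime is a signed 64-bit count of milliseconds since the UNIX epoch, which fits precision 3. A BSON Timestamp packs seconds since the epoch into the high 32 bits and an increment counter into the low 32 bits. Only the seconds fit a precision-0 timestamp, so the increment is presumably dropped. The following is a minimal sketch of both decodings with `java.time`. `BsonTimeSketch` is a hypothetical helper, not the connector's converter.

```java
import java.time.Instant;

// Hypothetical helpers relating BSON time types to Flink precisions.
final class BsonTimeSketch {
    // BSON DateTime: signed 64-bit milliseconds since the epoch -> precision 3.
    static Instant fromBsonDateTime(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // BSON Timestamp: high 32 bits = seconds (unsigned), low 32 bits = increment.
    // Only the seconds are kept, matching precision 0.
    static Instant fromBsonTimestamp(long rawValue) {
        long seconds = rawValue >>> 32;
        return Instant.ofEpochSecond(seconds);
    }

    // Packs seconds and increment into a raw BSON Timestamp value.
    static long toBsonTimestamp(long seconds, long increment) {
        return (seconds << 32) | (increment & 0xFFFFFFFFL);
    }
}
```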

SQL Examples

Regular Source to Print Sink

```sql
CREATE TEMPORARY TABLE mongo_source (
  `_id`   STRING,
  name    STRING,
  weight  DECIMAL(10, 3),
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector'  = 'mongodb',
  'hosts'      = '<host>:27017',
  'username'   = '<username>',
  'password'   = '${secret_values.password}',
  'database'   = 'myDatabase',
  'collection' = 'products'
);

SELECT * FROM mongo_source;
```

CDC Source to Print Sink

```sql
CREATE TEMPORARY TABLE mongo_cdc_source (
  `_id`   STRING,
  name    STRING,
  weight  DECIMAL(10, 3),
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector'  = 'mongodb-cdc',
  'hosts'      = '<host>:27017',
  'username'   = '<username>',
  'password'   = '${secret_values.password}',
  'database'   = 'myDatabase',
  'collection' = 'products'
);

CREATE TEMPORARY TABLE products_sink (
  `_id`   STRING,
  name    STRING,
  weight  DECIMAL(10, 3)
) WITH (
  'connector' = 'print',
  'logger'    = 'true'
);

INSERT INTO products_sink
SELECT _id, name, weight
FROM mongo_cdc_source;
```

Datagen to MongoDB Result Table

```sql
CREATE TEMPORARY TABLE datagen_source (
  `_id`   STRING,
  name    STRING,
  weight  DECIMAL(10, 3)
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_sink (
  `_id`   STRING,
  name    STRING,
  weight  DECIMAL(10, 3),
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector'                  = 'mongodb',
  'hosts'                      = '<host>:27017',
  'username'                   = '<username>',
  'password'                   = '${secret_values.password}',
  'database'                   = 'myDatabase',
  'collection'                 = 'products',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s',
  'sink.delivery-guarantee'    = 'at-least-once',
  'sink.max-retries'           = '5'
);

INSERT INTO mongo_sink SELECT * FROM datagen_source;
```

Lookup Join

```sql
CREATE TEMPORARY TABLE datagen_source (
  id         STRING,
  a          INT,
  b          BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_dim (
  `_id`   STRING,
  name    STRING,
  weight  DECIMAL(10, 3),
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector'  = 'mongodb',
  'hosts'      = '<host>:27017',
  'username'   = '<username>',
  'password'   = '${secret_values.password}',
  'database'   = 'myDatabase',
  'collection' = 'products'
);

CREATE TEMPORARY TABLE enriched_sink (
  id     STRING,
  a      INT,
  b      BIGINT,
  name   STRING
) WITH (
  'connector' = 'print',
  'logger'    = 'true'
);

INSERT INTO enriched_sink
SELECT T.id, T.a, T.b, H.name
FROM datagen_source AS T
JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.id = H._id;
```
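In this join, each datagen_source row probes mongo_dim by _id at processing time. A failed probe is retried according to lookup.max-retries and lookup.retry.interval. The following is a minimal sketch of such a retry loop. It assumes max-retries counts attempts after the first one. `LookupRetrySketch` and the `backend` function are hypothetical, and the connector's actual retry policy may differ.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical retry loop for a single dimension-table lookup.
final class LookupRetrySketch {
    static Optional<String> lookupWithRetries(String key,
                                              Function<String, Optional<String>> backend,
                                              int maxRetries,
                                              long retryIntervalMillis) {
        RuntimeException lastFailure = null;
        // One initial attempt plus up to maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return backend.apply(key); // empty Optional means no matching row
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt < maxRetries) {
                    sleep(retryIntervalMillis);
                }
            }
        }
        throw new IllegalStateException(
                "Lookup failed after " + (maxRetries + 1) + " attempts", lastFailure);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted between lookup retries", e);
        }
    }
}
```

An empty result is not a failure and is not retried. With an inner JOIN, as in the query above, a probe row with no matching _id produces no output.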

Full Changelog CDC Source to Print Sink

```sql
CREATE TEMPORARY TABLE mongo_full_cdc (
  `_id`   STRING,
  name    STRING,
  weight  DECIMAL(10, 3),
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector'           = 'mongodb-cdc',
  'hosts'               = '<host>:27017',
  'username'            = '<username>',
  'password'            = '${secret_values.password}',
  'database'            = 'myDatabase',
  'collection'          = 'products',
  'scan.full-changelog' = 'true'
);

CREATE TEMPORARY TABLE products_sink (
  `_id`   STRING,
  name    STRING,
  weight  DECIMAL(10, 3)
) WITH (
  'connector' = 'print',
  'logger'    = 'true'
);

INSERT INTO products_sink
SELECT _id, name, weight
FROM mongo_full_cdc;
```

DataStream API

CDC Source (MongoDBSource)

Build a MongoDBSource for CDC streaming:

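```java
// Imports assume Flink CDC 3.x package names; other distributions may differ.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
    .hosts("<host>:27017")
    .username("<username>")
    .password("<password>")
    .databaseList("myDatabase")
    .collectionList("myDatabase.products") // database.collection form
    .startupOptions(StartupOptions.initial())
    .deserializer(new JsonDebeziumDeserializationSchema()) // change events as JSON strings
    .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB CDC").print(); // then env.execute()
```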

MongoDBSource Parameters

| Parameter | Description |
|---|---|
| hosts | Hostname and port of the MongoDB server, for example <host>:27017. Separate multiple hosts with commas. |
| username | MongoDB username. Not required if authentication is disabled. |
| password | MongoDB password. Not required if authentication is disabled. |
| databaseList | Database names to monitor. Supports regex. Use .* to match all databases. |
| collectionList | Collection names to monitor, in fully qualified database.collection form. Supports regex. |
| startupOptions | StartupOptions.initial(), StartupOptions.latestOffset(), or StartupOptions.timestamp(millis). |
| deserializer | MongoDBConnectorDeserializationSchema (upsert to RowData), MongoDBConnectorFullChangelogDeserializationSchema (full changelog to RowData), or JsonDebeziumDeserializationSchema (JSON string). |

Bounded Source (MongoSource)

Build a MongoSource for bounded batch reads:

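```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.bson.BsonDocument;

// Emits each document as a JSON string; implement
// MongoDeserializationSchema<RowData> instead to produce RowData.
MongoSource<String> source = MongoSource.<String>builder()
    .setUri("mongodb://<username>:<password>@<host>:27017")
    .setDatabase("myDatabase")
    .setCollection("products")
    .setDeserializationSchema(new MongoDeserializationSchema<String>() {
        @Override
        public String deserialize(BsonDocument document) {
            return document.toJson();
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    })
    .build();
```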

Sink (MongoSink)

Build a MongoSink for writing data with the DataStream API:

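```java
import com.mongodb.client.model.InsertOneModel;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.bson.BsonDocument;

MongoSink<String> sink = MongoSink.<String>builder()
    .setUri("mongodb://<username>:<password>@<host>:27017")
    .setDatabase("myDatabase")
    .setCollection("products")
    .setBatchSize(1000)
    .setBatchIntervalMs(1000)
    .setMaxRetries(3)
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    // The serialization schema returns a MongoDB WriteModel per record:
    // here, each JSON string input becomes an insert of the parsed document.
    .setSerializationSchema(
        (input, context) -> new InsertOneModel<>(BsonDocument.parse(input)))
    .build();
```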

MongoSink Parameters

| Parameter | Description |
|---|---|
| setUri | Full MongoDB connection URI. |
| setDatabase | Target database name. |
| setCollection | Target collection name. |
| setBatchSize | Maximum number of documents per write batch. |
| setBatchIntervalMs | Flush interval in milliseconds. |
| setMaxRetries | Maximum retries on write failure. |
| setDeliveryGuarantee | DeliveryGuarantee.AT_LEAST_ONCE or DeliveryGuarantee.NONE. |
| setSerializationSchema | A MongoSerializationSchema implementation that converts each record into a MongoDB WriteModel, for example an InsertOneModel of a BsonDocument. |