MongoDB & MongoDB CDC
Background Information
MongoDB is a document-oriented unstructured database that simplifies application development and scaling. The following table describes the capabilities supported by the MongoDB connector.
Item | Description |
---|---|
Table type | Source table, dimension table, and result table |
Running mode | Streaming mode |
Metric | Metrics for source tables: numBytesIn, numBytesInPerSecond, numRecordsIn, numRecordsInPerSecond, numRecordsInErrors, currentFetchEventTimeLag, currentEmitEventTimeLag, watermarkLag, sourceIdleTime. Metrics for dimension tables and result tables: none. |
API type | DataStream API and SQL API |
Data update or deletion in a result table | Supported |
Features
A MongoDB Change Data Capture (CDC) source table is a streaming source table that reads from MongoDB databases. The MongoDB connector used for a MongoDB CDC source table is referred to as the MongoDB CDC connector. The MongoDB CDC connector first reads the full historical data from a MongoDB database and then switches to reading operations log (oplog) data, so that no data is lost between the two phases. If a failure occurs, exactly-once semantics ensure that each change is still processed exactly once. The MongoDB CDC connector uses the Change Stream API to efficiently capture document changes in MongoDB databases and collections, monitors document insertion, update, replacement, and deletion events, and converts these events into changelog streams that Flink can process. The MongoDB CDC connector provides the following features:
- Efficiently monitors document changes by using the Change Stream API, which was introduced in MongoDB 3.6.
- Provides exactly-once semantics even if a deployment fails at any stage.
- Supports full and incremental data monitoring. After the snapshot reading phase is complete, fully managed Flink automatically switches to the incremental reading phase.
- Supports parallel reading in the initial snapshot phase. Only MongoDB 4.0 or later supports this feature.
- Supports the following startup modes:
  - initial: When the MongoDB CDC connector starts for the first time, it takes an initial snapshot of the monitored collections and then continues to read the latest oplog data.
  - latest-offset: When the MongoDB CDC connector starts for the first time, it does not take a snapshot of the monitored collections. It starts reading from the end of the oplog, so it captures only the changes made after the connector starts.
  - timestamp: The MongoDB CDC connector skips the snapshot phase and reads oplog events starting from a specified timestamp. Only MongoDB 4.0 or later supports this mode.
- Supports full changelog event streams. Only MongoDB 6.0 or later supports this feature.
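For the timestamp startup mode, the scan.startup.timestamp-millis parameter (see Parameters Only for Source Tables) expects the number of milliseconds since the UNIX epoch. The following is a minimal sketch, assuming the desired start point is known as a UTC date and time, that computes this value with java.time and prints the matching WITH clause options. The class and method names are hypothetical and are not part of the connector.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper for building timestamp startup options.
class StartupTimestamp {
    // Converts a UTC date-time to milliseconds since 1970-01-01T00:00:00Z,
    // the unit expected by scan.startup.timestamp-millis.
    static long toEpochMillis(LocalDateTime utcDateTime) {
        return utcDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Renders the two WITH clause options that the timestamp startup mode needs.
    static String timestampStartupOptions(long epochMillis) {
        return "'scan.startup.mode' = 'timestamp',\n"
                + "'scan.startup.timestamp-millis' = '" + epochMillis + "'";
    }

    public static void main(String[] args) {
        long millis = toEpochMillis(LocalDateTime.of(2024, 1, 1, 0, 0));
        System.out.println(timestampStartupOptions(millis));
    }
}
```

For 2024-01-01 00:00 UTC, the computed value is 1704067200000. If your start point is expressed in a local time zone, convert it with the matching ZoneOffset instead of UTC.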
Prerequisites
- MongoDB CDC source table
  - The MongoDB CDC connector can read data from self-managed MongoDB databases.
  - The replica set feature is enabled for the MongoDB database that you want to monitor. The basic features of the MongoDB CDC connector depend on it. For more information, see Replication.
  - The pre-image and post-image features are enabled for the MongoDB database if you want to use the full changelog event stream feature. For more information, see Document Preimages.
  - If authentication is enabled for MongoDB, you must use a MongoDB user that has the following database permissions:
    - splitVector
    - listDatabases
    - listCollections
    - collStats
    - find
    - changeStream
    - Permissions to access the config.collections and config.chunks collections
- MongoDB dimension table and result table
  - A MongoDB database and collection are created.
  - An IP address whitelist is configured so that Flink can access MongoDB.
Limits
- MongoDB CDC source table
  - Only the VERA engine compatible with Flink 1.17 or later supports the MongoDB CDC connector.
  - For a MongoDB database whose version is earlier than 4.0, you cannot set the scan.startup.mode parameter to timestamp.
  - For a MongoDB database whose version is earlier than 6.0, full changelog event streams cannot be generated.
- MongoDB dimension table
  - Only the VERA engine compatible with Flink 1.17 or later supports MongoDB dimension tables.
- MongoDB result table
  - If no primary key is defined in the DDL statement of a result table, data can only be inserted into the result table. Data in the table cannot be updated or deleted.
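The difference between a result table with and without a primary key can be pictured with a small model. This is a minimal sketch, not the connector's implementation: an in-memory Map stands in for a MongoDB collection keyed by _id, a List stands in for an append-only collection, and the RowKind enum is a hypothetical stand-in for Flink's change types. With a primary key, inserts and updates become upserts on _id and deletes remove the matching document. Without one, there is no key to match on, so only inserts can be applied. In Flink itself, a query that produces updates for a sink without a primary key is typically rejected when the job is planned, rather than failing at runtime as it does here.

```java
import java.util.List;
import java.util.Map;

// Illustrative model of result table write behavior; not the connector's implementation.
class ResultTableWriteModes {
    // Hypothetical stand-in for Flink's change types.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // PRIMARY KEY(_id) declared: each change is applied by _id.
    static void applyKeyed(Map<String, String> collection, RowKind kind, String id, String document) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                // Upsert: replace the document with the same _id, or insert it if absent.
                collection.put(id, document);
                break;
            case DELETE:
                collection.remove(id);
                break;
            case UPDATE_BEFORE:
                // Ignored: the matching UPDATE_AFTER row carries the new document.
                break;
        }
    }

    // No primary key: there is no key to match on, so only inserts can be applied.
    static void applyAppendOnly(List<String> collection, RowKind kind, String document) {
        if (kind != RowKind.INSERT) {
            throw new UnsupportedOperationException(
                    "Only inserts are supported without a primary key, got " + kind);
        }
        collection.add(document);
    }
}
```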
Syntax
CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
When you create a MongoDB CDC source table, you must declare the _id STRING column as the unique primary key.
Parameters in the WITH Clause
Common Parameters
Parameter | Description | Data type | Required | Default value | Remarks |
---|---|---|---|---|---|
connector | The name of the connector. | STRING | Yes | No default value | If you use the MongoDB connector for a source table, you can set this parameter to mongodb or mongodb-cdc. If you use the MongoDB connector for a dimension table or a result table, you must set this parameter to mongodb. |
uri | The URI that is used to access the MongoDB database. | STRING | No | No default value | Note: You must configure either the uri parameter or the hosts parameter. If you configure the uri parameter, you do not need to configure the scheme, hosts, username, password, or connection.options parameter. If you configure both the uri and hosts parameters, the URI specified by the uri parameter is used to access the MongoDB database. |
hosts | The hostname and port of the MongoDB instance, in the host:port format. | STRING | No | No default value | Note: Separate multiple host:port pairs with commas (,). |
scheme | The connection protocol that is used to access the MongoDB database. | STRING | No | mongodb | Note: Valid values: mongodb: the default MongoDB protocol is used. mongodb+srv: the DNS SRV record protocol is used. |
username | The username that is used to access the MongoDB database. | STRING | No | No default value | Note: This parameter is required if authentication is enabled for the MongoDB database. |
password | The password that is used to access the MongoDB database. | STRING | No | No default value | Note: This parameter is required if authentication is enabled for the MongoDB database. Important: To prevent password leaks, Ververica recommends that you use the key management method to configure your password. |
database | The name of the MongoDB database. | STRING | No | No default value | Note: If you use the MongoDB connector for a source table, a regular expression can be used to match the name of the MongoDB database. If you do not configure this parameter, all databases are monitored. |
collection | The name of the MongoDB collection. | STRING | No | No default value | Important: If the name of the collection that you want to monitor contains special characters of regular expressions, you must provide a fully qualified namespace (database name.collection name). Otherwise, the changes to the collection cannot be captured. If you do not configure this parameter, all collections are monitored. |
connection.options | The additional parameters that are used to connect to the MongoDB database. | STRING | No | No default value | Note: The parameters are key=value pairs separated by ampersands (&), such as connectTimeoutMS=12000&socketTimeoutMS=13000. |
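If you configure uri instead of the individual parameters, the value follows the standard MongoDB connection string format, scheme://username:password@host1:port1,host2:port2/?options. The following is a minimal sketch, assuming you want to assemble such a URI from the values you would otherwise pass through scheme, hosts, username, password, and connection.options. The class and method names are hypothetical. Credentials are percent-encoded because the connection string format requires reserved characters such as @ and : in them to be escaped.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; requires Java 10 or later for URLEncoder.encode(String, Charset).
class MongoUriBuilder {
    // Percent-encodes a credential. URLEncoder performs form encoding, which turns
    // spaces into '+', so those are converted to %20 for use inside a URI.
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Builds scheme://[username[:password]@]hosts/[?connectionOptions].
    static String buildUri(String scheme, String hosts, String username,
                           String password, String connectionOptions) {
        StringBuilder uri = new StringBuilder(scheme).append("://");
        if (username != null && !username.isEmpty()) {
            uri.append(encode(username));
            if (password != null) {
                uri.append(':').append(encode(password));
            }
            uri.append('@');
        }
        uri.append(hosts).append('/');
        if (connectionOptions != null && !connectionOptions.isEmpty()) {
            uri.append('?').append(connectionOptions);
        }
        return uri.toString();
    }
}
```

For example, buildUri("mongodb", "host1:27017,host2:27017", "mongouser", "p@ss word", "connectTimeoutMS=12000&socketTimeoutMS=13000") yields mongodb://mongouser:p%40ss%20word@host1:27017,host2:27017/?connectTimeoutMS=12000&socketTimeoutMS=13000. In a Flink job, prefer referencing the password through key management rather than embedding it in the URI.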
Parameters Only for Source Tables
Parameter | Description | Data type | Required | Default value | Remarks |
---|---|---|---|---|---|
scan.startup.mode | The startup mode of the MongoDB CDC connector. | STRING | No | initial | Valid values: initial: all existing data is read first, and then change data is read. latest-offset: change data is read from the current offset. timestamp: change data is read from a specified timestamp. For more information, see Startup Properties. |
scan.startup.timestamp-millis | The timestamp from which change data is consumed. | LONG | Required only if the scan.startup.mode parameter is set to timestamp. Not required for initial or latest-offset. | No default value | The value is the number of milliseconds that have elapsed since 00:00:00 UTC on January 1, 1970 (UNIX epoch time). |
initial.snapshotting.queue.size | The maximum queue size for the initial snapshot phase. | INTEGER | No | 10240 | Note: This parameter takes effect only when the scan.startup.mode parameter is set to initial. |
batch.size | The number of documents that the cursor fetches per batch. | INTEGER | No | 1024 | N/A. |
poll.max.batch.size | The maximum number of change documents that can be processed in a batch. | INTEGER | No | 1024 | This parameter determines the maximum number of change documents that are pulled at a time during stream processing. A larger value results in a larger buffer allocated in the connector. |
poll.await.time.ms | The maximum time to wait before checking the change stream for new results. | INTEGER | No | 1000 | Unit: milliseconds. |
heartbeat.interval.ms | The interval at which heartbeat packets are sent. | INTEGER | No | 0 | Unit: milliseconds. The MongoDB CDC connector periodically sends heartbeat packets to the MongoDB database so that its recorded resume position stays up to date. If you set this parameter to 0, no heartbeat packets are sent. Important: Ververica strongly recommends that you configure this parameter for collections that are not frequently updated. |
scan.incremental.snapshot.enabled | Specifies whether to enable the parallel reading mode in the initial snapshot phase. | BOOLEAN | No | false | This is an experimental feature. |
scan.incremental.snapshot.chunk.size.mb | The chunk size when the parallel snapshot reading mode is enabled. | INTEGER | No | 64 | Unit: MB. This parameter takes effect only when the parallel snapshot reading mode is enabled. |
scan.full-changelog | Specifies whether to generate a full changelog event stream. | BOOLEAN | No | false | This is an experimental feature. This parameter is supported only in MongoDB 6.0 or later with the pre-image and post-image feature enabled. |
scan.flatten-nested-columns.enabled | Specifies whether to read a nested field in a Binary JSON (BSON) document as a field whose name is joined to its parent field name with a period (.). | BOOLEAN | No | false | If you set this parameter to true, the col field in the BSON document {"nested":{"col":true}} is named nested.col in the resulting schema. Note: Only the VERA engine compatible with Flink 1.17 or later supports this parameter. |
scan.primitive-as-string | Specifies whether to infer all basic data types in BSON-formatted documents as the STRING type. | BOOLEAN | No | false | Note: Only the VERA engine compatible with Flink 1.17 or later supports this parameter. |
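To illustrate the naming rule of scan.flatten-nested-columns.enabled, the following minimal sketch flattens a nested document into period-separated field names. It assumes the document is modeled as a Java Map of field names to values; the connector itself works on BSON documents, and this is not its implementation. Arrays and other non-map values are kept as leaf values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model of nested field naming; not the connector's implementation.
class FlattenNested {
    // Flattens nested maps, for example {"nested":{"col":true}} -> {"nested.col":true}.
    static Map<String, Object> flatten(Map<String, Object> document) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", document, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> doc, Map<String, Object> out) {
        for (Map.Entry<String, Object> field : doc.entrySet()) {
            // Join the parent path and the field name with a period.
            String name = prefix.isEmpty() ? field.getKey() : prefix + "." + field.getKey();
            if (field.getValue() instanceof Map) {
                // Nested document: recurse with the extended prefix.
                flattenInto(name, (Map<String, Object>) field.getValue(), out);
            } else {
                // Leaf value (including arrays): keep it under the flattened name.
                out.put(name, field.getValue());
            }
        }
    }
}
```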
Parameters Only for Dimension Tables
Parameter | Description | Data type | Required | Default value | Remarks |
---|---|---|---|---|---|
lookup.cache | The cache policy. | STRING | No | NONE | Valid values: NONE: No data is cached. PARTIAL: Records that are looked up in the external database are cached. |
lookup.max-retries | The maximum number of retries when a database query fails. | INTEGER | No | 3 | N/A. |
lookup.retry.interval | The interval between retries when a database query fails. | DURATION | No | 1s | N/A. |
lookup.partial-cache.expire-after-access | The maximum period of time for which a cached record is retained after it was last accessed. | DURATION | No | No default value | Units: ms, s, min, h, or d. If you use this parameter, you must set the lookup.cache parameter to PARTIAL. |
lookup.partial-cache.expire-after-write | The maximum period of time for which data records can be retained after the data records are written to the cache. | DURATION | No | No default value | If you use this parameter, you must set the lookup.cache parameter to PARTIAL . |
lookup.partial-cache.max-rows | The maximum number of data records that can be cached. If the number of data records that are cached exceeds the value of this parameter, the earliest data records expire. | LONG | No | No default value | If you use this parameter, you must set the lookup.cache parameter to PARTIAL . |
lookup.partial-cache.cache-missing-key | Specifies whether to cache an empty result when no matching record is found in the physical table. | BOOLEAN | No | true | If you use this parameter, you must set the lookup.cache parameter to PARTIAL. |
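The PARTIAL cache options combine a size bound with time-based expiry. The following minimal sketch shows how lookup.partial-cache.max-rows, lookup.partial-cache.expire-after-write, and lookup.partial-cache.expire-after-access interact. It assumes least-recently-used eviction and a caller-supplied clock in milliseconds so that the behavior is deterministic. It illustrates the semantics only and is not the connector's implementation; Flink's actual lookup cache may evict entries in a different order. With cache-missing-key enabled, an empty result would simply be stored as a value, such as an empty list.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative model of lookup.cache = 'PARTIAL'; not the connector's implementation.
class PartialLookupCache<K, V> {
    private static final class CachedValue<T> {
        final T value;
        final long writtenAtMs;
        long accessedAtMs;

        CachedValue(T value, long nowMs) {
            this.value = value;
            this.writtenAtMs = nowMs;
            this.accessedAtMs = nowMs;
        }
    }

    private final long expireAfterWriteMs;  // expire-after-write; <= 0 disables it
    private final long expireAfterAccessMs; // expire-after-access; <= 0 disables it
    private final LinkedHashMap<K, CachedValue<V>> entries;

    PartialLookupCache(int maxRows, long expireAfterWriteMs, long expireAfterAccessMs) {
        this.expireAfterWriteMs = expireAfterWriteMs;
        this.expireAfterAccessMs = expireAfterAccessMs;
        // accessOrder = true keeps the least recently used entry first,
        // so it is evicted once max-rows is exceeded.
        this.entries = new LinkedHashMap<K, CachedValue<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedValue<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    void put(K key, V value, long nowMs) {
        entries.put(key, new CachedValue<>(value, nowMs));
    }

    // Returns the cached value, or empty if it is absent or has expired.
    Optional<V> get(K key, long nowMs) {
        CachedValue<V> cached = entries.get(key);
        if (cached == null) {
            return Optional.empty();
        }
        boolean writeExpired = expireAfterWriteMs > 0 && nowMs - cached.writtenAtMs >= expireAfterWriteMs;
        boolean accessExpired = expireAfterAccessMs > 0 && nowMs - cached.accessedAtMs >= expireAfterAccessMs;
        if (writeExpired || accessExpired) {
            entries.remove(key);
            return Optional.empty();
        }
        cached.accessedAtMs = nowMs;
        return Optional.of(cached.value);
    }
}
```

On a miss, the dimension table queries MongoDB and stores the result with put, so the sizes and durations in the sample DDL for a dimension table bound how stale a joined value can become.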
Parameters Only for Result Tables
Parameter | Description | Data type | Required | Default value | Remarks |
---|---|---|---|---|---|
sink.buffer-flush.max-rows | The maximum number of records that are buffered and written to MongoDB in one batch. | INTEGER | No | 1000 | N/A. |
sink.buffer-flush.interval | The interval at which data is flushed. | DURATION | No | 1s | N/A. |
sink.delivery-guarantee | The delivery semantics used when data is written. | STRING | No | at-least-once | Valid values: none and at-least-once. Note: Exactly-once semantics are not supported. |
sink.max-retries | The maximum number of retries when data fails to be written to the database. | INTEGER | No | 3 | N/A. |
sink.retry.interval | The interval between retries when data fails to be written to the database. | DURATION | No | 1s | N/A. |
sink.parallelism | The degree of parallelism of the sink. | INTEGER | No | No default value | N/A. |
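The sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters jointly decide when buffered records are written: a flush happens as soon as either limit is reached. The following minimal sketch models that rule, assuming a hypothetical batch-writer callback in place of a bulk write to MongoDB and a caller-supplied clock in milliseconds. It is not the connector's implementation; a real sink typically also flushes on checkpoints, which is what backs the at-least-once guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative model of the sink buffer flush rule; not the connector's implementation.
class BufferedSinkWriter<T> {
    private final int maxRows;             // sink.buffer-flush.max-rows
    private final long flushIntervalMs;    // sink.buffer-flush.interval
    private final Consumer<List<T>> batchWriter; // hypothetical stand-in for a bulk write
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMs;

    BufferedSinkWriter(int maxRows, long flushIntervalMs, Consumer<List<T>> batchWriter, long nowMs) {
        this.maxRows = maxRows;
        this.flushIntervalMs = flushIntervalMs;
        this.batchWriter = batchWriter;
        this.lastFlushMs = nowMs;
    }

    // Buffers a record and flushes as soon as either limit is reached.
    void write(T record, long nowMs) {
        buffer.add(record);
        if (buffer.size() >= maxRows || nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    // Meant to be called periodically so that a partly filled buffer is still
    // written once the interval has elapsed, even if no new records arrive.
    void onTimer(long nowMs) {
        if (nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    // Hands a copy of the buffered records to the writer and resets the interval.
    void flush(long nowMs) {
        if (!buffer.isEmpty()) {
            batchWriter.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushMs = nowMs;
    }
}
```

With the defaults (1000 rows, 1s), a busy stream is flushed mostly by the row limit, while a quiet one is flushed by the interval.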
Data Type Mappings
CDC Source Table
BSON data type | Flink SQL data type |
---|---|
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date, Timestamp | DATE |
Date, Timestamp | TIME |
DateTime | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>> Line: ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>> |
Dimension Table and Result Table
BSON data type | Flink SQL data type |
---|---|
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Boolean | BOOLEAN |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
String, ObjectId | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
Sample Code
Sample Code for a CDC Source Table
CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, -- must be declared
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
CREATE TEMPORARY TABLE productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name
FROM
  mongo_source;
Sample Code for a Dimension Table
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a INT,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE blackhole_sink (
  id STRING,
  a INT,
  b BIGINT,
  name STRING
) WITH (
  'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
Sample Code for a Result Table
CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;
MongoDB CDC DataStream API
To read or write data by using the DataStream API, you must use a DataStream connector of the corresponding type to connect to fully managed Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors.
Create a DataStream API program and use MongoDBSource. Sample code:
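import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

// Incremental snapshot source from the com.ververica.cdc.connectors.mongodb.source package
MongoDBSource<String> mongoSource =
    MongoDBSource.<String>builder()
        .hosts("mongo.example.com:27017")
        .username("mongouser")
        .password("mongopasswd")
        .databaseList("testdb")
        // Collections are specified as fully qualified database.collection names
        .collectionList("testdb.testcoll")
        .startupOptions(StartupOptions.initial())
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();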
To enable the incremental snapshot feature when you use the DataStream API, use the MongoDBSource#builder() method in the com.ververica.cdc.connectors.mongodb.source package during the construction of the MongoDBSource data source. If you do not need to enable the incremental snapshot feature, use the MongoDBSource#builder() method in the com.ververica.cdc.connectors.mongodb package.
The following table describes the parameters that you can configure during the construction of the MongoDBSource data source.
Parameter | Description |
---|---|
hosts | The hostname of the MongoDB database that you want to access. |
username | The username of the MongoDB database service. Note: If authentication is not enabled on the MongoDB server, you do not need to configure this parameter. |
password | The password of the MongoDB database service. Note: If authentication is not enabled on the MongoDB server, you do not need to configure this parameter. |
databaseList | The name of the MongoDB database that you want to monitor. Note: If you want to read data from multiple databases, you can set this parameter to a regular expression. You can use .* to match all databases. |
collectionList | The name of the MongoDB collection that you want to monitor. Note: If you want to read data from multiple collections, you can set this parameter to a regular expression. You can use .* to match all collections. |
startupOptions | The startup mode of the MongoDB CDC connector. Valid values: StartupOptions.initial(): pulls all data from the initial offset. StartupOptions.latest(): pulls change data from the current offset. StartupOptions.timestamp(long): pulls change data from a specific timestamp, given in milliseconds since the UNIX epoch. |
deserializer | A deserializer that converts SourceRecords into a specific type. Valid values: MongoDBConnectorDeserializationSchema: deserializes SourceRecords generated in upsert mode into RowData, the internal data structure of the Flink Table API and SQL API. MongoDBConnectorFullChangelogDeserializationSchema: deserializes SourceRecords generated in full changelog mode into RowData. JsonDebeziumDeserializationSchema: deserializes SourceRecords into JSON strings. |