MongoDB Catalog
The MongoDB catalog enables browsing MongoDB databases and collections directly from Flink SQL without writing per-table DDL statements. The catalog maps MongoDB databases to Flink databases and collections to Flink tables, and automatically infers collection schemas.
Background Information
The MongoDB catalog maps MongoDB's native data hierarchy to Flink's catalog model:
| MongoDB Concept | Flink Catalog Concept |
|---|---|
| MongoDB instance / replica set | Catalog |
| Database | Database |
| Collection | Table |
Tables discovered through the catalog can serve as source tables, dimension tables, and result tables in Flink SQL deployments.
The catalog is read-only with respect to metadata: it discovers existing databases and collections. You cannot create, modify, or delete databases or collections through a MongoDB catalog.
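As a rough illustration of using discovered tables as both a source and a result table, the statement below copies rows from one collection to another. The collection names orders and orders_archive and the fields status and amount are hypothetical. Because the catalog cannot create collections, both collections are assumed to exist already and to contain documents with these fields.

```sql
-- Sketch only: the collection and field names are hypothetical.
-- Reads from one discovered collection (source table) and
-- writes to another discovered collection (result table).
INSERT INTO `my_mongo_catalog`.`myDatabase`.`orders_archive`
SELECT `_id`, `status`, `amount`
FROM `my_mongo_catalog`.`myDatabase`.`orders`
WHERE `status` = 'completed';
```

Fully qualified names of the form catalog.database.table avoid any dependence on the session's current catalog and database.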
Prerequisites
- A running MongoDB instance accessible from Ververica Platform.
- If MongoDB authentication is enabled: credentials with at least listDatabases, listCollections, and find permissions.
- Network access configured (IP allowlist or private connection).
Limitations
- You cannot modify an existing MongoDB catalog using DDL statements. To update connection settings, drop the catalog and recreate it.
- Metadata discovery only. You cannot create, modify, or delete databases or collections through a MongoDB catalog.
Create a Catalog
CREATE CATALOG my_mongo_catalog WITH (
'type' = 'mongodb',
'hosts' = '<host>:27017',
'username' = '<username>',
'password' = '${secret_values.password}',
'default-database' = 'myDatabase'
);
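Once the catalog is registered, its contents can be browsed with standard Flink SQL statements. The following is a minimal sketch that assumes the catalog created above and a hypothetical collection named orders. Whether each statement is available may depend on the SQL editor or client in use.

```sql
-- Switch to the catalog; the default-database becomes the current database.
USE CATALOG my_mongo_catalog;

-- List the MongoDB databases visible to the configured user.
SHOW DATABASES;

-- Select a database and list its collections as Flink tables.
USE myDatabase;
SHOW TABLES;

-- Inspect the inferred schema of one collection (orders is hypothetical).
DESCRIBE `orders`;
```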
Catalog Options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| type | Yes | (none) | String | Must be mongodb. |
| hosts | No | (none) | String | Hostname of the MongoDB instance. Separate multiple hosts with commas. Either uri or hosts must be set. |
| uri | No | (none) | String | Full MongoDB connection URI. If set, overrides hosts, scheme, username, password, and connection.options. |
| scheme | No | mongodb | String | Connection protocol. Valid values: mongodb, mongodb+srv. |
| username | No | (none) | String | Required if MongoDB authentication is enabled. |
| password | No | (none) | String | Required if MongoDB authentication is enabled. |
| default-database | Yes | (none) | String | Default database selected when USE CATALOG is called without a subsequent USE <database>. |
| connection.options | No | (none) | String | Additional MongoDB connection options as key=value pairs separated by &. |
| max.fetch.records | No | 100 | Integer | Maximum number of documents read when inferring a collection schema. |
| scan.flatten-nested-columns.enabled | No | false | Boolean | If true, nested BSON fields are expanded and named using dot notation, for example nested.col. Applies only when catalog tables are used as source tables. |
| scan.primitive-as-string | No | false | Boolean | If true, all primitive BSON types are inferred as STRING. |
To keep credentials out of SQL statements, use secret references: 'password' = '${secret_values.password}'. For more information, see Secret Values.
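The options above can be combined in a single statement. The sketch below is illustrative only: the catalog name my_mongo_rs_catalog, the replica set name rs0, the authSource value, the sample size of 500, the collection customers, and the nested field address.city are all assumptions, not values from this page.

```sql
-- Sketch only: names and option values below are hypothetical.
CREATE CATALOG my_mongo_rs_catalog WITH (
  'type' = 'mongodb',
  -- Multiple hosts are separated by commas.
  'hosts' = '<host1>:27017,<host2>:27017',
  'username' = '<username>',
  'password' = '${secret_values.password}',
  'default-database' = 'myDatabase',
  -- Extra connection options as key=value pairs joined by &.
  'connection.options' = 'replicaSet=rs0&authSource=admin',
  -- Sample more documents than the default of 100 during schema inference.
  'max.fetch.records' = '500',
  -- Expand nested BSON fields into dot-named columns for source tables.
  'scan.flatten-nested-columns.enabled' = 'true'
);

-- A flattened column name contains a dot, so it is quoted with backticks.
SELECT `_id`, `address.city`
FROM `my_mongo_rs_catalog`.`myDatabase`.`customers`;
```

Without the backticks, Flink SQL would read address.city as a reference to field city inside a column named address rather than as a single column name.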
Schema Inference
When the MongoDB catalog infers the schema of a table, it reads up to max.fetch.records documents from the collection, parses each document's schema, and merges all parsed schemas into a single collection schema.
The inferred schema consists of physical columns derived from the fields of the sampled BSON documents. The _id column always serves as the default primary key to prevent duplicate data.
Schema merge rules:
- When a field in the parsed document does not exist in the current schema, the catalog adds it.
- When two columns share the same name but differ in precision, the catalog retains the larger precision.
- When two columns share the same name but have different data types, the catalog uses the smallest common parent type in the type hierarchy. For example, DECIMAL and FLOAT resolve to DOUBLE.
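The merge rules above can be traced on a small hypothetical example. The collection products and its two sample documents are invented for illustration. The comments apply only the rules stated on this page, and the exact Flink types reported for the other fields depend on the connector's BSON type mapping, which is not listed here.

```sql
-- Hypothetical sample documents in myDatabase.products:
--   { "_id": 1, "name": "widget", "price": <value inferred as DECIMAL> }
--   { "_id": 2, "name": "gadget", "price": <value inferred as FLOAT>, "tags": ["new"] }
--
-- Applying the merge rules stated above:
--   tags   appears only in the second document, so it is added to the schema.
--   price  is DECIMAL in one document and FLOAT in the other, so it resolves to DOUBLE.
--   _id    serves as the default primary key.
DESCRIBE `my_mongo_catalog`.`myDatabase`.`products`;
```

Since inference reads at most max.fetch.records documents, a field that occurs only in documents outside that sample would presumably be missing from the inferred schema. Raising the option is one way to widen the sample.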