MongoDB Catalog

The MongoDB catalog enables browsing MongoDB databases and collections directly from Flink SQL without writing per-table DDL statements. The catalog maps MongoDB databases to Flink databases and collections to Flink tables, and automatically infers collection schemas.

Background Information

The MongoDB catalog maps MongoDB's native data hierarchy to Flink's catalog model:

| MongoDB Concept | Flink Catalog Concept |
| --- | --- |
| MongoDB instance / replica set | Catalog |
| Database | Database |
| Collection | Table |

Tables discovered through the catalog can serve as source tables, dimension tables, and result tables in Flink SQL deployments.

The catalog is read-only with respect to metadata: it discovers existing databases and collections. You cannot create, modify, or delete databases or collections through a MongoDB catalog.
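
As a rough illustration of the three roles, the statements below read from, write to, and join against catalog tables. The catalog name my_mongo_catalog and the database myDatabase match the example in Create a Catalog. The collections orders, orders_archive, and customers, the table order_stream, and all column names are hypothetical.

```sql
-- Source table: read a collection through the catalog.
SELECT * FROM `my_mongo_catalog`.`myDatabase`.`orders`;

-- Result table: write query output into a collection.
-- The catalog cannot create collections, so orders_archive must already exist.
INSERT INTO `my_mongo_catalog`.`myDatabase`.`orders_archive`
SELECT * FROM `my_mongo_catalog`.`myDatabase`.`orders`;

-- Dimension table: enrich a stream with a lookup join.
-- order_stream is an assumed table with a processing-time attribute proc_time.
SELECT o.order_id, c.name
FROM order_stream AS o
JOIN `my_mongo_catalog`.`myDatabase`.`customers`
    FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c._id;
```

Fully qualified names (catalog, database, table) keep the statements independent of the current catalog and database.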

Prerequisites

  • A running MongoDB instance accessible from Ververica Platform.
  • If MongoDB authentication is enabled: credentials with at least listDatabases, listCollections, and find permissions.
  • Network access configured (IP allowlist or private connection).

Limitations

  • You cannot modify an existing MongoDB catalog using DDL statements. To update connection settings, drop the catalog and recreate it.
  • The catalog supports metadata discovery only. You cannot create, modify, or delete databases or collections through a MongoDB catalog.
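
A minimal sketch of the drop-and-recreate workflow for changing connection settings, assuming a catalog named my_mongo_catalog and a hypothetical new host placeholder <new-host>:

```sql
-- Remove the existing catalog definition. This unregisters the Flink catalog
-- only; the MongoDB databases and collections are left untouched.
DROP CATALOG my_mongo_catalog;

-- Recreate the catalog under the same name with the updated settings.
CREATE CATALOG my_mongo_catalog WITH (
    'type' = 'mongodb',
    'hosts' = '<new-host>:27017',
    'username' = '<username>',
    'password' = '${secret_values.password}',
    'default-database' = 'myDatabase'
);
```

Reusing the catalog name means that queries referring to my_mongo_catalog need no changes after the catalog is recreated.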

Create a Catalog

CREATE CATALOG my_mongo_catalog WITH (
    'type' = 'mongodb',
    'hosts' = '<host>:27017',
    'username' = '<username>',
    'password' = '${secret_values.password}',
    'default-database' = 'myDatabase'
);
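
Once the catalog exists, it can be explored with standard Flink SQL statements. The following sketch assumes the catalog created above; the collection orders and the database otherDatabase are hypothetical names.

```sql
-- Switch to the catalog. Its default database (myDatabase) becomes current.
USE CATALOG my_mongo_catalog;

-- List the MongoDB databases visible to the configured user.
SHOW DATABASES;

-- List the collections of the current database as Flink tables.
SHOW TABLES;

-- Show the inferred schema of one collection, then query it.
DESCRIBE orders;
SELECT * FROM orders;

-- Switch to another database of the same MongoDB instance.
USE otherDatabase;
SHOW TABLES;
```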

Catalog Options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| type | Yes | (none) | String | Must be mongodb. |
| hosts | No | (none) | String | Host address of the MongoDB instance, for example <host>:27017. Separate multiple hosts with commas. Either uri or hosts must be set. |
| uri | No | (none) | String | Full MongoDB connection URI. If set, overrides hosts, scheme, username, password, and connection.options. Either uri or hosts must be set. |
| scheme | No | mongodb | String | Connection protocol. Valid values: mongodb, mongodb+srv. |
| username | No | (none) | String | Required if MongoDB authentication is enabled. |
| password | No | (none) | String | Required if MongoDB authentication is enabled. |
| default-database | Yes | (none) | String | Default database selected when USE CATALOG is called without a subsequent USE <database>. |
| connection.options | No | (none) | String | Additional MongoDB connection options as key=value pairs separated by &. |
| max.fetch.records | No | 100 | Integer | Maximum number of documents read when inferring a collection schema. |
| scan.flatten-nested-columns.enabled | No | false | Boolean | If true, nested BSON fields are expanded and named using dot notation, for example nested.col. Applies only when catalog tables are used as source tables. |
| scan.primitive-as-string | No | false | Boolean | If true, all primitive BSON types are inferred as STRING. |
Note: To keep credentials out of SQL statements, use secret references, for example 'password' = '${secret_values.password}'. For more information, see Secret Values.
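
The optional settings can be combined in several ways. The two variants below are sketches rather than recommended configurations: the catalog names, host placeholders, the replica set name rs0, and the collection orders are assumptions, while replicaSet and authSource are standard MongoDB connection string options.

```sql
-- Variant 1: several hosts, extra connection options, a larger schema sample,
-- and flattened nested columns.
CREATE CATALOG my_mongo_rs_catalog WITH (
    'type' = 'mongodb',
    'hosts' = '<host1>:27017,<host2>:27017,<host3>:27017',
    'username' = '<username>',
    'password' = '${secret_values.password}',
    'default-database' = 'myDatabase',
    'connection.options' = 'replicaSet=rs0&authSource=admin',
    'max.fetch.records' = '500',
    'scan.flatten-nested-columns.enabled' = 'true'
);

-- Variant 2: a full connection URI. When uri is set, it overrides hosts,
-- scheme, username, password, and connection.options.
CREATE CATALOG my_mongo_uri_catalog WITH (
    'type' = 'mongodb',
    'uri' = 'mongodb://<host>:27017/?replicaSet=rs0',
    'default-database' = 'myDatabase'
);

-- With flattening enabled, a nested field is exposed as a column whose name
-- contains a dot, so it has to be quoted with backticks.
SELECT `nested.col`
FROM `my_mongo_rs_catalog`.`myDatabase`.`orders`;
```

Raising max.fetch.records lets schema inference see more documents, at the cost of reading more data each time a schema is inferred.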

Schema Inference

When the MongoDB catalog infers the schema of a table, it reads up to max.fetch.records documents from the collection, parses each document's schema, and merges all parsed schemas into a single collection schema.

The inferred schema consists of physical columns derived from the fields of the sampled BSON documents. The _id column is always used as the primary key, which prevents duplicate data.

Schema merge rules:

  • When a field in the parsed document does not exist in the current schema, the catalog adds it.
  • When two columns share the same name but differ in precision, the catalog retains the larger precision.
  • When two columns share the same name but have different data types, the catalog uses the smallest common parent type in the type hierarchy. For example, DECIMAL and FLOAT resolve to DOUBLE.
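
The merge rules can be traced on a small hypothetical collection. In the sketch below, the collection orders, its fields, and the placeholder values are assumptions, and the comments only restate what the rules above imply. The types a real catalog reports depend on its BSON-to-Flink type mapping, which this section does not list.

```sql
-- Hypothetical sampled documents in collection `orders`:
--   { "_id": 1, "amount": <DECIMAL value> }
--   { "_id": 2, "amount": <FLOAT value>, "note": "rush" }
--
-- Applying the merge rules:
--   _id    : present in both documents, used as the primary key
--   amount : DECIMAL merged with FLOAT resolves to DOUBLE
--            (smallest common parent type)
--   note   : missing from the first document, so it is added to the schema

-- Inspect the schema that the catalog actually inferred.
DESCRIBE `my_mongo_catalog`.`myDatabase`.`orders`;
```

If the reported schema lacks a field that exists in the collection, the field may occur only in documents outside the sample, and a larger max.fetch.records value may help.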