Milvus Connector
Connect Flink SQL jobs running on Ververica to Milvus to persist vector and scalar data with upsert semantics based on a primary key.
Prerequisites
- A running Milvus 2.4+ deployment with a reachable network address (host and port).
- A database and collection must already exist in Milvus.
- The collection schema must have:
  - One primary key field of type String or Integer, with AUTO_ID disabled.
  - One or more vector fields (e.g., FloatVector) with a known dimension.
- Credentials (userName, password) if authentication is enabled on your Milvus instance.
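Because the connector does not create collections, a collection matching the sink table must exist before the job starts. A minimal setup sketch using the pymilvus client is shown below; the host, credentials, field sizes, and index choice are assumptions to adapt to your deployment.

```python
# Sketch: create a Milvus collection that the Flink sink example below can
# write to. Assumes pymilvus >= 2.4; host and credentials are placeholders.
from pymilvus import MilvusClient, DataType

client = MilvusClient(
    uri="http://milvus-dev.my.company:19530",
    user="root",
    password="Milvus",
    db_name="default",
)

# AUTO_ID must be disabled so the primary key from Flink is used as-is.
schema = client.create_schema(auto_id=False)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("name", DataType.VARCHAR, max_length=256)
schema.add_field("age", DataType.INT64)
schema.add_field("vec", DataType.FLOAT_VECTOR, dim=3)  # dim must match the sink's ARRAY<FLOAT> length

index_params = client.prepare_index_params()
index_params.add_index(field_name="vec", index_type="AUTO_INDEX", metric_type="L2")

client.create_collection("people", schema=schema, index_params=index_params)
```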
Overview
The Milvus connector for Flink SQL provides a sink to write streaming or batch data into a Milvus collection. It is ideal for real-time AI and vector-search use cases.
- Upsert Semantics: The sink processes changelog data, performing an upsert for INSERT and UPDATE_AFTER records and a delete for DELETE records.
- Buffered Writes: Data is buffered and written in batches to optimize for throughput.
- Streaming or Batch: The connector supports both bounded (batch) and unbounded (streaming) write modes.
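The changelog handling and buffering described above can be modeled with a short sketch. This is illustrative only, not the connector's actual implementation; the class and method names are invented.

```python
# Illustrative model of the sink's changelog handling and batched writes.
# Not the connector's real code; BufferedSink and its methods are invented.
UPSERT_KINDS = {"INSERT", "UPDATE_AFTER"}

class BufferedSink:
    def __init__(self, max_rows=200):
        self.max_rows = max_rows
        self.upserts, self.deletes = [], []
        self.flushed = []  # records what a flush would send to Milvus

    def write(self, row_kind, pk, row=None):
        if row_kind in UPSERT_KINDS:
            self.upserts.append(row)   # insert-or-replace by primary key
        elif row_kind == "DELETE":
            self.deletes.append(pk)    # delete by primary key
        # UPDATE_BEFORE needs no action for an upsert sink
        if len(self.upserts) + len(self.deletes) >= self.max_rows:
            self.flush()

    def flush(self):
        if self.upserts:
            self.flushed.append(("upsert", list(self.upserts)))
            self.upserts.clear()
        if self.deletes:
            self.flushed.append(("delete", list(self.deletes)))
            self.deletes.clear()

sink = BufferedSink(max_rows=2)
sink.write("INSERT", 1, {"id": 1, "name": "a"})
sink.write("DELETE", 1)  # buffer reaches max_rows, triggering a flush
```

In the real connector the flush is additionally triggered by the configured flush interval, not only by row count.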
Syntax and Example
The following example shows how to define a Flink table that sinks to an existing Milvus collection and then insert data into it.
-- Define a sink into an existing Milvus collection
CREATE TABLE milvus_sink (
id BIGINT NOT NULL,
name STRING,
age INT,
vec ARRAY<FLOAT>, -- maps to a FloatVector field in Milvus
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'milvus',
'endpoint' = 'milvus-dev.my.company', -- or IP
'port' = '19530',
'userName' = 'root',
'password' = 'Milvus',
'databaseName' = 'default',
'collectionName' = 'people',
-- tune buffering for throughput vs. latency
'sink.buffer-flush.max-rows' = '200',
'sink.buffer-flush.interval' = '3s'
);
-- Example bounded source (datagen) and insert
CREATE TEMPORARY TABLE gen (
id BIGINT NOT NULL,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'number-of-rows' = '3',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '3'
);
INSERT INTO milvus_sink
SELECT id, name, age,
ARRAY[CAST(0.1 AS FLOAT), CAST(0.2 AS FLOAT), CAST(0.3 AS FLOAT)] AS vec
FROM gen;
The vector dimension in your ARRAY<FLOAT> data must exactly match the dimension of the corresponding vector field in your Milvus collection's schema (3 in this example).
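A small client-side guard can catch dimension mismatches before rows reach the sink. This is an illustrative helper (not part of the connector); EXPECTED_DIM mirrors the dim declared in the Milvus schema.

```python
# Illustrative pre-write check: the vector length must equal the
# FloatVector field's dim in the Milvus schema (3 in this example).
EXPECTED_DIM = 3

def check_vec(vec):
    if len(vec) != EXPECTED_DIM:
        raise ValueError(f"vector has dim {len(vec)}, expected {EXPECTED_DIM}")
    return vec
```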
Feeding Deletes
To delete a record in Milvus, your source must be a changelog source (like upsert-kafka with a changelog-json format) that can emit DELETE records.
The sink then issues a delete in Milvus for the record with the matching primary key.
CREATE TEMPORARY TABLE tombstones_kafka (
id BIGINT NOT NULL,
name STRING,
age INT,
vec ARRAY<FLOAT>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'people-changelog',
'properties.bootstrap.servers' = 'kafka-broker:9092',
'key.format' = 'json',
'value.format' = 'changelog-json',
'value.fields-include' = 'ALL'
);
INSERT INTO milvus_sink
SELECT * FROM tombstones_kafka;
Partitioning
The connector supports writing to a specific Milvus partition.
Static Partitioning
To write all data from the sink into a single, fixed partition, specify its name using the partitionName option.
CREATE TABLE MilvusSink (...) WITH (
'connector' = 'milvus',
'partitionName' = 'my_partition',
...
);
Partition Key Routing
If your Milvus collection is configured with a partition key, you can enable automatic routing. The connector will use the value from the corresponding field in your Flink data to route each record to the correct partition.
CREATE TABLE MilvusSink (...) WITH (
'connector' = 'milvus',
'partitionKey.enabled' = 'true',
...
);
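Conceptually, partition-key routing hashes the key field's value and maps it onto the collection's partitions, so records with the same key always land in the same partition. The sketch below illustrates the idea; Milvus's actual hash function differs.

```python
# Illustrative model of partition-key routing: a stable hash of the key
# value selects one of the collection's partitions. Not Milvus's real hash.
import hashlib

def route(partition_key_value, num_partitions):
    digest = hashlib.md5(str(partition_key_value).encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions
```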
WITH Options
| Option | Required | Default | Description |
|---|---|---|---|
| connector | Yes | – | Must be 'milvus'. |
| endpoint | Yes |