Milvus Connector
Connect Flink SQL jobs running on Ververica to Milvus to persist vector and scalar data with upsert semantics based on a primary key.
Prerequisites
- A running Milvus 2.4+ deployment with a reachable network address (host and port).
- A database and collection must already exist in Milvus.
- The collection schema must have:
  - One primary key field of type `String` or `Integer`, with `AUTO_ID` disabled.
  - One or more vector fields (e.g., `FloatVector`) with a known dimension.
- Credentials (`userName`, `password`) if authentication is enabled on your Milvus instance.
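If you want to confirm that an existing collection meets these requirements, you can inspect its schema with the pymilvus client. The following is a minimal sketch, assuming a local Milvus instance and a collection named `people` (adjust both to your deployment):

```python
from pymilvus import connections, Collection

# Assumed connection details and collection name -- adjust to your deployment.
connections.connect("default", host="localhost", port="19530")

col = Collection("people")
print("auto_id:", col.schema.auto_id)  # must be False (AUTO_ID disabled)
for field in col.schema.fields:
    # Vector fields report their dimension in field.params (e.g., {'dim': 3}).
    print(field.name, field.dtype,
          "primary" if field.is_primary else "", field.params)
```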
Overview
The Milvus connector for Flink SQL provides a sink to write streaming or batch data into a Milvus collection. It is ideal for real-time AI and vector-search use cases.
- Upsert Semantics: The sink processes changelog data, performing an `upsert` for `INSERT` and `UPDATE_AFTER` records and a `delete` for `DELETE` records.
- Buffered Writes: Data is buffered and written in batches to optimize for throughput.
- Streaming or Batch: The connector supports both bounded (batch) and unbounded (streaming) write modes.
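Conceptually, the upsert semantics map onto the Milvus client's own upsert and delete operations. The following pymilvus sketch is illustrative only (collection name, field values, and connection details are assumptions), showing the equivalent of how `INSERT`/`UPDATE_AFTER` and `DELETE` records are applied:

```python
from pymilvus import connections, Collection

connections.connect("default", host="localhost", port="19530")
col = Collection("people")

# INSERT / UPDATE_AFTER records become an upsert keyed on the primary key.
col.upsert([{"id": 1, "name": "Ada", "age": 36, "vec": [0.1, 0.2, 0.3]}])

# DELETE records become a delete on the primary key.
col.delete(expr="id in [1]")
```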
Syntax and Example
The following example shows how to define a Flink table that sinks to an existing Milvus collection and then insert data into it.
```sql
-- Define a sink into an existing Milvus collection
CREATE TABLE milvus_sink (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  vec ARRAY<FLOAT>,  -- maps to a FloatVector field in Milvus
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'milvus',
  'endpoint' = 'milvus-dev.my.company',  -- or IP
  'port' = '19530',
  'userName' = 'root',
  'password' = 'Milvus',
  'databaseName' = 'default',
  'collectionName' = 'people',
  -- tune buffering for throughput vs. latency
  'sink.buffer-flush.max-rows' = '200',
  'sink.buffer-flush.interval' = '3s'
);

-- Example bounded source (datagen) and insert
CREATE TEMPORARY TABLE gen (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '3',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '3'
);

INSERT INTO milvus_sink
SELECT id, name, age,
       ARRAY[CAST(0.1 AS FLOAT), CAST(0.2 AS FLOAT), CAST(0.3 AS FLOAT)] AS vec
FROM gen;
```
The vector dimension in your ARRAY<FLOAT> data must exactly match the dimension of the corresponding vector field in your Milvus collection's schema (3 in this example).
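You can sanity-check the writes from the Milvus side once the bounded job finishes. A minimal sketch, assuming the `people` collection already has a vector index (querying requires an index and a loaded collection; see the setup section below):

```python
from pymilvus import connections, Collection

connections.connect("default", host="localhost", port="19530")
col = Collection("people")
col.load()  # the collection must be indexed and loaded before querying

rows = col.query(expr="id >= 1", output_fields=["id", "name", "age"])
print(rows)  # expect the three rows produced by the datagen source
```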
Feeding Deletes
To delete a record in Milvus, your source must be a changelog source (like upsert-kafka with a changelog-json format) that can emit DELETE records.
The sink will then publish a DELETE event for the given primary key.
```sql
CREATE TEMPORARY TABLE tombstones_kafka (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  vec ARRAY<FLOAT>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'people-changelog',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'key.format' = 'json',
  'value.format' = 'changelog-json',
  'value.fields-include' = 'ALL'
);

INSERT INTO milvus_sink
SELECT * FROM tombstones_kafka;
```
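To confirm a tombstone was applied, query Milvus for the deleted key. A sketch assuming a `DELETE` for `id = 2` flowed through the sink (the id is hypothetical):

```python
from pymilvus import connections, Collection

connections.connect("default", host="localhost", port="19530")
col = Collection("people")
col.load()

# After the DELETE for id = 2 reaches Milvus, this should print [].
print(col.query(expr="id == 2", output_fields=["id", "name"]))
```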
Partitioning
The connector supports writing to a specific Milvus partition.
Static Partitioning
To write all data from the sink into a single, fixed partition, specify its name using the partitionName option.
```sql
CREATE TABLE MilvusSink (...) WITH (
  'connector' = 'milvus',
  'partitionName' = 'my_partition',
  ...
);
```
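From the Milvus side you can check that rows landed in the fixed partition. A sketch assuming the collection and partition names used above:

```python
from pymilvus import connections, Collection, utility

connections.connect("default", host="localhost", port="19530")
print(utility.has_partition("people", "my_partition"))  # True if it exists

col = Collection("people")
col.load()
# Restrict the query to the fixed partition the sink writes into.
rows = col.query(expr="id >= 1", output_fields=["id"],
                 partition_names=["my_partition"])
print(len(rows), "rows in my_partition")
```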
Partition Key Routing
If your Milvus collection is configured with a partition key, you can enable automatic routing. The connector will use the value from the corresponding field in your Flink data to route each record to the correct partition.
```sql
CREATE TABLE MilvusSink (...) WITH (
  'connector' = 'milvus',
  'partitionKey.enabled' = 'true',
  ...
);
```
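Partition key routing only works if the collection was created with a partition key field. A minimal pymilvus sketch of such a collection (field names and `num_partitions` are illustrative):

```python
from pymilvus import (
    connections, Collection, CollectionSchema, FieldSchema, DataType
)

connections.connect("default", host="localhost", port="19530")

schema = CollectionSchema(fields=[
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
    # Marking a field as the partition key enables automatic routing.
    FieldSchema(name="age", dtype=DataType.INT64, is_partition_key=True),
    FieldSchema(name="vec", dtype=DataType.FLOAT_VECTOR, dim=3),
])
Collection(name="people_by_age", schema=schema, num_partitions=16)
```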
WITH options
| Option | Required | Default | Description |
|---|---|---|---|
| `connector` | Yes | – | Must be `'milvus'`. |
| `endpoint` | Yes | – | The hostname or IP address of the Milvus instance. |
| `port` | No | `19530` | The gRPC port of the Milvus instance. |
| `userName` | Yes | – | The username for Milvus authentication. |
| `password` | Yes | – | The password for Milvus authentication. |
| `databaseName` | Yes | – | The target database name in Milvus. |
| `collectionName` | Yes | – | The target collection name in Milvus. |
| `partitionName` | No | – | The name of a fixed partition to write to. |
| `partitionKey.enabled` | No | `false` | Set to `true` to use the collection's defined partition key for routing. |
| `sink.buffer-flush.max-rows` | No | `200` | The maximum number of records to buffer before flushing. `0` disables row-based flushing. |
| `sink.buffer-flush.interval` | No | `3s` | The maximum time between flushes. `0s` disables interval-based flushing. |
| `sink.max-retries` | No | `3` | The number of times to retry on write errors. |
| `sink.parallelism` | No | (determined by the planner) | Manually overrides the parallelism of the sink operator. |
| `sink.ignoreDelete` | No | `false` | If `true`, the sink ignores `DELETE` records from the changelog source. |
The connection option names use camelCase (e.g., `databaseName`, `collectionName`), unlike many other Flink connectors. Ensure you use these names exactly.
Operational Tips & Limits
- Sink-only: The Milvus connector currently only supports writing data (sink). It cannot be used as a source or for lookup joins.
- Deletes Require Changelogs: To delete records in Milvus, you must use a source that produces changelog data with `DELETE` operations.
- Indexing for Queries: To query data immediately after writing, you must first create a vector index on your collection and then load the collection into memory. This is a Milvus requirement.
- Network Accessibility: Ensure that your Ververica deployment's network configuration (e.g., VPC, Security Groups, Firewalls) allows outbound traffic to your Milvus instance's host and port. Additionally, configure your Milvus instance's firewall or security group to allow inbound traffic from your Ververica deployment.
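A quick way to test reachability from a machine on the same network path is a plain TCP connection attempt; this sketch uses the placeholder endpoint from the example above:

```python
import socket

# Attempt a TCP handshake with the Milvus gRPC endpoint.
try:
    with socket.create_connection(("milvus-dev.my.company", 19530), timeout=5):
        print("Milvus gRPC port is reachable")
except OSError as e:
    print("Cannot reach Milvus:", e)
```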
Milvus Setup for Testing
This section shows how to get a Milvus instance running with Docker and how to perform basic operations using the Python client. This is useful for local development and for following the examples in a test environment.
Step 1: Set Up Milvus with Docker
This example uses an Ubuntu 22.04 environment.
1. Install Docker and Docker Compose
Connect to your host machine with SSH and run the following commands to install the necessary packages.
```bash
sudo apt-get update -y
sudo apt-get install -y ca-certificates curl gnupg lsb-release

sudo install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg \
  | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg

echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] \
  https://download.docker.com/linux/ubuntu \
  $(. /etc/os-release; echo "$VERSION_CODENAME") stable" \
  | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

sudo apt-get update -y
sudo apt-get install -y docker-ce docker-ce-cli containerd.io \
  docker-buildx-plugin docker-compose-plugin

sudo systemctl enable --now docker
sudo usermod -aG docker $USER
```
You may need to log out and log back in for the user group changes to take effect.
2. Create a docker-compose.yaml File
This file defines the Milvus service and its dependencies (etcd and MinIO).
```yaml
version: '3.8'

services:
  etcd:
    image: quay.io/coreos/etcd:v3.5.11
    container_name: etcd
    restart: unless-stopped
    command: >
      /usr/local/bin/etcd
      -advertise-client-urls http://etcd:2379
      -listen-client-urls http://0.0.0.0:2379
      -data-dir /etcd-data
    volumes:
      - etcd_data:/etcd-data

  minio:
    image: minio/minio:latest
    container_name: minio
    restart: unless-stopped
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data

  milvus:
    image: milvusdb/milvus:v2.4.10
    container_name: milvus
    restart: unless-stopped
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    command: ["milvus", "run", "standalone"]
    depends_on:
      - etcd
      - minio
    ports:
      - "19530:19530"
      - "9091:9091"
    volumes:
      - milvus_data:/var/lib/milvus

volumes:
  etcd_data:
  minio_data:
  milvus_data:
```
3. Start the Services
From the same directory, run the following command.

```bash
docker compose up -d
```

You can verify that the services are running with `docker ps`.
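You can also probe Milvus itself once the containers are up. Milvus standalone serves a health endpoint on its metrics port (9091); this sketch assumes the `requests` package is installed:

```python
import requests

# Milvus standalone exposes /healthz on its metrics port (9091).
response = requests.get("http://localhost:9091/healthz", timeout=5)
print(response.status_code, response.text)  # expect 200 and "OK"
```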
Step 2: Prepare the Collection and Query with Python
You can interact with your Milvus instance using the pymilvus client.
1. Install the Client

```bash
pip install pymilvus
```
2. Run the Python Script
The following script connects to Milvus, creates a collection (if it doesn't exist), creates an index, and loads the collection into memory so it's ready for queries.
```python
from pymilvus import (
    connections, utility, Collection, FieldSchema, CollectionSchema, DataType
)

# --- Connection Details ---
HOST = "localhost"  # Use your EC2 host IP if running remotely
PORT = "19530"
COLLECTION_NAME = "people_demo"
VECTOR_FIELD = "vec"
DIMENSION = 3

# --- Connect to Milvus ---
connections.connect("default", host=HOST, port=PORT)
print("Connected to Milvus.")

# --- Create collection if it's missing ---
if COLLECTION_NAME not in utility.list_collections():
    schema = CollectionSchema(
        fields=[
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
            FieldSchema(name="name", dtype=DataType.VARCHAR, max_length=255),
            FieldSchema(name="age", dtype=DataType.INT32),
            FieldSchema(name=VECTOR_FIELD, dtype=DataType.FLOAT_VECTOR, dim=DIMENSION),
        ],
        description="People demo collection"
    )
    Collection(name=COLLECTION_NAME, schema=schema)
    print(f"Created collection: {COLLECTION_NAME}")
else:
    print(f"Collection '{COLLECTION_NAME}' already exists.")

col = Collection(COLLECTION_NAME)

# --- Create a vector index if one doesn't exist ---
if not col.has_index():
    print("Creating index on vector field...")
    col.create_index(
        field_name=VECTOR_FIELD,
        index_params={"index_type": "AUTOINDEX", "metric_type": "L2"}
    )
    print("Waiting for index to build...")
    utility.wait_for_index_building_complete(COLLECTION_NAME)
    print("Index is ready.")

# --- Load collection and query ---
print("Loading collection into memory...")
col.load()
print(f"Collection loaded. Number of entities: {col.num_entities}")

# Example: Query for records after inserting from Flink
rows = col.query(expr="id >= 1", output_fields=["id", "name", "age"])
print("Query results:")
print(rows)
```
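Once the index is built and the collection is loaded, you can also run a vector similarity search. A short sketch reusing the names from the script above (the probe vector is arbitrary):

```python
from pymilvus import connections, Collection

connections.connect("default", host="localhost", port="19530")
col = Collection("people_demo")
col.load()

# Search for the 3 nearest neighbors of a probe vector (dim must match the schema).
results = col.search(
    data=[[0.1, 0.2, 0.3]],
    anns_field="vec",
    param={"metric_type": "L2"},
    limit=3,
    output_fields=["name", "age"],
)
for hit in results[0]:
    print(hit.id, hit.distance, hit.entity.get("name"))
```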
Troubleshooting
- "Missing required options...": Verify that all WITH option names use the correct camelCase format (e.g., `databaseName`).
- "Index not found" or "Collection not loaded": Ensure you have created a vector index on your collection and have called `.load()` before querying.
- "Dimension mismatch": The length of the vector in your Flink `ARRAY<FLOAT>` data must exactly match the `dim` specified for the vector field in your Milvus collection schema.