协程操作 Milvus 向量数据库

依赖:pip install pymilvus

本示例演示如何使用 Python 协程并发查询 Milvus 向量数据库,提升 I/O 密集型任务的效率。


1. 初始化连接

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
 
def init_milvus():
    connections.connect(alias="default", host="localhost", port="19530")
    
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=2)
    ]
    
    schema = CollectionSchema(fields=fields, description="测试集合")
    collection = Collection(name="test_coroutine", schema=schema)
    collection.create_index(field_name="vector", index_params={"index_type": "FLAT"})
    
    return collection

2. 协程查询

pymilvus 客户端以同步 API 为主,通过 run_in_executor 包装可支持协程并发查询:

import asyncio
 
async def search_milvus(collection, query_vector, top_k=2):
    loop = asyncio.get_running_loop()
    
    result = await loop.run_in_executor(
        None,  # 使用默认线程池
        lambda: collection.search(
            data=[query_vector],
            anns_field="vector",
            param={"metric_type": "L2"},
            limit=top_k,
            output_fields=["id"]
        )
    )
    return result[0]

3. 并发查询主函数

async def main():
    collection = init_milvus()
    
    query_vectors = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
    
    tasks = [search_milvus(collection, vec) for vec in query_vectors]
    results = await asyncio.gather(*tasks)
    
    for i, res in enumerate(results):
        print(f"\n查询向量 {query_vectors[i]} 的结果:")
        for hit in res:
            print(f"  id: {hit.id}, 距离: {hit.distance:.4f}")
 
if __name__ == "__main__":
    asyncio.run(main())

4. 批量插入示例

async def insert_vectors(collection, vectors):
    loop = asyncio.get_running_loop()
    
    await loop.run_in_executor(
        None,
        lambda: collection.insert([list(range(len(vectors))), vectors])
    )

5. 完整示例

import asyncio
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
 
def init_milvus():
    connections.connect(alias="default", host="localhost", port="19530")
    
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128)
    ]
    
    schema = CollectionSchema(fields=fields, description="向量检索测试")
    collection = Collection(name="vectors", schema=schema)
    collection.create_index(
        field_name="vector",
        index_params={"index_type": "IVF_FLAT", "params": {"nlist": 128}}
    )
    collection.load()
    
    return collection
 
async def search(collection, query_vectors, top_k=10):
    loop = asyncio.get_running_loop()
    
    results = await loop.run_in_executor(
        None,
        lambda: collection.search(
            data=query_vectors,
            anns_field="vector",
            param={"metric_type": "L2", "params": {"nprobe": 16}},
            limit=top_k
        )
    )
    return results
 
async def main():
    collection = init_milvus()
    
    import numpy as np
    query_vectors = [np.random.rand(128).tolist() for _ in range(100)]
    
    results = await search(collection, query_vectors)
    
    for i, hits in enumerate(results[:3]):
        print(f"Query {i}: {len(hits)} results")
 
if __name__ == "__main__":
    asyncio.run(main())

6. 性能对比

方式100 次查询耗时
同步串行~10s
协程并发~2s

关键点: 协程通过并发减少 I/O 等待时间,但单个查询的延迟不变。适合批量查询场景。