Using Vector Database (Qdrant)
Nekro Agent integrates with the Qdrant vector database and provides plugins with convenient access to its client. This enables plugins to leverage vector similarity search to implement powerful semantic understanding, information retrieval, content recommendation, and other features without the need to build and manage vector database instances themselves.
What is a Vector Database?
A vector database is specialized for storing, managing, and searching high-dimensional vector data. These vectors are typically mathematical representations (called embeddings) of text, images, audio, or other data, generated by deep learning models (such as Transformer models).
By comparing the distance or similarity between vectors (such as cosine similarity, Euclidean distance), vector databases can quickly find entries most similar to a given query vector, thereby enabling:
- Semantic Search: Not just keyword matching, but understanding the semantic meaning of queries for searching.
- Recommendation Systems: Recommending similar content based on vectors of content users have liked.
- Question-Answering Systems: Converting questions and document chunks into vectors, finding the most relevant document chunks as answer sources.
- Anomaly Detection, Clustering, and more.
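The idea of "finding the entries most similar to a query vector" can be sketched in a few lines of plain Python. This is only an illustration with made-up three-dimensional vectors and hypothetical helper names (cosine_similarity, top_k_similar); a real vector database such as Qdrant uses index structures instead of comparing the query against every entry.

```python
import math
from typing import Dict, List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between a and b (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # Convention chosen for this sketch: a zero vector matches nothing
    return dot / (norm_a * norm_b)


def top_k_similar(
    query: List[float], entries: Dict[str, List[float]], k: int
) -> List[Tuple[str, float]]:
    """Brute-force search: score every entry, then keep the k best."""
    scored = [(name, cosine_similarity(query, vec)) for name, vec in entries.items()]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


# Toy "embeddings"; real models produce hundreds or thousands of dimensions
toy_entries = {
    "cat": [0.9, 0.1, 0.0],
    "kitten": [0.8, 0.2, 0.1],
    "car": [0.0, 0.1, 0.9],
}
results = top_k_similar([1.0, 0.0, 0.0], toy_entries, k=2)
for name, score in results:
    print(f"{name}: {score:.3f}")
```

The two entries pointing in roughly the same direction as the query rank first, which is the behavior semantic search relies on.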
Qdrant Integration in Nekro Agent
Nekro Agent initializes and manages a Qdrant client instance within its core services. Plugins can access this client through the nekro_agent.api.core module to interact with the Qdrant server.
1. Getting the Qdrant Client
In your plugin code (typically within sandbox methods or initialization methods), you can obtain the Qdrant client like this:
from typing import Optional
from nekro_agent.api import core
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import PointStruct, Distance, VectorParams
async def get_my_qdrant_client() -> QdrantClient:
    qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
    if not qdrant_client:
        core.logger.error("Unable to get Qdrant client instance! Please check Agent core configuration.")
        # You can decide whether to raise an exception or return None based on plugin logic
        raise ConnectionError("Qdrant client is not available.")
    return qdrant_client

core.get_qdrant_client() is an asynchronous function that returns a configured QdrantClient instance, or None if Qdrant is not properly configured or is unavailable. Always check the result before using it.
You can also obtain Qdrant connection configuration information (such as host, port, api_key, etc.) through core.get_qdrant_config(), but this is typically managed by the Agent core and plugin developers rarely need to access it directly.
2. Plugin-Specific Collection Names
To avoid conflicts where different plugins use the same collection name in Qdrant, Nekro Agent provides a helper method plugin.get_vector_collection_name() to generate a unique, standardized collection name for your plugin.
# Assuming plugin is your NekroPlugin instance
# from nekro_agent.api.plugin import NekroPlugin
# plugin = NekroPlugin(...)
# Generate a default collection name based on the plugin's author and module name
default_collection_name = plugin.get_vector_collection_name()
# Example output (assuming plugin author "MyAuthor", module name "MyPlugin"):
# "MyAuthor.MyPlugin"
# Generate a collection name with a suffix for specific purposes within the plugin
specific_collection_name = plugin.get_vector_collection_name("user_documents")
# Example output: "MyAuthor.MyPlugin-user_documents"

It is strongly recommended that plugins create and use only collection names generated through this method in Qdrant, to ensure good isolation and avoid naming conflicts.
3. Checking and Creating Collections During Initialization
During plugin initialization, it's often necessary to check if the required Qdrant collection exists and create it if it doesn't. This ensures that the plugin can correctly access the collection in subsequent operations.
from nekro_agent.api import core
from nekro_agent.api.plugin import NekroPlugin # Assumed plugin instance
from qdrant_client import QdrantClient, models
from typing import Optional # Ensure Optional is imported
# Assuming plugin is your NekroPlugin instance
# plugin = NekroPlugin(name="MySamplePlugin", module_name="my_sample", author="MyAuthor", ...)
# EMBEDDING_DIMENSION = 1024 # Assuming your embedding vector dimension
async def init_vector_db_collection(plugin_instance: NekroPlugin, embedding_dimension: int):
    """Initialize the plugin's vector database collection"""
    qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
    if not qdrant_client:
        core.logger.warning(f"[{plugin_instance.name}] Unable to get Qdrant client, skipping vector database collection initialization")
        return
    collection_name = plugin_instance.get_vector_collection_name()
    try:
        collections_response = await qdrant_client.get_collections()
        existing_collections = [col.name for col in collections_response.collections]
        if collection_name not in existing_collections:
            core.logger.info(f"[{plugin_instance.name}] Creating vector database collection: {collection_name}")
            await qdrant_client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=embedding_dimension,  # Vector dimension, must match your embedding model
                    distance=models.Distance.COSINE,  # Distance metric, COSINE is commonly used for text
                ),
            )
            core.logger.success(f"[{plugin_instance.name}] Vector database collection {collection_name} created successfully")
        else:
            core.logger.info(f"[{plugin_instance.name}] Vector database collection {collection_name} already exists")
    except Exception as e:
        core.logger.error(f"[{plugin_instance.name}] Failed to initialize vector database collection {collection_name}: {e}")

# Call in your plugin initialization logic (e.g., @plugin.mount_init_method())
# async def on_plugin_init():
#     # plugin should be your plugin instance
#     # MY_PLUGIN_EMBEDDING_DIMENSION is the dimension configured in your plugin
#     await init_vector_db_collection(plugin, MY_PLUGIN_EMBEDDING_DIMENSION)

Explanation:
- plugin_instance.get_vector_collection_name(): Gets the plugin-specific collection name.
- qdrant_client.get_collections(): Gets a list of all collections in Qdrant.
- qdrant_client.create_collection(): Creates a new collection.
  - collection_name: The name of the collection.
  - vectors_config: Defines the vector parameters.
    - size: The dimension of the vectors; it must match the dimension produced by your embedding model.
    - distance: The distance function used to calculate vector similarity. Common options are Distance.COSINE (cosine similarity), Distance.EUCLID (Euclidean distance), and Distance.DOT (dot product). For text embeddings, cosine similarity is usually the best choice.
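To make the difference between the three distance options concrete, here is a minimal pure-Python sketch that computes all three for two vectors pointing in the same direction but with different lengths. The helper names are hypothetical and the computation is the textbook definition, not Qdrant's internal implementation.

```python
import math
from typing import List


def dot_product(a: List[float], b: List[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def euclidean_distance(a: List[float], b: List[float]) -> float:
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_similarity(a: List[float], b: List[float]) -> float:
    # Assumes neither vector is all zeros
    return dot_product(a, b) / (math.sqrt(dot_product(a, a)) * math.sqrt(dot_product(b, b)))


a = [1.0, 2.0, 3.0]
b = [2.0, 4.0, 6.0]  # Same direction as a, twice the length

print(f"cosine:    {cosine_similarity(a, b):.4f}")   # Ignores vector length
print(f"dot:       {dot_product(a, b):.4f}")         # Grows with vector length
print(f"euclidean: {euclidean_distance(a, b):.4f}")  # Non-zero although directions match
```

Cosine similarity treats the two vectors as identical because it only looks at direction, which is one reason it is a common default for text embeddings whose magnitude carries little meaning.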
4. Configuring Embedding Model and Dimensions
To convert text or other data into vectors, you need to use an embedding model. Nekro Agent allows you to specify the embedding model group and its dimensions in your plugin configuration.
Add the following fields to your plugin configuration class (inheriting from nekro_agent.api.plugin.ConfigBase):
from pydantic import Field
from nekro_agent.api.plugin import ConfigBase
class MyPluginConfig(ConfigBase):
    EMBEDDING_MODEL_GROUP: str = Field(
        default="text-embedding",  # Default model group name, modify according to actual situation
        title="Embedding Model Group",
        description="Name of the model group used to generate text embedding vectors.",
        json_schema_extra={"ref_model_groups": True, "required": True, "model_type": "embedding"},
    )
    EMBEDDING_DIMENSION: int = Field(
        default=1024,  # Default dimension, modify according to the actual output dimension of your selected model group
        title="Embedding Vector Dimension",
        description="Vector dimension output by the embedding model.",
    )
    # ... other configuration items

Explanation:
- EMBEDDING_MODEL_GROUP: String field that specifies the name of a model group defined in the Nekro Agent core configuration.
  - json_schema_extra={"ref_model_groups": True, "required": True, "model_type": "embedding"}: This metadata makes the Nekro Agent management interface show a model group selection dropdown and indicates that this is an embedding-type model.
- EMBEDDING_DIMENSION: Integer field representing the output vector dimension of your embedding model. This value must exactly match both the size parameter of VectorParams in init_vector_db_collection and the actual output dimension of the model; otherwise errors will occur.
Getting configuration:
# Assuming plugin is your NekroPlugin instance
# my_config: MyPluginConfig = plugin.get_config(MyPluginConfig)
# embedding_model_group_name = my_config.EMBEDDING_MODEL_GROUP
# embedding_dimension = my_config.EMBEDDING_DIMENSION
5. Generating Text Embeddings
After obtaining text, you need to call the corresponding embedding model service to convert it into vectors. Nekro Agent provides tools to call OpenAI-compatible embedding interfaces configured through model groups.
from typing import List, Optional # Ensure import
from nekro_agent.api import core
from nekro_agent.api.core import config as core_config, ModelConfigGroup
from nekro_agent.services.agent.openai import gen_openai_embeddings # Import helper function
# Assuming MyPluginConfig is your plugin configuration class
# plugin_config: MyPluginConfig = plugin.get_config(MyPluginConfig)
async def generate_text_embedding(text_to_embed: str, plugin_config: MyPluginConfig) -> List[float]:
    """
    Generate an embedding vector for text using the configured embedding model.
    """
    try:
        # 1. Get model group configuration information
        model_group_info: Optional[ModelConfigGroup] = core_config.get_model_group_info(plugin_config.EMBEDDING_MODEL_GROUP)
        if not model_group_info:
            core.logger.error(f"Unable to find model group configuration named '{plugin_config.EMBEDDING_MODEL_GROUP}'.")
            raise ValueError(f"Embedding model group '{plugin_config.EMBEDDING_MODEL_GROUP}' not found.")
        # 2. Call the embedding model to generate vectors
        # Note: The `model` parameter of gen_openai_embeddings is typically the specific chat/completion model name in the model group,
        # but the API call will route to the embedding endpoint based on the base_url.
        # The `dimensions` parameter is optional; use it if the model supports choosing the output dimension (some models like text-embedding-3-small do).
        # If unsure, you can omit dimensions at first, or ensure it matches EMBEDDING_DIMENSION.
        embedding_vector: List[float] = await gen_openai_embeddings(
            model=model_group_info.CHAT_MODEL,  # Or a model name in the model group suitable for getting API information
            input=text_to_embed,
            api_key=model_group_info.API_KEY,
            base_url=model_group_info.BASE_URL,
            dimensions=plugin_config.EMBEDDING_DIMENSION,  # Explicitly specify dimension
        )
        vector_dimension = len(embedding_vector)
        core.logger.debug(f"Generated embedding vector: '{text_to_embed[:20]}...', dimension: {vector_dimension}")
        # 3. Verify dimension matches configuration
        if vector_dimension != plugin_config.EMBEDDING_DIMENSION:
            core.logger.error(
                f"Embedding vector dimension mismatch! Configured dimension: {plugin_config.EMBEDDING_DIMENSION}, "
                f"actual dimension: {vector_dimension}. "
                f"Please check plugin configuration EMBEDDING_DIMENSION or model settings."
            )
            # Depending on the situation, you may need to raise an exception here or perform other error handling
            raise ValueError("Embedding dimension mismatch.")
        return embedding_vector
    except Exception as e:
        core.logger.error(f"Failed to generate text embedding: {e}")
        raise  # Or handle and re-raise a specific type of exception

# Usage example:
# async def some_function():
#     # plugin_config should be your loaded plugin configuration instance
#     my_text = "This is a text that needs to be vectorized"
#     try:
#         vector = await generate_text_embedding(my_text, plugin_config)
#         # Next, you can store vector in Qdrant
#     except Exception as e:
#         core.logger.error(f"Failed to process text '{my_text}': {e}")

Explanation:
- core_config.get_model_group_info(): Gets the detailed configuration, including API Key and Base URL, for the given model group name.
- gen_openai_embeddings(): A helper function provided by Nekro Agent for calling OpenAI-format embedding APIs.
  - model: Typically a model name in the model group; the API service routes to the correct endpoint based on the request type (embedding here).
  - input: The text to be embedded.
  - dimensions: (Optional) Some models support specifying the output dimension. Ensure this value matches your EMBEDDING_DIMENSION configuration and the dimension of the Qdrant collection.
- Dimension Validation: After generating embeddings, be sure to check that the dimension of the returned vector matches the EMBEDDING_DIMENSION in your plugin configuration, which is also the dimension expected by the Qdrant collection. Mismatches will cause storage failures or inaccurate search results.
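The dimension check can also be pulled out into a small reusable helper so that every code path that produces a vector validates it the same way. The function name check_embedding_dimension below is hypothetical and not part of Nekro Agent; it is a minimal sketch of the validation step only.

```python
from typing import List, Sequence


def check_embedding_dimension(vector: Sequence[float], expected_dimension: int) -> List[float]:
    """Return the vector as a list, or raise ValueError if its length is wrong."""
    actual_dimension = len(vector)
    if actual_dimension != expected_dimension:
        raise ValueError(
            f"Embedding dimension mismatch: expected {expected_dimension}, got {actual_dimension}"
        )
    return list(vector)


# A 3-dimensional toy vector passes a check against dimension 3
checked = check_embedding_dimension((0.1, 0.2, 0.3), expected_dimension=3)
```

Failing early with a clear message is usually easier to debug than a rejected upsert later on.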
6. Storing Data to Qdrant
Once you have the vector representation of text, you can store it in Qdrant along with other metadata. Each storage unit in Qdrant is called a "Point".
from typing import List, Dict, Any
import hashlib  # Used for example ID conversion
from nekro_agent.api import core  # Provides core.logger used below
from qdrant_client import QdrantClient, models

# Assuming qdrant_client is the obtained client instance
# collection_name is your plugin collection name
# embedding_vector is the vector generated in the previous step
# original_data_id is the unique identifier of your data within the plugin, such as a UUID or database primary key
async def store_vector_data(
    qdrant_client: QdrantClient,
    collection_name: str,
    original_data_id: str,  # Unique ID of data within the plugin
    embedding_vector: List[float],
    payload_data: Dict[str, Any],  # Other metadata to store with the vector
):
    """Store vectors and metadata to Qdrant"""
    try:
        # Qdrant's Point ID must be an unsigned integer or a UUID.
        # If your original_data_id is a string (non-UUID format), it needs to be converted.
        # Example: Use the hash value of the original ID (truncated and converted to integer) as Qdrant Point ID
        # Note: Hash conflicts are possible, although the probability is low for truncated MD5.
        # A more robust approach is to use UUID, or maintain a mapping of original_data_id to numeric ID within the plugin.
        qdrant_point_id_hex = hashlib.md5(original_data_id.encode()).hexdigest()[:16]  # Take first 16 hex characters (64 bits)
        qdrant_point_id = int(qdrant_point_id_hex, 16)  # Convert to integer
        points_to_upsert = [
            models.PointStruct(
                id=qdrant_point_id,  # Unique ID of the Point, must be integer or UUID
                vector=embedding_vector,
                payload={
                    "original_id": original_data_id,  # Store original ID in payload for association
                    **payload_data,  # Expand your other metadata into the payload
                    # For example: "text_content": "This is original content...", "source": "doc1.txt"
                },
            )
        ]
        response = await qdrant_client.upsert(
            collection_name=collection_name,
            points=points_to_upsert,
            wait=True,  # Wait for Qdrant to confirm the write before returning
        )
        core.logger.info(f"Successfully stored data to Qdrant, Original ID: {original_data_id}, Qdrant Point ID: {qdrant_point_id}. Status: {response.status}")
    except Exception as e:
        core.logger.error(f"Failed to store data to Qdrant (Original ID: {original_data_id}): {e}")
        raise

# Usage example:
# async def example_usage_store():
#     # qdrant_client, collection_name, vector, plugin_config need to be defined
#     my_payload = {"description": "This is a sample document", "category": "test"}
#     original_id = "doc_abc_123"
#     # vector = await generate_text_embedding("Sample document content", plugin_config)  # Generate vector
#     # await store_vector_data(qdrant_client, collection_name, original_id, vector, my_payload)
#     pass

Explanation:
- models.PointStruct: Used to define a data point.
  - id: Unique identifier of the point. Qdrant requires the ID to be an integer (uint64) or a UUID string. If your internal ID is another type of string, you need to convert it to a Qdrant-compatible ID (for example, by hashing it and taking part of the hash as an integer, or by using a UUID). In emotion.py, a hexadecimal string ID is converted to an integer: int(emotion_id, 16). The example above uses hashlib to generate an integer ID.
  - vector: The embedding vector of the data.
  - payload: A dictionary used to store arbitrary metadata associated with the vector. This metadata can be filtered on or returned during searches. It's recommended to also store your original data ID in the payload so that even if the Qdrant Point ID differs from your internal ID, you can recover the original association through the payload.
- qdrant_client.upsert(): Used to insert or update data points. If a point with the same id already exists, it will be updated; otherwise, a new point will be created.
  - wait=True: Optional parameter; if set to True, the operation waits for Qdrant to confirm that the write is complete.
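As an alternative to the truncated-MD5 integer, the UUID option mentioned above can be sketched with Python's standard uuid module. uuid.uuid5 derives a deterministic UUID from a namespace and a name, so the same original ID always maps to the same point ID. The namespace value and the helper name to_point_uuid are assumptions made for this sketch, not part of Nekro Agent or Qdrant.

```python
import uuid

# Hypothetical namespace for this sketch; any fixed UUID works,
# but it must never change once points have been stored
PLUGIN_ID_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "my-plugin.example")


def to_point_uuid(original_id: str) -> str:
    """Map an arbitrary string ID to a deterministic UUID string."""
    return str(uuid.uuid5(PLUGIN_ID_NAMESPACE, original_id))


point_id = to_point_uuid("doc_abc_123")
print(point_id)  # Could be passed as the `id` of a PointStruct
```

Because the mapping is deterministic, the same function can be reused when updating or deleting points, with no lookup table needed.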
7. Searching Data from Qdrant
The core functionality of a vector database is to search for similar entries based on a query vector.
from typing import List, Optional
from nekro_agent.api import core  # Provides core.logger used below
from qdrant_client import QdrantClient, models

async def search_similar_vectors(
    qdrant_client: QdrantClient,
    collection_name: str,
    query_vector: List[float],
    top_k: int = 5,  # Return the top_k most similar results
    score_threshold: Optional[float] = None,  # Optional similarity score threshold
) -> List[models.ScoredPoint]:
    """Search for entries similar to the query vector in Qdrant"""
    try:
        search_results = await qdrant_client.search(
            collection_name=collection_name,
            query_vector=query_vector,
            limit=top_k,
            score_threshold=score_threshold,  # For example, only return results with similarity higher than 0.75
            with_payload=True,  # Set to True to include payload data in results
        )
        core.logger.info(f"Qdrant found {len(search_results)} results.")
        # Each search_result is a ScoredPoint object, containing id, score, payload, vector (if requested)
        # for hit in search_results:
        #     core.logger.debug(f"Found point: {hit.id}, score: {hit.score}, payload: {hit.payload}")
        return search_results
    except Exception as e:
        core.logger.error(f"Qdrant search failed: {e}")
        raise

# Usage example:
# async def example_usage_search():
#     # qdrant_client, collection_name, plugin_config need to be defined
#     query_text = "How should I configure the plugin?"
#     # query_embedding_vector = await generate_text_embedding(query_text, plugin_config)
#     # results = await search_similar_vectors(qdrant_client, collection_name, query_embedding_vector, top_k=3)
#     # for res in results:
#     #     payload = res.payload or {}
#     #     print(f"Similar entry original ID: {payload.get('original_id')}, description: {payload.get('description')}, similarity: {res.score}")
#     pass

Explanation:
- qdrant_client.search(): Performs a vector search.
  - query_vector: The embedding vector to search with.
  - limit: The number of most similar results to return.
  - score_threshold: (Optional) A float value; only results with similarity scores above this threshold are returned. The range and meaning of scores depend on the distance metric (Distance) selected when creating the collection. For COSINE, scores lie between -1 and 1 and higher is better, with values close to 1 indicating very similar vectors.
  - with_payload=True: If True, search results include the payload data of each point.
  - with_vectors=True: (Optional) If True, search results include the vector of each point.
- The return value is a list of models.ScoredPoint objects, each containing id, score, payload, and vector (if requested).
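Once results come back, a plugin usually turns them into something simpler before using them. The sketch below shows one way to post-process hits, including a client-side minimum score and a guard for missing payloads. Hit is a stand-in dataclass that only mirrors the id, score, and payload fields described above so the example runs without a Qdrant server; summarize_hits is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Hit:
    """Stand-in for a search hit with the id/score/payload fields described above."""
    id: int
    score: float
    payload: Optional[Dict[str, Any]] = None


def summarize_hits(hits: List[Hit], min_score: float) -> List[Dict[str, Any]]:
    """Keep hits scoring at least min_score and extract the original ID from the payload."""
    summaries: List[Dict[str, Any]] = []
    for hit in hits:
        if hit.score < min_score:
            continue
        payload = hit.payload or {}  # A point may have been stored without payload
        summaries.append({"original_id": payload.get("original_id"), "score": hit.score})
    return summaries


hits = [
    Hit(id=1, score=0.91, payload={"original_id": "doc_a"}),
    Hit(id=2, score=0.62, payload={"original_id": "doc_b"}),
    Hit(id=3, score=0.80, payload=None),
]
summary = summarize_hits(hits, min_score=0.75)
```

In practice, passing score_threshold to the search call is preferable because filtering then happens on the server; the client-side check here only illustrates how the scores are meant to be read for a higher-is-better metric such as cosine similarity.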
8. Deleting Data from Qdrant
If you need to remove certain data points from the collection, you can use the delete method.
from typing import List
import hashlib  # Used for example ID conversion
from nekro_agent.api import core  # Provides core.logger used below
from qdrant_client import QdrantClient, models

async def delete_vector_data_by_original_ids(
    qdrant_client: QdrantClient,
    collection_name: str,
    original_data_ids: List[str],  # List of original data IDs to delete
):
    """Delete data points from Qdrant based on original data IDs"""
    try:
        # Convert original IDs to Qdrant point IDs (same conversion method as when storing)
        qdrant_point_ids = []
        for original_id in original_data_ids:
            qdrant_point_id_hex = hashlib.md5(original_id.encode()).hexdigest()[:16]
            qdrant_point_id = int(qdrant_point_id_hex, 16)
            qdrant_point_ids.append(qdrant_point_id)
        # Delete points
        await qdrant_client.delete(
            collection_name=collection_name,
            points_selector=models.PointIdsList(
                points=qdrant_point_ids
            ),
            wait=True,  # Wait for operation completion
        )
        core.logger.info(f"Successfully deleted {len(qdrant_point_ids)} data points from Qdrant")
    except Exception as e:
        core.logger.error(f"Failed to delete data from Qdrant: {e}")
        raise

# Usage example:
# async def example_usage_delete():
#     # qdrant_client, collection_name need to be defined
#     ids_to_delete = ["doc_abc_123", "doc_def_456"]
#     # await delete_vector_data_by_original_ids(qdrant_client, collection_name, ids_to_delete)
#     pass

Explanation:
- qdrant_client.delete(): Deletes data points from the collection.
  - points_selector: Specifies which points to delete. Here we use models.PointIdsList to delete points with specific IDs.
  - wait=True: Waits for the operation to complete.
- Important: When deleting, you need to use the same ID conversion method as when storing to ensure you're deleting the correct points.
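One way to guarantee that storing and deleting use the same conversion is to keep it in a single function and call that function everywhere. The sketch below does this for the truncated-MD5 scheme used in the examples and additionally detects the rare case where two different original IDs map to the same point ID. The names to_point_id and build_point_id_map are hypothetical.

```python
import hashlib
from typing import Dict, Iterable


def to_point_id(original_id: str) -> int:
    """Truncated-MD5 conversion: first 16 hex characters as a 64-bit integer."""
    return int(hashlib.md5(original_id.encode()).hexdigest()[:16], 16)


def build_point_id_map(original_ids: Iterable[str]) -> Dict[int, str]:
    """Map point IDs back to original IDs, raising if two originals collide."""
    mapping: Dict[int, str] = {}
    for original_id in original_ids:
        point_id = to_point_id(original_id)
        existing = mapping.get(point_id)
        if existing is not None and existing != original_id:
            raise ValueError(f"Point ID collision between '{existing}' and '{original_id}'")
        mapping[point_id] = original_id
    return mapping


id_map = build_point_id_map(["doc_abc_123", "doc_def_456"])
point_ids_to_delete = list(id_map.keys())  # Could be passed to models.PointIdsList(points=...)
```

Running the collision check before an upsert or delete keeps a hash conflict from silently overwriting or removing an unrelated point.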
9. Updating Data in Qdrant
To update existing data in Qdrant, you can use the same upsert method as when storing. If a point with the same ID already exists, it will be updated.
from typing import List, Dict, Any
import hashlib
from nekro_agent.api import core
from qdrant_client import QdrantClient, models

async def update_vector_data(
    qdrant_client: QdrantClient,
    collection_name: str,
    original_data_id: str,
    new_embedding_vector: List[float],
    new_payload_data: Dict[str, Any],
):
    """Update existing vector data in Qdrant"""
    try:
        # Convert original ID to Qdrant point ID (same conversion method as when storing)
        qdrant_point_id_hex = hashlib.md5(original_data_id.encode()).hexdigest()[:16]
        qdrant_point_id = int(qdrant_point_id_hex, 16)
        # Update using upsert
        await qdrant_client.upsert(
            collection_name=collection_name,
            points=[
                models.PointStruct(
                    id=qdrant_point_id,
                    vector=new_embedding_vector,
                    payload={
                        "original_id": original_data_id,
                        **new_payload_data,
                    },
                )
            ],
            wait=True,
        )
        core.logger.info(f"Successfully updated data point in Qdrant, Original ID: {original_data_id}")
    except Exception as e:
        core.logger.error(f"Failed to update data in Qdrant (Original ID: {original_data_id}): {e}")
        raise

10. Best Practices and Considerations
Dimension Consistency: Always ensure that the vector dimensions used in your configuration, embedding model, and Qdrant collection are consistent. Mismatches will cause errors.
ID Management: Consider using a robust method to map between your internal IDs and Qdrant point IDs. Using UUIDs or maintaining a mapping table can help avoid hash conflicts.
Batch Operations: For large amounts of data, consider using batch operations to improve efficiency. Qdrant's client supports batch upsert and search operations.
Error Handling: Always implement proper error handling for Qdrant operations, as network issues or service unavailability can occur.
Resource Management: Be mindful of memory usage when working with large vectors or datasets. Consider streaming or processing in batches if necessary.
Security: Ensure that your Qdrant instance is properly secured, especially if it's accessible over a network. Use authentication and encryption as needed.
Monitoring: Monitor your Qdrant instance's performance and resource usage to ensure optimal operation.
Testing: Test your vector operations thoroughly, especially the similarity search functionality, to ensure it returns relevant results.
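For the batch operations point, a simple approach is to split a large list of points into fixed-size chunks and send one upsert per chunk. The helper below is a minimal sketch of the chunking step only; chunked is a hypothetical name, and the batch size that works best depends on vector dimension and payload size.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of items with at most batch_size elements each."""
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


batches = list(chunked([1, 2, 3, 4, 5], batch_size=2))
print(batches)  # [[1, 2], [3, 4], [5]]
```

Each yielded batch could then be passed as the points argument of a single upsert call like the one in section 6, which avoids one network round trip per point.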
Complete Example
Here's a complete example that demonstrates the entire workflow of using Qdrant in a plugin:
from typing import List, Dict, Any, Optional
import hashlib
from nekro_agent.api import core
from nekro_agent.api.plugin import NekroPlugin, ConfigBase
from nekro_agent.api.core import config as core_config, ModelConfigGroup
from nekro_agent.services.agent.openai import gen_openai_embeddings
from qdrant_client import QdrantClient, models
from pydantic import Field
class MyVectorPluginConfig(ConfigBase):
    EMBEDDING_MODEL_GROUP: str = Field(
        default="text-embedding",
        title="Embedding Model Group",
        description="Name of the model group used to generate text embedding vectors.",
        json_schema_extra={"ref_model_groups": True, "required": True, "model_type": "embedding"},
    )
    EMBEDDING_DIMENSION: int = Field(
        default=1024,
        title="Embedding Vector Dimension",
        description="Vector dimension output by the embedding model.",
    )

# Initialize plugin
plugin = NekroPlugin(
    name="MyVectorPlugin",
    module_name="my_vector_plugin",
    author="MyAuthor",
    description="A plugin demonstrating vector database usage",
    version="1.0.0",
)

@plugin.mount_init_method()
async def on_plugin_init():
    """Initialize plugin and create Qdrant collection if needed"""
    config = plugin.get_config(MyVectorPluginConfig)
    await init_vector_db_collection(plugin, config.EMBEDDING_DIMENSION)

async def init_vector_db_collection(plugin_instance: NekroPlugin, embedding_dimension: int):
    """Initialize the plugin's vector database collection"""
    qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
    if not qdrant_client:
        core.logger.warning(f"[{plugin_instance.name}] Unable to get Qdrant client, skipping vector database collection initialization")
        return
    collection_name = plugin_instance.get_vector_collection_name()
    try:
        collections_response = await qdrant_client.get_collections()
        existing_collections = [col.name for col in collections_response.collections]
        if collection_name not in existing_collections:
            core.logger.info(f"[{plugin_instance.name}] Creating vector database collection: {collection_name}")
            await qdrant_client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=embedding_dimension,
                    distance=models.Distance.COSINE,
                ),
            )
            core.logger.success(f"[{plugin_instance.name}] Vector database collection {collection_name} created successfully")
        else:
            core.logger.info(f"[{plugin_instance.name}] Vector database collection {collection_name} already exists")
    except Exception as e:
        core.logger.error(f"[{plugin_instance.name}] Failed to initialize vector database collection {collection_name}: {e}")

async def get_qdrant_client() -> QdrantClient:
    """Get Qdrant client instance"""
    qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
    if not qdrant_client:
        raise ConnectionError("Qdrant client is not available.")
    return qdrant_client

async def generate_text_embedding(text_to_embed: str, plugin_config: MyVectorPluginConfig) -> List[float]:
    """Generate embedding vectors for text using the configured embedding model"""
    try:
        model_group_info: Optional[ModelConfigGroup] = core_config.get_model_group_info(plugin_config.EMBEDDING_MODEL_GROUP)
        if not model_group_info:
            raise ValueError(f"Embedding model group '{plugin_config.EMBEDDING_MODEL_GROUP}' not found.")
        embedding_vector: List[float] = await gen_openai_embeddings(
            model=model_group_info.CHAT_MODEL,
            input=text_to_embed,
            api_key=model_group_info.API_KEY,
            base_url=model_group_info.BASE_URL,
            dimensions=plugin_config.EMBEDDING_DIMENSION,
        )
        vector_dimension = len(embedding_vector)
        if vector_dimension != plugin_config.EMBEDDING_DIMENSION:
            raise ValueError(f"Embedding dimension mismatch. Expected: {plugin_config.EMBEDDING_DIMENSION}, got: {vector_dimension}")
        return embedding_vector
    except Exception as e:
        core.logger.error(f"Failed to generate text embedding: {e}")
        raise

def convert_to_qdrant_id(original_id: str) -> int:
    """Convert original ID to Qdrant point ID"""
    qdrant_point_id_hex = hashlib.md5(original_id.encode()).hexdigest()[:16]
    return int(qdrant_point_id_hex, 16)

# store_vector_data() and search_similar_vectors() used below are the helpers defined in sections 6 and 7
async def store_document(
    doc_id: str,
    title: str,
    content: str,
    metadata: Optional[Dict[str, Any]] = None,
):
    """Store a document in the vector database"""
    config = plugin.get_config(MyVectorPluginConfig)
    qdrant_client = await get_qdrant_client()
    collection_name = plugin.get_vector_collection_name()
    # Generate embedding for the document content
    content_embedding = await generate_text_embedding(content, config)
    # Prepare payload with document metadata
    payload = {
        "original_id": doc_id,
        "title": title,
        "content": content,
        **(metadata or {}),
    }
    # Store in Qdrant
    await store_vector_data(
        qdrant_client=qdrant_client,
        collection_name=collection_name,
        original_data_id=doc_id,
        embedding_vector=content_embedding,
        payload_data=payload,
    )
    core.logger.info(f"Document '{title}' stored in vector database with ID: {doc_id}")

async def search_documents(query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """Search for documents similar to the query"""
    config = plugin.get_config(MyVectorPluginConfig)
    qdrant_client = await get_qdrant_client()
    collection_name = plugin.get_vector_collection_name()
    # Generate embedding for the query
    query_embedding = await generate_text_embedding(query, config)
    # Search for similar documents
    search_results = await search_similar_vectors(
        qdrant_client=qdrant_client,
        collection_name=collection_name,
        query_vector=query_embedding,
        top_k=top_k,
    )
    # Format results
    results = []
    for result in search_results:
        if result.payload:
            results.append({
                "id": result.payload.get("original_id"),
                "title": result.payload.get("title"),
                "content": result.payload.get("content"),
                "score": result.score,
            })
    return results

# Example usage in a sandbox method
@plugin.mount_sandbox_method()
async def search_knowledge_base(query: str, _ctx: 'AgentCtx') -> str:
    """Search the knowledge base for information related to the query"""
    try:
        search_results = await search_documents(query, top_k=3)
        if not search_results:
            return "I couldn't find any relevant information in the knowledge base."
        response = "I found the following relevant information:\n\n"
        for i, result in enumerate(search_results, 1):
            response += f"{i}. {result['title']} (Relevance: {result['score']:.2f})\n"
            response += f"   {result['content'][:200]}...\n\n"
        return response
    except Exception as e:
        core.logger.error(f"Error searching knowledge base: {e}")
        return "Sorry, I encountered an error while searching the knowledge base."

This example demonstrates a complete workflow for using Qdrant in a plugin, including initialization, embedding generation, storing data, and searching for similar content.
