Data Storage
Plugin development often requires persisting data, such as user preferences, session states, cache information, configuration files, model files, etc. Nekro Agent provides two complementary data storage methods for plugins to meet different storage needs.
Storage Methods Overview
The Nekro Agent plugin system provides two data storage methods:
1. KV Key-Value Storage (plugin.store)
- Storage Type: Database-backed key-value pair storage
- Data Format: String type (complex data needs serialization)
- Use Cases: Small structured data, configuration items, status information
- Access Method: Asynchronous access through the plugin.store API
- Data Scope: Supports three scopes: session-level, user-level, and plugin-global
2. Plugin Persistent Directory (plugin.get_plugin_path())
- Storage Type: File system directory
- Data Format: Any files and binary data
- Use Cases: Large files, binary data, model files, resource files
- Access Method: File system operations through pathlib.Path
- Data Scope: Plugin-exclusive directory, requires self-management of the subdirectory structure (see the access sketch below)
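For orientation, here is a minimal sketch of what access to each method looks like. It assumes the code runs inside an async plugin method, where `plugin` is your plugin instance and `_ctx` is the `AgentCtx` it receives, exactly as in the full examples later in this chapter:

```python
from pathlib import Path

import aiofiles

# KV storage: small string values, scoped to a session, a user, or the whole plugin
await plugin.store.set(chat_key=_ctx.from_chat_key, store_key="greeting", value="hello")
greeting = await plugin.store.get(chat_key=_ctx.from_chat_key, store_key="greeting")

# Persistent directory: arbitrary files under the plugin's own folder
data_dir: Path = plugin.get_plugin_path()
async with aiofiles.open(data_dir / "notes.txt", "w", encoding="utf-8") as f:
    await f.write(greeting or "hello")
```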
Selection Guide
| Storage Needs | Recommended Method | Reason |
|---|---|---|
| User Preferences | KV Storage | Small structured data, supports scope isolation |
| Session State | KV Storage | Requires session-level data isolation |
| Configuration Cache | KV Storage | Fast read/write, easy to query and update |
| Images, Audio, Video | Persistent Directory | Large binary files |
| Machine Learning Models | Persistent Directory | Large files, not suitable for database storage |
| Log Files | Persistent Directory | Continuous appending, file operations more efficient |
| Temporary Files | Persistent Directory | File system operations more flexible |
| Dataset Files | Persistent Directory | Large amounts of data, may require stream processing |
Method 1: KV Key-Value Storage
Overview
plugin.store provides a set of asynchronous methods to operate on key-value pair data stored in the database. Its main features include:
- Key-Value Storage: Simple and intuitive KV storage model
- Data Isolation: Each plugin has an independent namespace to avoid key name conflicts
- Scoped Data: Supports three data scopes
  - Session-specific Data (chat_key): Data is bound to a specific chat session
  - User-specific Data (user_key): Data is bound to a specific user (cross-session)
  - Plugin-global Data (no key): Data belongs to the plugin itself, not associated with any specific session or user
- String Storage: Underlying storage is string-based; complex data must be serialized (see the round-trip sketch below)
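Because values are stored as plain strings, structured data has to pass through a serializer on the way in and out. A minimal round trip with the standard `json` module (the `prefs` dict is just an example value):

```python
import json

prefs = {"theme": "dark", "notifications": True}

serialized = json.dumps(prefs)      # pass this string to plugin.store.set(...)
restored = json.loads(serialized)   # parse the string returned by plugin.store.get(...)
assert restored == prefs
```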
Core API
1. Setting Data (set)
Add or update key-value pairs in storage.
async def set(
self,
chat_key: str = "", # Optional, session identifier
user_key: str = "", # Optional, user identifier
store_key: str = "", # Required, storage key name
value: str = "" # Required, value to store (string)
) -> Literal[0, 1]: # Returns 1 for success, 0 for failure
Example:
from nekro_agent.api.schemas import AgentCtx
from nekro_agent.api import core
import json
import time
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "save_preference", "Save user preference")
async def save_preference(_ctx: AgentCtx, key: str, value: str) -> str:
"""Save user preference settings"""
# Store session-specific data
await plugin.store.set(
chat_key=_ctx.from_chat_key,
store_key="last_command",
value="/weather London"
)
# Store user-specific preferences (cross-session)
user_prefs = {"theme": "dark", "notifications": True}
await plugin.store.set(
user_key=_ctx.from_user_id,
store_key="preferences",
value=json.dumps(user_prefs)
)
# Store plugin-global configuration
await plugin.store.set(
store_key="plugin_last_updated",
value=str(time.time())
)
return "Preferences saved"2. Getting Data (get)
Retrieve data from storage based on key name.
async def get(
self,
chat_key: str = "", # Optional, session identifier
user_key: str = "", # Optional, user identifier
store_key: str = "" # Required, storage key name
) -> Optional[str]: # Returns the stored string value, or None if the key does not exist
Example:
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_preference", "Get user preference")
async def get_preference(_ctx: AgentCtx, key: str) -> str:
"""Get user preference settings"""
# Get session-specific data
last_command = await plugin.store.get(
chat_key=_ctx.from_chat_key,
store_key="last_command"
)
# Get user preferences, use default value if not exists
prefs_str = await plugin.store.get(
user_key=_ctx.from_user_id,
store_key="preferences"
)
if prefs_str:
user_preferences = json.loads(prefs_str)
else:
# Default value
user_preferences = {"theme": "light", "notifications": False}
# Get plugin-global data
timestamp_str = await plugin.store.get(store_key="plugin_last_updated")
last_updated = float(timestamp_str) if timestamp_str else None
return f"Preferences: {user_preferences}"3. Deleting Data (delete)
Remove key-value pairs from storage based on key name.
async def delete(
self,
chat_key: str = "", # Optional, session identifier
user_key: str = "", # Optional, user identifier
store_key: str = "" # Required, storage key name
) -> Literal[0, 1]: # Returns 1 for success, 0 for failure
Example:
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "clear_cache", "Clear cache")
async def clear_cache(_ctx: AgentCtx) -> str:
"""Clear session cache data"""
# Delete specific cache for the session
await plugin.store.delete(
chat_key=_ctx.from_chat_key,
store_key="session_cache_data"
)
# Delete a user's setting
await plugin.store.delete(
user_key=_ctx.from_user_id,
store_key="old_setting"
)
return "Cache cleared"4. Checking if Data Exists
Determine if a key exists by the return value of the get method:
value = await plugin.store.get(chat_key=_ctx.from_chat_key, store_key="my_key")
if value is not None:
core.logger.info("'my_key' exists in storage")
else:
core.logger.info("'my_key' does not exist")Storing Structured Data
Since KV storage only supports strings, complex data structures need to be serialized when stored. Using Pydantic models is recommended:
from pydantic import BaseModel
from typing import List, Dict, Optional
import time
class Note(BaseModel):
id: str
title: str
content: str
created_at: float
tags: List[str] = []
class UserNotes(BaseModel):
notes: Dict[str, Note] = {}
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "add_note", "Add note")
async def add_note(_ctx: AgentCtx, note_id: str, title: str, content: str, tags_str: str = "") -> str:
"""Add a note for the current user"""
# 1. Get existing note data
user_notes_json = await plugin.store.get(
user_key=_ctx.from_user_id,
store_key="all_notes"
)
if user_notes_json:
user_notes_data = UserNotes.model_validate_json(user_notes_json)
else:
user_notes_data = UserNotes()
# 2. Create new note
new_note = Note(
id=note_id,
title=title,
content=content,
created_at=time.time(),
tags=tags_str.split(',') if tags_str else []
)
user_notes_data.notes[note_id] = new_note
# 3. Serialize and store
await plugin.store.set(
user_key=_ctx.from_user_id,
store_key="all_notes",
value=user_notes_data.model_dump_json()
)
return f"Note '{title}' added, ID: {note_id}"
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_note", "Get note")
async def get_note(_ctx: AgentCtx, note_id: str) -> str:
"""Get note content for specified ID"""
user_notes_json = await plugin.store.get(
user_key=_ctx.from_user_id,
store_key="all_notes"
)
if not user_notes_json:
return "User has no notes"
user_notes_data = UserNotes.model_validate_json(user_notes_json)
note = user_notes_data.notes.get(note_id)
if note:
return f"Title: {note.title}\nContent: {note.content}\nTags: {', '.join(note.tags)}"
return f"Note with ID '{note_id}' not found"Method 2: Plugin Persistent Directory
Overview
plugin.get_plugin_path() returns a pathlib.Path object pointing to the plugin's exclusive file system directory. Each plugin has an independent directory, typically located at:
data/plugins/<plugin_author>.<plugin_module_name>/
This method is suitable for storing large files, binary data, and scenarios requiring complex file structures.
Getting Plugin Directory
from pathlib import Path
from nekro_agent.api import core
# Get plugin data directory
plugin_dir: Path = plugin.get_plugin_path()
core.logger.info(f"Plugin data directory: {plugin_dir}")
# Example output: data/plugins/my_author.my_plugin/
Basic File Operations
Example 1: Saving Text Files
import aiofiles
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_log", "Save log")
async def save_log(_ctx: AgentCtx, log_content: str) -> str:
"""Save log to file"""
# Get plugin directory
plugin_dir = plugin.get_plugin_path()
# Create log subdirectory
logs_dir = plugin_dir / "logs"
logs_dir.mkdir(parents=True, exist_ok=True)
# Save log file
log_file = logs_dir / f"log_{int(time.time())}.txt"
async with aiofiles.open(log_file, "w", encoding="utf-8") as f:
await f.write(log_content)
core.logger.info(f"Log saved: {log_file}")
return f"Log saved to {log_file.name}"Example 2: Saving Binary Files
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_image", "Save image")
async def save_image(_ctx: AgentCtx, image_data: bytes, filename: str) -> str:
"""Save image file"""
plugin_dir = plugin.get_plugin_path()
images_dir = plugin_dir / "images"
images_dir.mkdir(parents=True, exist_ok=True)
image_path = images_dir / filename
async with aiofiles.open(image_path, "wb") as f:
await f.write(image_data)
return f"Image saved: {image_path}"Example 3: Reading Files
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "read_config", "Read configuration file")
async def read_config(_ctx: AgentCtx) -> str:
"""Read plugin configuration file"""
plugin_dir = plugin.get_plugin_path()
config_file = plugin_dir / "config.json"
# Check if file exists
if not config_file.exists():
return "Configuration file does not exist"
# Read file
async with aiofiles.open(config_file, "r", encoding="utf-8") as f:
config_content = await f.read()
return f"Configuration content: {config_content}"Advanced File Operation Examples
Example 4: Managing Resource Files
from typing import List
@plugin.mount_init_method()
async def init_plugin_resources():
"""Prepare resource directory structure when plugin initializes"""
plugin_dir = plugin.get_plugin_path()
# Create multiple subdirectories
directories = [
plugin_dir / "cache",
plugin_dir / "models",
plugin_dir / "exports",
plugin_dir / "temp",
plugin_dir / "user_uploads"
]
for directory in directories:
directory.mkdir(parents=True, exist_ok=True)
core.logger.info(f"Created directory: {directory}")
# Create default configuration file (if not exists)
default_config = plugin_dir / "config.json"
if not default_config.exists():
default_settings = {
"version": "1.0.0",
"enabled": True,
"cache_ttl": 3600
}
async with aiofiles.open(default_config, "w") as f:
await f.write(json.dumps(default_settings, indent=2))
core.logger.success(f"Created default configuration file: {default_config}")
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "list_exports", "List export files")
async def list_exports(_ctx: AgentCtx) -> str:
"""List all exported files"""
plugin_dir = plugin.get_plugin_path()
exports_dir = plugin_dir / "exports"
if not exports_dir.exists():
return "Export directory does not exist"
# List files in directory
files = [f.name for f in exports_dir.iterdir() if f.is_file()]
if not files:
return "No export files available"
return f"Export file list:\n" + "\n".join(f"- {f}" for f in files)Example 5: Downloading and Caching External Resources
import hashlib
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "cache_resource", "Cache external resource")
async def cache_external_resource(_ctx: AgentCtx, url: str) -> str:
"""Download and cache external resources"""
# Dynamically import requests
from nekro_agent.api.plugin import dynamic_import_pkg
requests = dynamic_import_pkg("requests>=2.25.0")
plugin_dir = plugin.get_plugin_path()
cache_dir = plugin_dir / "cache"
cache_dir.mkdir(parents=True, exist_ok=True)
# Use URL hash as filename
url_hash = hashlib.md5(url.encode()).hexdigest()
cache_file = cache_dir / f"{url_hash}.cache"
# Check if cache exists
if cache_file.exists():
core.logger.info(f"Using cache file: {cache_file}")
async with aiofiles.open(cache_file, "rb") as f:
cached_data = await f.read()
return f"Loaded from cache, size: {len(cached_data)} bytes"
# Download resource
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
content = response.content
# Save to cache
async with aiofiles.open(cache_file, "wb") as f:
await f.write(content)
core.logger.success(f"Resource downloaded and cached: {cache_file}")
return f"Resource downloaded, size: {len(content)} bytes"
except Exception as e:
core.logger.error(f"Failed to download resource: {e}")
return f"Download failed: {e}"Example 6: Cleaning Up Temporary Files
Example 6: Cleaning Up Temporary Files
import time
import os
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "cleanup_temp", "Clean up temporary files")
async def cleanup_temp_files(_ctx: AgentCtx, max_age_hours: int = 24) -> str:
"""Clean up temporary files older than specified time"""
plugin_dir = plugin.get_plugin_path()
temp_dir = plugin_dir / "temp"
if not temp_dir.exists():
return "Temporary directory does not exist"
now = time.time()
max_age_seconds = max_age_hours * 3600
deleted_count = 0
# Iterate through temporary directory
for file_path in temp_dir.iterdir():
if file_path.is_file():
# Check file modification time
file_age = now - file_path.stat().st_mtime
if file_age > max_age_seconds:
try:
os.remove(file_path)
deleted_count += 1
core.logger.info(f"Deleted expired temporary file: {file_path.name}")
except Exception as e:
core.logger.error(f"Failed to delete file: {e}")
return f"Cleanup completed, deleted {deleted_count} files"File and AI Sandbox Interaction
When you need to pass files from the plugin directory to the AI, or receive files generated by the AI, use the file system API:
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "process_user_file", "Process user uploaded file")
async def process_user_file(_ctx: AgentCtx, file_path: str) -> str:
"""Process user uploaded file and save to plugin directory"""
# 1. Get the real path of the file passed by AI
host_path = _ctx.fs.get_file(file_path)
# 2. Read file content
async with aiofiles.open(host_path, "rb") as f:
file_content = await f.read()
# 3. Save to plugin directory
plugin_dir = plugin.get_plugin_path()
uploads_dir = plugin_dir / "user_uploads"
uploads_dir.mkdir(parents=True, exist_ok=True)
# Use timestamp as unique filename
saved_file = uploads_dir / f"{int(time.time())}_{host_path.name}"
async with aiofiles.open(saved_file, "wb") as f:
await f.write(file_content)
core.logger.success(f"File saved to plugin directory: {saved_file}")
return f"File processed and saved, size: {len(file_content)} bytes"
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "export_data", "Export data file")
async def export_data(_ctx: AgentCtx, data_content: str, filename: str) -> str:
"""Export data file and return to AI"""
# 1. Save to plugin directory
plugin_dir = plugin.get_plugin_path()
exports_dir = plugin_dir / "exports"
exports_dir.mkdir(parents=True, exist_ok=True)
export_file = exports_dir / filename
async with aiofiles.open(export_file, "w", encoding="utf-8") as f:
await f.write(data_content)
# 2. Convert to AI-accessible sandbox path
sandbox_path = await _ctx.fs.mixed_forward_file(export_file, file_name=filename)
return f"Data exported: {sandbox_path}"For detailed file transfer mechanisms, please refer to the File Interaction chapter.
Combining Both Storage Methods
In practical applications, both storage methods are usually used in combination:
from pydantic import BaseModel
from typing import Optional
class ModelInfo(BaseModel):
"""Model information (stored in KV)"""
name: str
version: str
file_path: str # Path to actual model file (relative to plugin directory)
size_bytes: int
created_at: float
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "download_model", "Download model")
async def download_model(_ctx: AgentCtx, model_url: str, model_name: str) -> str:
"""Download machine learning model and save metadata"""
from nekro_agent.api.plugin import dynamic_import_pkg
requests = dynamic_import_pkg("requests")
# 1. Download model file to plugin directory
plugin_dir = plugin.get_plugin_path()
models_dir = plugin_dir / "models"
models_dir.mkdir(parents=True, exist_ok=True)
model_file = models_dir / f"{model_name}.model"
try:
response = requests.get(model_url, timeout=300)
response.raise_for_status()
model_data = response.content
# Save model file
async with aiofiles.open(model_file, "wb") as f:
await f.write(model_data)
# 2. Save model metadata to KV storage
model_info = ModelInfo(
name=model_name,
version="1.0.0",
file_path=f"models/{model_name}.model", # Relative path
size_bytes=len(model_data),
created_at=time.time()
)
await plugin.store.set(
store_key=f"model_info:{model_name}",
value=model_info.model_dump_json()
)
return f"Model '{model_name}' downloaded successfully, size: {len(model_data)} bytes"
except Exception as e:
return f"Download failed: {e}"
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "load_model", "Load model")
async def load_model(_ctx: AgentCtx, model_name: str) -> str:
"""Load downloaded model"""
# 1. Get model metadata from KV storage
model_info_json = await plugin.store.get(store_key=f"model_info:{model_name}")
if not model_info_json:
return f"Model '{model_name}' does not exist"
model_info = ModelInfo.model_validate_json(model_info_json)
# 2. Load model file from plugin directory
plugin_dir = plugin.get_plugin_path()
model_file = plugin_dir / model_info.file_path
if not model_file.exists():
return f"Model file does not exist: {model_file}"
# Read model file (example)
async with aiofiles.open(model_file, "rb") as f:
model_data = await f.read()
return f"Model loaded: {model_info.name} (version {model_info.version}, size {model_info.size_bytes} bytes)"Best Practices
Best Practices
KV Storage Best Practices
Clear Key Naming Strategy: Use clear, structured store_key names:
# ✅ Recommended
"user_prefs:theme"
"chat_state:topic"
"cache:api:last_fetch"

# ❌ Not recommended
"data"
"temp"
"x"
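To keep such prefixes consistent across a plugin, a tiny helper that assembles namespaced keys can help. The `make_key` helper below is illustrative and not part of the plugin API, and the usage line assumes it runs inside an async plugin method with an `AgentCtx` named `_ctx`:

```python
def make_key(namespace: str, *parts: str) -> str:
    """Build a colon-separated store_key, e.g. make_key('user_prefs', 'theme') -> 'user_prefs:theme'"""
    return ":".join([namespace, *parts])


# Usage with the KV store
await plugin.store.set(
    user_key=_ctx.from_user_id,
    store_key=make_key("user_prefs", "theme"),
    value="dark",
)
```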