Skip to content

Data Storage

Plugin development often requires persisting data, such as user preferences, session states, cache information, configuration files, model files, etc. Nekro Agent provides two complementary data storage methods for plugins to meet different storage needs.

Storage Methods Overview

The Nekro Agent plugin system provides two data storage methods:

1. KV Key-Value Storage (plugin.store)

  • Storage Type: Database-backed key-value pair storage
  • Data Format: String type (complex data needs serialization)
  • Use Cases: Small structured data, configuration items, status information
  • Access Method: Asynchronous access through plugin.store API
  • Data Scope: Supports three scopes: session-level, user-level, and plugin-global

2. Plugin Persistent Directory (plugin.get_plugin_path())

  • Storage Type: File system directory
  • Data Format: Any files and binary data
  • Use Cases: Large files, binary data, model files, resource files
  • Access Method: File system operations through pathlib.Path
  • Data Scope: Plugin-exclusive directory, requires self-management of subdirectory structure

Selection Guide

Storage NeedsRecommended MethodReason
User PreferencesKV StorageSmall structured data, supports scope isolation
Session StateKV StorageRequires session-level data isolation
Configuration CacheKV StorageFast read/write, easy to query and update
Images, Audio, VideoPersistent DirectoryLarge binary files
Machine Learning ModelsPersistent DirectoryLarge files, not suitable for database storage
Log FilesPersistent DirectoryContinuous appending, file operations more efficient
Temporary FilesPersistent DirectoryFile system operations more flexible
Dataset FilesPersistent DirectoryLarge amounts of data, may require stream processing

Method 1: KV Key-Value Storage

Overview

plugin.store provides a set of asynchronous methods to operate on key-value pair data stored in the database. Its main features include:

  • Key-Value Storage: Simple and intuitive KV storage model
  • Data Isolation: Each plugin has an independent namespace to avoid key name conflicts
  • Scoped Data: Supports three data scopes
    • Session-specific Data (chat_key): Data is bound to a specific chat session
    • User-specific Data (user_key): Data is bound to a specific user (cross-session)
    • Plugin-global Data (no key): Data belongs to the plugin itself, not associated with any specific session or user
  • String Storage: Underlying storage is string-based, complex data needs serialization

Core API

1. Setting Data (set)

Add or update key-value pairs in storage.

python
async def set(
    self,
    chat_key: str = "",    # Optional, session identifier
    user_key: str = "",    # Optional, user identifier
    store_key: str = "",   # Required, storage key name
    value: str = ""        # Required, value to store (string)
) -> Literal[0, 1]:        # Returns 1 for success, 0 for failure

Example:

python
from nekro_agent.api.schemas import AgentCtx
from nekro_agent.api import core
import json

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "save_preference", "Save user preference")
async def save_preference(_ctx: AgentCtx, key: str, value: str) -> str:
    """Save user preference settings"""

    # Store session-specific data
    await plugin.store.set(
        chat_key=_ctx.from_chat_key,
        store_key="last_command",
        value="/weather London"
    )

    # Store user-specific preferences (cross-session)
    user_prefs = {"theme": "dark", "notifications": True}
    await plugin.store.set(
        user_key=_ctx.from_user_id,
        store_key="preferences",
        value=json.dumps(user_prefs)
    )

    # Store plugin-global configuration
    await plugin.store.set(
        store_key="plugin_last_updated",
        value=str(time.time())
    )

    return "Preferences saved"

2. Getting Data (get)

Retrieve data from storage based on key name.

python
async def get(
    self,
    chat_key: str = "",    # Optional, session identifier
    user_key: str = "",    # Optional, user identifier
    store_key: str = ""    # Required, storage key name
) -> Optional[str]:        # Returns stored string value, None if not exists

Example:

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_preference", "Get user preference")
async def get_preference(_ctx: AgentCtx, key: str) -> str:
    """Get user preference settings"""

    # Get session-specific data
    last_command = await plugin.store.get(
        chat_key=_ctx.from_chat_key,
        store_key="last_command"
    )

    # Get user preferences, use default value if not exists
    prefs_str = await plugin.store.get(
        user_key=_ctx.from_user_id,
        store_key="preferences"
    )

    if prefs_str:
        user_preferences = json.loads(prefs_str)
    else:
        # Default value
        user_preferences = {"theme": "light", "notifications": False}

    # Get plugin-global data
    timestamp_str = await plugin.store.get(store_key="plugin_last_updated")
    last_updated = float(timestamp_str) if timestamp_str else None

    return f"Preferences: {user_preferences}"

3. Deleting Data (delete)

Remove key-value pairs from storage based on key name.

python
async def delete(
    self,
    chat_key: str = "",    # Optional, session identifier
    user_key: str = "",    # Optional, user identifier
    store_key: str = ""    # Required, storage key name
) -> Literal[0, 1]:        # Returns 1 for success, 0 for failure

Example:

python
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "clear_cache", "Clear cache")
async def clear_cache(_ctx: AgentCtx) -> str:
    """Clear session cache data"""

    # Delete specific cache for the session
    await plugin.store.delete(
        chat_key=_ctx.from_chat_key,
        store_key="session_cache_data"
    )

    # Delete a user's setting
    await plugin.store.delete(
        user_key=_ctx.from_user_id,
        store_key="old_setting"
    )

    return "Cache cleared"

4. Checking if Data Exists

Determine if a key exists by the return value of the get method:

python
value = await plugin.store.get(chat_key=_ctx.from_chat_key, store_key="my_key")
if value is not None:
    core.logger.info("'my_key' exists in storage")
else:
    core.logger.info("'my_key' does not exist")

Storing Structured Data

Since KV storage only supports strings, complex data structures need to be serialized when stored. Using Pydantic models is recommended:

python
from pydantic import BaseModel
from typing import List, Dict, Optional
import time

class Note(BaseModel):
    id: str
    title: str
    content: str
    created_at: float
    tags: List[str] = []

class UserNotes(BaseModel):
    notes: Dict[str, Note] = {}

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "add_note", "Add note")
async def add_note(_ctx: AgentCtx, note_id: str, title: str, content: str, tags_str: str = "") -> str:
    """Add a note for the current user"""

    # 1. Get existing note data
    user_notes_json = await plugin.store.get(
        user_key=_ctx.from_user_id,
        store_key="all_notes"
    )

    if user_notes_json:
        user_notes_data = UserNotes.model_validate_json(user_notes_json)
    else:
        user_notes_data = UserNotes()

    # 2. Create new note
    new_note = Note(
        id=note_id,
        title=title,
        content=content,
        created_at=time.time(),
        tags=tags_str.split(',') if tags_str else []
    )
    user_notes_data.notes[note_id] = new_note

    # 3. Serialize and store
    await plugin.store.set(
        user_key=_ctx.from_user_id,
        store_key="all_notes",
        value=user_notes_data.model_dump_json()
    )

    return f"Note '{title}' added, ID: {note_id}"

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_note", "Get note")
async def get_note(_ctx: AgentCtx, note_id: str) -> str:
    """Get note content for specified ID"""

    user_notes_json = await plugin.store.get(
        user_key=_ctx.from_user_id,
        store_key="all_notes"
    )

    if not user_notes_json:
        return "User has no notes"

    user_notes_data = UserNotes.model_validate_json(user_notes_json)
    note = user_notes_data.notes.get(note_id)

    if note:
        return f"Title: {note.title}\nContent: {note.content}\nTags: {', '.join(note.tags)}"

    return f"Note with ID '{note_id}' not found"

Method 2: Plugin Persistent Directory

Overview

plugin.get_plugin_path() returns a pathlib.Path object pointing to the plugin's exclusive file system directory. Each plugin has an independent directory, typically located at:

data/plugins/<plugin_author>.<plugin_module_name>/

This method is suitable for storing large files, binary data, and scenarios requiring complex file structures.

Getting Plugin Directory

python
from pathlib import Path
from nekro_agent.api import core

# Get plugin data directory
plugin_dir: Path = plugin.get_plugin_path()
core.logger.info(f"Plugin data directory: {plugin_dir}")

# Example output: data/plugins/my_author.my_plugin/

Basic File Operations

Example 1: Saving Text Files

python
import aiofiles

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_log", "Save log")
async def save_log(_ctx: AgentCtx, log_content: str) -> str:
    """Save log to file"""

    # Get plugin directory
    plugin_dir = plugin.get_plugin_path()

    # Create log subdirectory
    logs_dir = plugin_dir / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)

    # Save log file
    log_file = logs_dir / f"log_{int(time.time())}.txt"
    async with aiofiles.open(log_file, "w", encoding="utf-8") as f:
        await f.write(log_content)

    core.logger.info(f"Log saved: {log_file}")
    return f"Log saved to {log_file.name}"

Example 2: Saving Binary Files

python
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_image", "Save image")
async def save_image(_ctx: AgentCtx, image_data: bytes, filename: str) -> str:
    """Save image file"""

    plugin_dir = plugin.get_plugin_path()
    images_dir = plugin_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)

    image_path = images_dir / filename
    async with aiofiles.open(image_path, "wb") as f:
        await f.write(image_data)

    return f"Image saved: {image_path}"

Example 3: Reading Files

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "read_config", "Read configuration file")
async def read_config(_ctx: AgentCtx) -> str:
    """Read plugin configuration file"""

    plugin_dir = plugin.get_plugin_path()
    config_file = plugin_dir / "config.json"

    # Check if file exists
    if not config_file.exists():
        return "Configuration file does not exist"

    # Read file
    async with aiofiles.open(config_file, "r", encoding="utf-8") as f:
        config_content = await f.read()

    return f"Configuration content: {config_content}"

Advanced File Operation Examples

Example 4: Managing Resource Files

python
from typing import List

@plugin.mount_init_method()
async def init_plugin_resources():
    """Prepare resource directory structure when plugin initializes"""

    plugin_dir = plugin.get_plugin_path()

    # Create multiple subdirectories
    directories = [
        plugin_dir / "cache",
        plugin_dir / "models",
        plugin_dir / "exports",
        plugin_dir / "temp",
        plugin_dir / "user_uploads"
    ]

    for directory in directories:
        directory.mkdir(parents=True, exist_ok=True)
        core.logger.info(f"Created directory: {directory}")

    # Create default configuration file (if not exists)
    default_config = plugin_dir / "config.json"
    if not default_config.exists():
        default_settings = {
            "version": "1.0.0",
            "enabled": True,
            "cache_ttl": 3600
        }
        async with aiofiles.open(default_config, "w") as f:
            await f.write(json.dumps(default_settings, indent=2))
        core.logger.success(f"Created default configuration file: {default_config}")

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "list_exports", "List export files")
async def list_exports(_ctx: AgentCtx) -> str:
    """List all exported files"""

    plugin_dir = plugin.get_plugin_path()
    exports_dir = plugin_dir / "exports"

    if not exports_dir.exists():
        return "Export directory does not exist"

    # List files in directory
    files = [f.name for f in exports_dir.iterdir() if f.is_file()]

    if not files:
        return "No export files available"

    return f"Export file list:\n" + "\n".join(f"- {f}" for f in files)

Example 5: Downloading and Caching External Resources

python
import hashlib

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "cache_resource", "Cache external resource")
async def cache_external_resource(_ctx: AgentCtx, url: str) -> str:
    """Download and cache external resources"""

    # Dynamically import requests
    from nekro_agent.api.plugin import dynamic_import_pkg
    requests = dynamic_import_pkg("requests>=2.25.0")

    plugin_dir = plugin.get_plugin_path()
    cache_dir = plugin_dir / "cache"
    cache_dir.mkdir(parents=True, exist_ok=True)

    # Use URL hash as filename
    url_hash = hashlib.md5(url.encode()).hexdigest()
    cache_file = cache_dir / f"{url_hash}.cache"

    # Check if cache exists
    if cache_file.exists():
        core.logger.info(f"Using cache file: {cache_file}")
        async with aiofiles.open(cache_file, "rb") as f:
            cached_data = await f.read()
        return f"Loaded from cache, size: {len(cached_data)} bytes"

    # Download resource
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        content = response.content

        # Save to cache
        async with aiofiles.open(cache_file, "wb") as f:
            await f.write(content)

        core.logger.success(f"Resource downloaded and cached: {cache_file}")
        return f"Resource downloaded, size: {len(content)} bytes"

    except Exception as e:
        core.logger.error(f"Failed to download resource: {e}")
        return f"Download failed: {e}"

Example 6: Cleaning Up Temporary Files

python
import time
import os

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "cleanup_temp", "Clean up temporary files")
async def cleanup_temp_files(_ctx: AgentCtx, max_age_hours: int = 24) -> str:
    """Clean up temporary files older than specified time"""

    plugin_dir = plugin.get_plugin_path()
    temp_dir = plugin_dir / "temp"

    if not temp_dir.exists():
        return "Temporary directory does not exist"

    now = time.time()
    max_age_seconds = max_age_hours * 3600
    deleted_count = 0

    # Iterate through temporary directory
    for file_path in temp_dir.iterdir():
        if file_path.is_file():
            # Check file modification time
            file_age = now - file_path.stat().st_mtime

            if file_age > max_age_seconds:
                try:
                    os.remove(file_path)
                    deleted_count += 1
                    core.logger.info(f"Deleted expired temporary file: {file_path.name}")
                except Exception as e:
                    core.logger.error(f"Failed to delete file: {e}")

    return f"Cleanup completed, deleted {deleted_count} files"

File and AI Sandbox Interaction

When you need to pass files from the plugin directory to AI or receive AI-generated files, you need to use the file system API:

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "process_user_file", "Process user uploaded file")
async def process_user_file(_ctx: AgentCtx, file_path: str) -> str:
    """Process user uploaded file and save to plugin directory"""

    # 1. Get the real path of the file passed by AI
    host_path = _ctx.fs.get_file(file_path)

    # 2. Read file content
    async with aiofiles.open(host_path, "rb") as f:
        file_content = await f.read()

    # 3. Save to plugin directory
    plugin_dir = plugin.get_plugin_path()
    uploads_dir = plugin_dir / "user_uploads"
    uploads_dir.mkdir(parents=True, exist_ok=True)

    # Use timestamp as unique filename
    saved_file = uploads_dir / f"{int(time.time())}_{host_path.name}"
    async with aiofiles.open(saved_file, "wb") as f:
        await f.write(file_content)

    core.logger.success(f"File saved to plugin directory: {saved_file}")
    return f"File processed and saved, size: {len(file_content)} bytes"

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "export_data", "Export data file")
async def export_data(_ctx: AgentCtx, data_content: str, filename: str) -> str:
    """Export data file and return to AI"""

    # 1. Save to plugin directory
    plugin_dir = plugin.get_plugin_path()
    exports_dir = plugin_dir / "exports"
    exports_dir.mkdir(parents=True, exist_ok=True)

    export_file = exports_dir / filename
    async with aiofiles.open(export_file, "w", encoding="utf-8") as f:
        await f.write(data_content)

    # 2. Convert to AI-accessible sandbox path
    sandbox_path = await _ctx.fs.mixed_forward_file(export_file, file_name=filename)

    return f"Data exported: {sandbox_path}"

For detailed file transfer mechanisms, please refer to the File Interaction chapter.

Combining Both Storage Methods

In practical applications, both storage methods are usually used in combination:

python
from pydantic import BaseModel
from typing import Optional

class ModelInfo(BaseModel):
    """Model information (stored in KV)"""
    name: str
    version: str
    file_path: str  # Path to actual model file (relative to plugin directory)
    size_bytes: int
    created_at: float

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "download_model", "Download model")
async def download_model(_ctx: AgentCtx, model_url: str, model_name: str) -> str:
    """Download machine learning model and save metadata"""

    from nekro_agent.api.plugin import dynamic_import_pkg
    requests = dynamic_import_pkg("requests")

    # 1. Download model file to plugin directory
    plugin_dir = plugin.get_plugin_path()
    models_dir = plugin_dir / "models"
    models_dir.mkdir(parents=True, exist_ok=True)

    model_file = models_dir / f"{model_name}.model"

    try:
        response = requests.get(model_url, timeout=300)
        response.raise_for_status()
        model_data = response.content

        # Save model file
        async with aiofiles.open(model_file, "wb") as f:
            await f.write(model_data)

        # 2. Save model metadata to KV storage
        model_info = ModelInfo(
            name=model_name,
            version="1.0.0",
            file_path=f"models/{model_name}.model",  # Relative path
            size_bytes=len(model_data),
            created_at=time.time()
        )

        await plugin.store.set(
            store_key=f"model_info:{model_name}",
            value=model_info.model_dump_json()
        )

        return f"Model '{model_name}' downloaded successfully, size: {len(model_data)} bytes"

    except Exception as e:
        return f"Download failed: {e}"

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "load_model", "Load model")
async def load_model(_ctx: AgentCtx, model_name: str) -> str:
    """Load downloaded model"""

    # 1. Get model metadata from KV storage
    model_info_json = await plugin.store.get(store_key=f"model_info:{model_name}")

    if not model_info_json:
        return f"Model '{model_name}' does not exist"

    model_info = ModelInfo.model_validate_json(model_info_json)

    # 2. Load model file from plugin directory
    plugin_dir = plugin.get_plugin_path()
    model_file = plugin_dir / model_info.file_path

    if not model_file.exists():
        return f"Model file does not exist: {model_file}"

    # Read model file (example)
    async with aiofiles.open(model_file, "rb") as f:
        model_data = await f.read()

    return f"Model loaded: {model_info.name} (version {model_info.version}, size {model_info.size_bytes} bytes)"

Best Practices

KV Storage Best Practices

  1. Clear Key Naming Strategy: Use clear, structured store_key

    python
    # ✅ Recommended
    "user_prefs:theme"
    "chat_state:topic"
    "cache:api:last_fetch"
    
    # ❌ Not recommended
    "data"
    "temp"
    "x"