Webhook Access Points (Deprecated)

Feature Deprecated

Webhook functionality has been marked as deprecated and will be removed in future versions. Please use the new Dynamic Routing Feature to implement external HTTP access points. Dynamic routing provides more powerful and flexible Web API development capabilities.

Migration Recommendation

If you are still using Webhook functionality, migrate to dynamic routing as soon as possible. Dynamic routing offers:

  • RESTful API endpoints that replace Webhook access points
  • Better request handling and response management
  • Support for complete FastAPI functionality
  • More standardized API design patterns

Reasons for Deprecation

The main reasons for deprecating Webhook functionality:

  1. Feature Overlap: Dynamic routing provides more complete Web API development capabilities
  2. Design Limitations: Webhook design is relatively simple and cannot meet complex API requirements
  3. Maintenance Cost: Dynamic routing unifies the implementation of HTTP access points, so keeping a separate Webhook mechanism only adds maintenance work

Original Functionality Overview

Webhook allows your Nekro Agent plugin to receive real-time HTTP push notifications from external systems (such as GitHub, GitLab, CI/CD tools, IoT devices, custom services, etc.). This enables plugins to respond to external events, such as code commits, monitoring alerts, data updates, etc., and trigger corresponding operations within the Agent, such as sending messages, updating data, or calling other plugin functions.

Webhook Basics

When a specific event occurs in an external system, that system sends an HTTP request to a pre-configured URL, which is the Webhook access point provided by the plugin. The request is usually a POST, but other methods such as GET are also supported. The plugin's Webhook handler receives the request, parses its content (usually JSON or form data), and executes the corresponding logic.
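The parsing step can be sketched with the standard library alone. This is a minimal illustration, not part of the Nekro Agent API: the helper name parse_webhook_body is hypothetical, and it assumes the sender uses either application/json or application/x-www-form-urlencoded.

```python
import json
from urllib.parse import parse_qs


def parse_webhook_body(content_type: str, body: bytes) -> dict:
    """Decode a webhook request body into a dict based on its Content-Type."""
    # Drop parameters such as "; charset=utf-8" before comparing.
    media_type = content_type.split(";", 1)[0].strip().lower()
    text = body.decode("utf-8")

    if media_type == "application/json":
        data = json.loads(text)
        if not isinstance(data, dict):
            raise ValueError("expected a JSON object at the top level")
        return data

    if media_type == "application/x-www-form-urlencoded":
        # parse_qs returns a list per key; keep the first value of each.
        return {key: values[0] for key, values in parse_qs(text).items()}

    raise ValueError(f"unsupported content type: {media_type!r}")


print(parse_webhook_body("application/json; charset=utf-8", b'{"event": "push"}'))
# {'event': 'push'}
print(parse_webhook_body("application/x-www-form-urlencoded", b"event=push&repo=demo"))
# {'event': 'push', 'repo': 'demo'}
```

Inside a real handler, the fastapi.Request object already offers request.json() and request.form(), so a helper like this is mainly useful for unit-testing parsing logic without a running server.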

Registering Webhooks

Plugins register an asynchronous function as a Webhook handler through the @plugin.mount_webhook_method() decorator. This decorator associates the function with a specific HTTP path.

python
from fastapi import Request, HTTPException # FastAPI for handling HTTP requests
from nekro_agent.api import core, message, context # Agent API
import hmac
import hashlib
import json

# Assume plugin instance is defined

@plugin.mount_webhook_method(
    endpoint="/github", # Endpoint relative to the plugin's Webhook root path
    name="GitHub Webhook Handler",
    description="Receive and handle Webhook events from GitHub."
)
async def handle_github_webhook(request: Request):
    """Handle GitHub Webhook requests.

    Verify signature, parse payload, and take action based on event type.
    """
    core.logger.info(f"Plugin '{plugin.name}' received GitHub Webhook request.")

    # 1. Security verification (recommended)
    github_signature = request.headers.get("X-Hub-Signature-256")
    if not github_signature:
        core.logger.warning("Webhook request missing X-Hub-Signature-256 header.")
        raise HTTPException(status_code=400, detail="Missing signature")

    # Get Webhook secret from plugin configuration
    webhook_secret = plugin.config.GITHUB_WEBHOOK_SECRET # Assume configuration item exists
    if not webhook_secret:
        core.logger.error("GitHub Webhook secret not set in plugin configuration!")
        raise HTTPException(status_code=500, detail="Webhook secret not configured")

    request_body = await request.body()
    if not verify_github_signature(request_body, webhook_secret, github_signature):
        core.logger.warning("Webhook signature verification failed!")
        raise HTTPException(status_code=403, detail="Invalid signature")

    # 2. Parse Payload
    try:
        payload = await request.json() # Or request.form() depending on Content-Type sent by external system
    except json.JSONDecodeError:
        core.logger.warning("Webhook request body is not valid JSON.")
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    # 3. Handle event
    event_type = request.headers.get("X-GitHub-Event")
    core.logger.info(f"Verified GitHub event: {event_type}")

    if event_type == "push":
        repo_name = payload.get("repository", {}).get("full_name", "Unknown repository")
        pusher_name = payload.get("pusher", {}).get("name", "Unknown pusher")
        commits = payload.get("commits", [])
        commit_count = len(commits)

        notification_message = (
            f"📦 Code push event\n"
            f"Repository: {repo_name}\n"
            f"Pusher: {pusher_name}\n"
            f"Commits: {commit_count}\n"
        )
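        # head_commit is the most recent commit on the ref after the push (null when a branch is deleted)
        head_commit = payload.get("head_commit") or {}
        latest_commit_message = head_commit.get("message")
        if latest_commit_message:
            notification_message += f"Latest commit: {latest_commit_message.splitlines()[0]}"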

        # Assume plugin configured target session for notifications
        target_chat_key = plugin.config.NOTIFICATION_CHAT_KEY
        if target_chat_key:
            _ctx = await context.create_temp_ctx(target_chat_key) # Create temporary context for sending messages
            await message.send_text(target_chat_key, notification_message, _ctx)
            core.logger.info(f"GitHub push notification sent to {target_chat_key}.")
        else:
            core.logger.warning("Target session for GitHub notifications (NOTIFICATION_CHAT_KEY) not configured.")

    elif event_type == "issues":
        # Handle issues events...
        pass

    # 4. Return response
    # Typically, Webhook handlers should quickly return a 2xx response indicating the event has been received.
    # Time-consuming operations should be handled asynchronously.
    return {"status": "success", "message": f"Event '{event_type}' received and acknowledged."}


def verify_github_signature(payload_body: bytes, secret: str, signature_header: str) -> bool:
    """Verify GitHub Webhook signature"""
    if not signature_header.startswith("sha256="):
        return False
    expected_signature = signature_header[7:] # Remove "sha256=" prefix

    hashed_payload = hmac.new(secret.encode('utf-8'), payload_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(hashed_payload, expected_signature)

Decorator Parameters:

  • endpoint (str): The path of the Webhook, relative to the plugin's Webhook root URL. For example, if the plugin's unique key is dev.my_plugin (author dev, module name my_plugin), Agent is running on http://localhost:8000, and the endpoint here is "/github", then the complete Webhook URL will be http://localhost:8000/api/plugin/dev.my_plugin/webhook/github.
  • name (str, optional): Human-readable name of the Webhook, used for logs or management interfaces.
  • description (str, optional): Brief description of the Webhook functionality.

Webhook Handler (async def function):

  • Must be an asynchronous function.
  • Receives a fastapi.Request object as a parameter, from which request headers, request body, query parameters, etc. can be obtained.
  • Usually performs security verification (such as checking signatures or the source IP).
  • Parses the request body (such as JSON or form data).
  • Executes business logic based on the parsed data.
  • Quick Response: Should return an HTTP response as quickly as possible (usually 200 OK or another 2xx status code) to tell the external system that the event was received. Time-consuming operations should run asynchronously in the background; otherwise a slow response can cause the external system to time out and retry.
  • Can use fastapi.HTTPException to return standard HTTP error responses.
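To exercise a signature check like the one in the handler above without a real GitHub delivery, a test can compute the header value the way a sender would. The following is a minimal sketch; the function names, secret, and payload are hypothetical and exist only for local testing.

```python
import hashlib
import hmac
import json


def sign_github_payload(payload_body: bytes, secret: str) -> str:
    """Return the value a sender would put in the X-Hub-Signature-256 header."""
    digest = hmac.new(secret.encode("utf-8"), payload_body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def signature_matches(payload_body: bytes, secret: str, signature_header: str) -> bool:
    """Compare the received header with the expected one in constant time."""
    expected = sign_github_payload(payload_body, secret)
    return hmac.compare_digest(expected, signature_header)


# Hypothetical secret and payload, for local testing only.
secret = "test-secret"
body = json.dumps({"repository": {"full_name": "dev/sample"}}).encode("utf-8")
header = sign_github_payload(body, secret)

print(signature_matches(body, secret, header))         # True
print(signature_matches(body + b" ", secret, header))  # False: body was altered
```

The signature must be computed over the raw request bytes. Re-serializing a parsed JSON payload can change whitespace or key order and make a valid signature fail.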

Webhook URL Structure

The complete URL structure of plugin Webhooks is typically as follows:

{BASE_URL}/api/plugin/{plugin_author}.{plugin_module_name}/webhook{endpoint}

  • {BASE_URL}: Base URL of Nekro Agent (e.g., http://localhost:8080).
  • {plugin_author}.{plugin_module_name}: Unique key of the plugin, composed of author and module name.
  • {endpoint}: The endpoint parameter defined in @plugin.mount_webhook_method(), must start with /.

For example, if the plugin author is dev, module name is sample_webhook, and endpoint is /myevent, then the URL might be: http://localhost:8080/api/plugin/dev.sample_webhook/webhook/myevent

It is recommended to clearly list all provided Webhook URLs in the plugin's README.md or configuration documentation.
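When listing URLs in documentation, a small helper can assemble them from the parts described above. This is a sketch only: build_webhook_url is a hypothetical name, not a Nekro Agent function, and it simply follows the URL pattern shown in this section.

```python
def build_webhook_url(base_url: str, plugin_author: str, plugin_module_name: str, endpoint: str) -> str:
    """Assemble a Webhook URL following the pattern documented above."""
    if not endpoint.startswith("/"):
        raise ValueError("endpoint must start with '/'")
    plugin_key = f"{plugin_author}.{plugin_module_name}"
    # Strip a trailing slash from the base URL to avoid "//api/...".
    return f"{base_url.rstrip('/')}/api/plugin/{plugin_key}/webhook{endpoint}"


print(build_webhook_url("http://localhost:8080", "dev", "sample_webhook", "/myevent"))
# http://localhost:8080/api/plugin/dev.sample_webhook/webhook/myevent
```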

Security Verification

Since Webhook access points are exposed to the public internet, implementing security verification is crucial to ensure requests come from trusted sources and prevent malicious attacks.

Common verification methods:

  1. Signature Verification (Recommended):
    • The external system signs the request body (payload) with a shared secret, typically using HMAC-SHA256.
    • The signature is sent in a specific request header (such as X-Hub-Signature-256 for GitHub). Note that GitLab's X-Gitlab-Token header carries the secret token itself rather than a signature, so it falls under method 3.
    • The plugin side uses the same secret and algorithm to recalculate the signature and compares it with the signature in the request header.
    • As shown in the verify_github_signature function example above.
    • The secret should be stored in the plugin's configuration (marked as is_secret), never hardcoded in the code.
  2. IP Address Whitelist:
    • If the external system's outbound IP address is fixed or within a known range, an IP whitelist can be configured at the plugin or gateway level.
    • However, IP addresses may change, and maintenance costs are high.
  3. Authentication Token:
    • Require the external system to carry a pre-shared authentication token in request headers or query parameters.
    • Relatively simple, but less secure than signature verification, as tokens may be leaked.

Be sure to implement at least one effective security verification mechanism.
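Methods 2 and 3 can be sketched with the standard library. The helper names below are hypothetical, and the addresses come from ranges reserved for documentation; treat this as an illustration under those assumptions rather than a complete access-control layer.

```python
import hmac
import ipaddress


def token_is_valid(received_token: str, expected_token: str) -> bool:
    """Compare a pre-shared token in constant time to avoid timing leaks."""
    return hmac.compare_digest(received_token.encode("utf-8"), expected_token.encode("utf-8"))


def ip_is_allowed(client_ip: str, allowed_networks: list[str]) -> bool:
    """Return True if client_ip falls inside any of the allowed CIDR ranges."""
    try:
        address = ipaddress.ip_address(client_ip)
    except ValueError:
        return False  # Not a valid IP address at all
    return any(address in ipaddress.ip_network(network) for network in allowed_networks)


allowed = ["192.0.2.0/24"]  # Hypothetical allowlist (documentation range)
print(ip_is_allowed("192.0.2.10", allowed))    # True
print(ip_is_allowed("198.51.100.7", allowed))  # False
print(token_is_valid("s3cret", "s3cret"))      # True
```

In a handler, the client address is available as request.client.host. Behind a reverse proxy this is the proxy's address unless forwarded headers are handled, so an IP allowlist is often easier to enforce at the gateway.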

Handling Different HTTP Methods

By default, Webhook handlers can respond to all HTTP methods. If you need to handle specific methods (such as GET, PUT, DELETE), you can check request.method inside the handler:

python
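from fastapi.responses import JSONResponse

# get_resource_data() and create_resource() are placeholders for your own logic
@plugin.mount_webhook_method(endpoint="/resource", name="Resource Handler")
async def handle_resource(request: Request):
    if request.method == "GET":
        # Handle GET request, for example, return resource status
        return {"status": "available", "data": await get_resource_data()}
    elif request.method == "POST":
        # Handle POST request, for example, create new resource
        payload = await request.json()
        resource_id = await create_resource(payload)
        # FastAPI does not interpret a (body, status) tuple as a status code,
        # so return 201 Created through an explicit response object
        return JSONResponse(status_code=201, content={"status": "created", "id": resource_id})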
    elif request.method == "DELETE":
        # Handle DELETE request, for example, delete resource
        # ...
        return {"status": "deleted"}
    else:
        raise HTTPException(status_code=405, detail="Method Not Allowed")
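When a handler supports several methods, a lookup table can replace the if/elif chain. The sketch below is framework-agnostic and uses hypothetical names (METHOD_HANDLERS, MethodNotAllowed, dispatch). In a real handler, the lookup key would be request.method and the failure case would raise HTTPException with status 405.

```python
import asyncio
from typing import Awaitable, Callable


class MethodNotAllowed(Exception):
    """Raised when no handler is registered for the HTTP method."""


async def get_resource() -> dict:
    return {"status": "available"}


async def delete_resource() -> dict:
    return {"status": "deleted"}


# Map each supported HTTP method to the coroutine function that handles it.
METHOD_HANDLERS: dict[str, Callable[[], Awaitable[dict]]] = {
    "GET": get_resource,
    "DELETE": delete_resource,
}


async def dispatch(method: str) -> dict:
    handler = METHOD_HANDLERS.get(method.upper())
    if handler is None:
        raise MethodNotAllowed(method)
    return await handler()


print(asyncio.run(dispatch("GET")))     # {'status': 'available'}
print(asyncio.run(dispatch("delete")))  # {'status': 'deleted'}
```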

Sending Notifications from Webhooks to Sessions

A common use case for Webhooks is to notify specific users or sessions within Nekro Agent about external events.

python
from nekro_agent.api import core, message, context

async def notify_chat(chat_key: str, notification_text: str):
    if not chat_key:
        core.logger.warning("Unable to send Webhook notification: target chat_key is empty.")
        return

    try:
        # Create a temporary AgentCtx for sending messages
        # Note: This temporary context may not contain complete user or session information, only used for message sending
        _ctx = await context.create_temp_ctx(chat_key=chat_key)

        await message.send_text(chat_key, notification_text, _ctx)
        core.logger.info(f"Webhook notification sent to {chat_key}.")
    except Exception as e:
        core.logger.error(f"Failed to send notification to {chat_key} via Webhook: {e}")

# Call in your Webhook handler:
# target_chat = plugin.config.TARGET_CHAT_FOR_ALERTS
# await notify_chat(target_chat, "Urgent alert: Server CPU usage exceeds 90%!")

The target session chat_key should usually be obtained from the plugin's configuration, allowing users to specify the group or private chat to receive notifications.

Handling File Uploads

Webhooks can also be used to receive file uploads. FastAPI provides the ability to handle multipart/form-data requests.

python
from fastapi import File, UploadFile, Form

@plugin.mount_webhook_method(endpoint="/upload_report", name="Upload Report")
async def handle_report_upload(
    request: Request,
    report_file: UploadFile = File(...), # "report_file" is the form field name
    report_name: str = Form(...),      # "report_name" is also the form field name
    target_chat: str = Form(...)
):
    core.logger.info(f"Received file upload: {report_file.filename} (type: {report_file.content_type}), report name: {report_name}")

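    # Verify file type, size, etc. (recommended)
    # content_type may be missing, so fall back to an empty string
    if not (report_file.content_type or "").startswith("text/"):
        raise HTTPException(status_code=400, detail="Only text file uploads are supported.")

    # Read file content
    contents = await report_file.read() # bytes
    try:
        report_text = contents.decode('utf-8')
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="File is not valid UTF-8 text.")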

    # Process file content, for example, save to plugin directory or send to user
    # plugin_data_path = plugin.get_plugin_path() / "uploads"
    # plugin_data_path.mkdir(exist_ok=True)
    # async with aiofiles.open(plugin_data_path / report_file.filename, "wb") as f:
    #     await f.write(contents)

    await notify_chat(target_chat, f"Received new report '{report_name}' ({report_file.filename}). Content summary:\n{report_text[:200]}...")

    return {"status": "success", "filename": report_file.filename, "size": len(contents)}
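If uploaded files are written to disk, the client-supplied filename should not be trusted as a path. The following is a minimal validation sketch using only the standard library. The names safe_filename, validate_upload, and MAX_UPLOAD_BYTES, as well as the 1 MiB limit, are assumptions chosen for illustration.

```python
from pathlib import PurePosixPath, PureWindowsPath
from typing import Optional

MAX_UPLOAD_BYTES = 1 * 1024 * 1024  # Hypothetical 1 MiB limit


def safe_filename(raw_name: str) -> str:
    """Strip directory components so the name cannot escape the upload folder."""
    # Handle both "/" and "\" separators regardless of the host OS.
    name = PureWindowsPath(PurePosixPath(raw_name).name).name
    if name in ("", ".", ".."):
        raise ValueError("invalid file name")
    return name


def validate_upload(filename: str, content_type: Optional[str], contents: bytes) -> str:
    """Check type, size, and encoding; return a filename that is safe to store."""
    if not (content_type or "").startswith("text/"):
        raise ValueError("only text uploads are supported")
    if len(contents) > MAX_UPLOAD_BYTES:
        raise ValueError("file too large")
    try:
        contents.decode("utf-8")
    except UnicodeDecodeError as exc:
        raise ValueError("file is not valid UTF-8") from exc
    return safe_filename(filename)


print(validate_upload("../../etc/report.txt", "text/plain", b"hello"))  # report.txt
print(safe_filename("..\\..\\secret.txt"))                              # secret.txt
```

In a handler, a ValueError from these checks could be translated into an HTTPException with status 400.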

Asynchronous Task Processing (Important)

An async def Webhook handler does not block the server's other work while it awaits, but the HTTP client still waits until the handler has finished all of its internal operations, including every awaited call, before it receives a response. If the handler contains time-consuming operations that should not delay the response to the external system (such as calling multiple external APIs, processing large data, or running complex computations), it is strongly recommended to move those operations into background tasks. The plugin can then quickly confirm to the external system that the Webhook has been received (for example, by returning 202 Accepted) while the actual processing happens in the background.

Python's asyncio.create_task() or similar mechanisms (such as Celery, RQ, etc., if integrated in the project) can be used for this purpose.

python
import asyncio

async def long_running_process(data: dict, chat_to_notify: str):
    core.logger.info(f"Starting time-consuming task: {data.get('task_id')}")
    await asyncio.sleep(10) # Simulate time-consuming operation
    result_message = f"Task {data.get('task_id')} processing completed. Result is: XXXX"
    await notify_chat(chat_to_notify, result_message)
    core.logger.info(f"Time-consuming task {data.get('task_id')} completed and notified.")

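from fastapi.responses import JSONResponse

_background_tasks = set() # Holds references to running background tasks

@plugin.mount_webhook_method(endpoint="/start_job", name="Start Long Job")
async def start_job_webhook(request: Request):
    payload = await request.json()
    task_id = payload.get("task_id", "unknown_task")
    chat_key = payload.get("notify_chat_key") # Assume payload contains notification target

    if not chat_key:
        raise HTTPException(status_code=400, detail="notify_chat_key not provided")

    # Create and start background task without waiting for its completion.
    # Keep a reference so the task is not garbage-collected before it finishes.
    task = asyncio.create_task(long_running_process({"task_id": task_id, **payload}, chat_key))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    core.logger.info(f"Created background handler for task {task_id}.")
    # FastAPI does not interpret a (body, status) tuple as a status code,
    # so return 202 Accepted through an explicit response object
    return JSONResponse(
        status_code=202,
        content={"status": "accepted", "task_id": task_id, "message": "Job received and being processed in background."},
    )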

Through reasonable use of Webhooks, your plugin can greatly expand Nekro Agent's ability to connect with the outside world.