Skip to content

Webhookアクセスポイント(非推奨)

機能が非推奨

Webhook機能は非推奨としてマークされており、将来のバージョンで削除される予定です。外部HTTPアクセスポイントを実装するには、新しい動的ルーティング機能を使用してください。動的ルーティングはより強力で柔軟なWeb API開発機能を提供します。

移行推奨

Webhook機能を使用している場合は、できるだけ早く動的ルーティングに移行することをお勧めします:

  • Webhook → 動的ルーティングのRESTful APIエンドポイント
  • より良いリクエスト処理とレスポンス管理
  • 完全なFastAPI機能のサポート
  • より標準化されたAPI設計パターン

非推奨の理由

Webhook機能が非推奨となった主な理由:

  1. 機能の重複: 動的ルーティングはより完全なWeb API開発機能を提供します
  2. 設計上の制限: Webhookの設計は比較的単純で、複雑なAPI要件を満たすことができません
  3. 保守コスト: 動的ルーティングはHTTPアクセスポイントの実装を統一します

元の機能概要

Webhookを使用すると、Nekro Agentプラグインは外部システム(GitHub、GitLab、CI/CDツール、IoTデバイス、カスタムサービスなど)からリアルタイムのHTTPプッシュ通知を受信できます。これにより、プラグインは外部イベント(コードコミット、監視アラート、データ更新など)に応答し、エージェント内で対応する操作(メッセージ送信、データ更新、または他のプラグイン関数の呼び出しなど)をトリガーできます。

Webhookの基本

外部システムで特定のイベントが発生すると、事前に設定されたURL(プラグインが提供するWebhookアクセスポイント)にHTTPリクエスト(通常はPOSTリクエストですが、GETなどの他のメソッドもサポート)を送信します。プラグインのWebhookハンドラはこのリクエストを受信し、その内容(通常はJSONまたはフォームデータ)を解析し、対応するロジックを実行します。

Webhookの登録

プラグインは@plugin.mount_webhook_method()デコレータを介して非同期関数をWebhookハンドラとして登録します。このデコレータは、関数を特定のHTTPパスに関連付けます。

python
from fastapi import Request, HTTPException # HTTPリクエストを処理するためのFastAPI
from nekro_agent.api import core, message, context # Agent API
import hmac
import hashlib
import json

# プラグインインスタンスが定義されていると仮定

@plugin.mount_webhook_method(
    endpoint="/github", # プラグインのWebhookルートパスに対する相対エンドポイント
    name="GitHub Webhookハンドラ",
    description="GitHubからWebhookイベントを受信して処理します。"
)
async def handle_github_webhook(request: Request):
    """GitHub Webhookリクエストを処理します。

    署名を検証し、ペイロードを解析し、イベントタイプに基づいてアクションを実行します。
    """
    core.logger.info(f"プラグイン '{plugin.name}' がGitHub Webhookリクエストを受信しました。")

    # 1. セキュリティ検証(推奨)
    github_signature = request.headers.get("X-Hub-Signature-256")
    if not github_signature:
        core.logger.warning("WebhookリクエストにX-Hub-Signature-256ヘッダーがありません。")
        raise HTTPException(status_code=400, detail="署名がありません")

    # プラグイン設定からWebhookシークレットを取得
    webhook_secret = plugin.config.GITHUB_WEBHOOK_SECRET # 設定項目が存在すると仮定
    if not webhook_secret:
        core.logger.error("プラグイン設定でGitHub Webhookシークレットが設定されていません!")
        raise HTTPException(status_code=500, detail="Webhookシークレットが設定されていません")

    request_body = await request.body()
    if not verify_github_signature(request_body, webhook_secret, github_signature):
        core.logger.warning("Webhook署名検証に失敗しました!")
        raise HTTPException(status_code=403, detail="無効な署名")

    # 2. ペイロードを解析
    try:
        payload = await request.json() # または外部システムが送信するContent-Typeに応じてrequest.form()
    except json.JSONDecodeError:
        core.logger.warning("Webhookリクエストボディが有効なJSONではありません。")
        raise HTTPException(status_code=400, detail="無効なJSONペイロード")

    # 3. イベントを処理
    event_type = request.headers.get("X-GitHub-Event")
    core.logger.info(f"検証されたGitHubイベント: {event_type}")

    if event_type == "push":
        repo_name = payload.get("repository", {}).get("full_name", "不明なリポジトリ")
        pusher_name = payload.get("pusher", {}).get("name", "不明なプッシャー")
        commits = payload.get("commits", [])
        commit_count = len(commits)

        notification_message = (
            f"📦 コードプッシュイベント\n"
            f"リポジトリ: {repo_name}\n"
            f"プッシャー: {pusher_name}\n"
            f"コミット: {commit_count}\n"
        )
        if commits:
            latest_commit_message = commits[0].get("message", "コミットメッセージなし")
            notification_message += f"最新コミット: {latest_commit_message.splitlines()[0]}"

        # 通知用にプラグインでターゲットセッションが設定されていると仮定
        target_chat_key = plugin.config.NOTIFICATION_CHAT_KEY
        if target_chat_key:
            _ctx = await context.create_temp_ctx(target_chat_key) # メッセージ送信用の一時コンテキストを作成
            await message.send_text(target_chat_key, notification_message, _ctx)
            core.logger.info(f"GitHubプッシュ通知が{target_chat_key}に送信されました。")
        else:
            core.logger.warning("GitHub通知のターゲットセッション(NOTIFICATION_CHAT_KEY)が設定されていません。")

    elif event_type == "issues":
        # issuesイベントを処理...
        pass

    # 4. レスポンスを返す
    # 通常、Webhookハンドラはイベントが受信されたことを示す2xxレスポンスを迅速に返す必要があります。
    # 時間のかかる操作は非同期で処理する必要があります。
    return {"status": "success", "message": f"イベント '{event_type}' を受信し確認しました。"}


def verify_github_signature(payload_body: bytes, secret: str, signature_header: str) -> bool:
    """GitHub Webhook署名を検証"""
    if not signature_header.startswith("sha256="):
        return False
    expected_signature = signature_header[7:] # "sha256="プレフィックスを削除

    hashed_payload = hmac.new(secret.encode('utf-8'), payload_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(hashed_payload, expected_signature)

デコレータパラメータ:

  • endpoint (str): Webhookのパスで、プラグインのWebhookルートURLに対する相対パスです。例えば、プラグイン名がmy_pluginで、Agentがhttp://localhost:8000で実行されており、ここでのendpoint"/github"の場合、完全なWebhook URLはhttp://localhost:8000/api/plugin/my_plugin/webhook/githubになります。
  • name (str, オプション): Webhookの人間が読める名前で、ログや管理インターフェースで使用されます。
  • description (str, オプション): Webhook機能の簡単な説明。

Webhookハンドラ (async def 関数):

  • 非同期関数である必要があります。
  • fastapi.Requestオブジェクトをパラメータとして受け取り、リクエストヘッダー、リクエストボディ、クエリパラメータなどを取得できます。
  • 通常、セキュリティ検証(署名チェック、ソースIPなど)が必要です。
  • リクエストボディ(JSON、フォームデータなど)を解析します。
  • 解析されたデータに基づいてビジネスロジックを実行します。
  • 迅速なレスポンス: Webhookハンドラは、外部システムにイベントが正常に受信されたことを通知するために、HTTPレスポンスをできるだけ迅速に返す必要があります(通常は200 OKまたは他の2xxステータスコード)。時間のかかる操作は、Webhookレスポンスをブロックして外部システムのタイムアウト再試行を引き起こさないように、バックグラウンドで非同期に実行する必要があります。
  • fastapi.HTTPExceptionを使用して標準的なHTTPエラーレスポンスを返すことができます。

Webhook URL構造

プラグインWebhookの完全なURL構造は通常次のとおりです:

{BASE_URL}/api/plugin/{plugin_author}.{plugin_module_name}/webhook{endpoint}

  • {BASE_URL}: Nekro AgentのベースURL(例: http://localhost:8080)。
  • {plugin_author}.{plugin_module_name}: プラグインの一意のキーで、作者とモジュール名で構成されます。
  • {endpoint}: @plugin.mount_webhook_method()で定義されたendpointパラメータで、/で始まる必要があります。

例えば、プラグイン作者がdevで、モジュール名がsample_webhookで、endpoint/myeventの場合、URLは次のようになります: http://localhost:8080/api/plugin/dev.sample_webhook/webhook/myevent

プラグインのREADME.mdまたは設定ドキュメントに、提供するすべてのWebhook URLを明確に記載することをお勧めします。

セキュリティ検証

Webhookアクセスポイントは公開インターネットに公開されているため、リクエストが信頼できるソースから来ることを確認し、悪意のある攻撃を防ぐためにセキュリティ検証を実装することが重要です。

一般的な検証方法:

  1. 署名検証(推奨):
    • 外部システムは共有シークレットを使用してリクエストボディ(ペイロード)にハッシュ署名(HMAC-SHA256など)を付けます。
    • 署名結果は特定のリクエストヘッダー(GitHubの場合はX-Hub-Signature-256、GitLabの場合はX-Gitlab-Tokenなど)を介して送信されます。
    • プラグイン側は同じシークレットとアルゴリズムを使用して署名を再計算し、リクエストヘッダーの署名と比較します。
    • 上記のverify_github_signature関数例に示されています。
    • シークレットはプラグインの設定(is_secretとしてマーク)に保存する必要があり、コードにハードコードしないでください。
  2. IPアドレスホワイトリスト:
    • 外部システムの送信元IPアドレスが固定されているか、既知の範囲内にある場合、プラグインまたはゲートウェイレベルでIPホワイトリストを設定できます。
    • ただし、IPアドレスは変更される可能性があり、保守コストが高いです。
  3. 認証トークン:
    • 外部システムにリクエストヘッダーまたはクエリパラメータで事前共有された認証トークンを含めるように要求します。
    • 比較的単純ですが、署名検証よりも安全性が低く、トークンが漏洩する可能性があります。

少なくとも1つの効果的なセキュリティ検証メカニズムを実装してください。

異なるHTTPメソッドの処理

デフォルトでは、WebhookハンドラはすべてのHTTPメソッドに応答できます。特定のメソッド(GETPUTDELETEなど)を処理する必要がある場合は、ハンドラ内でrequest.methodを確認できます:

python
@plugin.mount_webhook_method(endpoint="/resource", name="リソースハンドラ")
async def handle_resource(request: Request):
    if request.method == "GET":
        # GETリクエストを処理、例えばリソースステータスを返す
        return {"status": "available", "data": await get_resource_data()}
    elif request.method == "POST":
        # POSTリクエストを処理、例えば新しいリソースを作成
        payload = await request.json()
        resource_id = await create_resource(payload)
        return {"status": "created", "id": resource_id}, 201 # 201 Createdを返す
    elif request.method == "DELETE":
        # DELETEリクエストを処理、例えばリソースを削除
        # ...
        return {"status": "deleted"}
    else:
        raise HTTPException(status_code=405, detail="Method Not Allowed")

Webhookからセッションへの通知送信

Webhookの一般的な使用例は、Nekro Agent内の特定のユーザーやセッションに外部イベントを通知することです。

python
from nekro_agent.api import message, context

async def notify_chat(chat_key: str, notification_text: str):
    if not chat_key:
        core.logger.warning("Webhook通知を送信できません: ターゲットchat_keyが空です。")
        return

    try:
        # メッセージ送信用の一時AgentCtxを作成
        # 注: この一時コンテキストには完全なユーザーまたはセッション情報が含まれていない場合があります
        _ctx = await context.create_temp_ctx(chat_key=chat_key)

        await message.send_text(chat_key, notification_text, _ctx)
        core.logger.info(f"Webhook通知が{chat_key}に送信されました。")
    except Exception as e:
        core.logger.error(f"Webhook経由で{chat_key}に通知を送信できませんでした: {e}")

# Webhookハンドラで呼び出す:
# target_chat = plugin.config.TARGET_CHAT_FOR_ALERTS
# await notify_chat(target_chat, "緊急アラート: サーバーCPU使用率が90%を超えました!")

ターゲットセッションchat_keyは通常、プラグインの設定から取得する必要があり、ユーザーが通知を受け取るグループまたはプライベートチャットを指定できるようにします。

ファイルアップロードの処理

Webhookはファイルアップロードの受信にも使用できます。FastAPIはmultipart/form-dataリクエストを処理する機能を提供します。

python
from fastapi import File, UploadFile, Form

@plugin.mount_webhook_method(endpoint="/upload_report", name="レポートアップロード")
async def handle_report_upload(
    request: Request,
    report_file: UploadFile = File(...), # "report_file"はフォームフィールド名
    report_name: str = Form(...),      # "report_name"もフォームフィールド名
    target_chat: str = Form(...)
):
    core.logger.info(f"ファイルアップロードを受信: {report_file.filename} (タイプ: {report_file.content_type}), レポート名: {report_name}")

    # ファイルタイプ、サイズなどを検証(推奨)
    if not report_file.content_type.startswith("text/"):
        raise HTTPException(status_code=400, detail="テキストファイルアップロードのみサポートしています。")

    # ファイルコンテンツを読み取り
    contents = await report_file.read() # bytes
    report_text = contents.decode('utf-8')

    # ファイルコンテンツを処理、例えばプラグインディレクトリに保存またはユーザーに送信
    # plugin_data_path = plugin.get_plugin_path() / "uploads"
    # plugin_data_path.mkdir(exist_ok=True)
    # async with aiofiles.open(plugin_data_path / report_file.filename, "wb") as f:
    #     await f.write(contents)

    await notify_chat(target_chat, f"新しいレポート '{report_name}' ({report_file.filename})を受信しました。コンテンツ概要:\n{report_text[:200]}...")

    return {"status": "success", "filename": report_file.filename, "size": len(contents)}

非同期タスク処理(重要)

FastAPIはasync def Webhookハンドラを非同期に実行(await)しますが、これはハンドラ自体がサーバーの他の操作をブロックしないことを意味するだけです。HTTPクライアントは、ハンドラがすべての内部操作(await部分を含む)を完了するまで待機し、レスポンスを受信します。 したがって、Webhookハンドラに外部システムへのレスポンスを遅延させるべきではない時間のかかる操作(複数の外部API呼び出し、大規模なデータ処理、複雑な計算タスクなど)が含まれる場合、これらの特定の操作をバックグラウンドタスクで実行することを強くお勧めします。これにより、プラグインは外部システムにWebhookが受信されたことを迅速に確認し(例えば202 Acceptedを返す)、実際の処理はバックグラウンドで行われます。

Pythonのasyncio.create_task()または類似のメカニズム(プロジェクトに統合されている場合はCelery、RQなど)をこの目的に使用できます。

python
import asyncio

async def long_running_process(data: dict, chat_to_notify: str):
    core.logger.info(f"時間のかかるタスクを開始: {data.get('task_id')}")
    await asyncio.sleep(10) # 時間のかかる操作をシミュレート
    result_message = f"タスク {data.get('task_id')} の処理が完了しました。結果: XXXX"
    await notify_chat(chat_to_notify, result_message)
    core.logger.info(f"時間のかかるタスク {data.get('task_id')} が完了し、通知されました。")

@plugin.mount_webhook_method(endpoint="/start_job", name="長時間ジョブ開始")
async def start_job_webhook(request: Request):
    payload = await request.json()
    task_id = payload.get("task_id", "unknown_task")
    chat_key = payload.get("notify_chat_key") # ペイロードに通知ターゲットが含まれていると仮定

    if not chat_key:
        return {"status": "error", "message": "notify_chat_keyが提供されていません"}, 400

    # 完了を待たずにバックグラウンドタスクを作成して開始
    asyncio.create_task(long_running_process({"task_id": task_id, **payload}, chat_key))

    core.logger.info(f"タスク{task_id}のバックグラウンドハンドラを作成しました。")
    return {"status": "accepted", "task_id": task_id, "message": "ジョブを受信し、バックグラウンドで処理中です。"}, 202 # 202 Acceptedを返す

Webhookを合理的に使用することで、プラグインはNekro Agentが外部世界と接続する能力を大幅に拡張できます。