MCP(Model Context Protocol)を用いたツール開発で、処理が長時間かかる場合には、環境によってはタイムアウトのリスクがあります。タイムアウト時間は環境や実装に依存し、一例として数十秒から数分程度で切断される場合があります。こうした制限を意識して、長時間処理の設計には2つの基本方針があります。
この方法では、一回のMCPツール呼び出し内で処理を完結させます。実装例として下記のmiddle_running_task
があります。
@mcp.tool()
async def middle_running_task(duration: int = 10, ctx: Context = None) -> str:
for i in range(duration):
if ctx:
await ctx.report_progress(progress=i+1, total=duration, message=f"{i+1}秒経過")
await asyncio.sleep(1)
return f"done {duration}秒"
ctx.report_progress
でクライアントに通知長時間の処理では、処理開始(リクエスト)と処理結果の通知(応答)を分離し、非同期で動かしながら進捗を別APIで取得する方法が推奨されます。例としてstart_long_task_with_progress
があります。
@mcp.tool()
def start_long_task_with_progress(duration: int = 10) -> str:
job_id = str(uuid.uuid4())
_tasks[job_id] = {
"status": "pending",
"progress": 0,
"total": duration,
"result": None,
"error": None
}
asyncio.create_task(_execute_task_with_progress(job_id, duration))
return job_id
非同期タスクで進捗を更新します。
async def _execute_task_with_progress(job_id, duration):
try:
_tasks[job_id]["status"] = "running"
for i in range(duration):
await asyncio.sleep(1)
_tasks[job_id]["progress"] = i + 1
_tasks[job_id]["result"] = f"completed: {duration} 秒処理"
_tasks[job_id]["status"] = "finished"
except Exception as e:
_tasks[job_id]["status"] = "failed"
_tasks[job_id]["error"] = str(e)
進捗と結果は専用リソースから取得できます。
@mcp.resource("result://{job_id}")
def get_result(job_id: str):
task = _tasks.get(job_id)
if not task:
return {"error": "not found"}
return {
"status": task["status"],
"progress": task["progress"],
"total": task["total"],
"result": task["result"],
"error": task["error"]
}
MCPInspectorでは、非同期処理のジョブIDから進捗の取得APIをポーリングし、リアルタイムに進捗や完了状態をGUIで確認できます。これにより長時間処理でもユーザーに状態を分かりやすく伝えられます。
進捗通知
完了
FastMCPを使用して作成していて、Toolが2個と、Resourceが2個登録されています。
from mcp.server.fastmcp import FastMCP, Context
import asyncio
import uuid
mcp = FastMCP("Hello World")
# ジョブストア全体
_tasks = {}
# ①元のmiddle_running_task(progressなし、残す)
@mcp.tool()
async def middle_running_task(duration: int = 10, ctx: Context = None) -> str:
for i in range(duration):
if ctx:
await ctx.report_progress(progress=i+1, total=duration, message=f"{i+1}秒経過")
await asyncio.sleep(1)
return f"done {duration}秒"
# ②途中経過含む新規長時間処理・実行開始ツール
@mcp.tool()
def start_long_task_with_progress(duration: int = 10) -> str:
job_id = str(uuid.uuid4())
_tasks[job_id] = {
"status": "pending",
"progress": 0,
"total": duration,
"result": None,
"error": None
}
asyncio.create_task(_execute_task_with_progress(job_id, duration))
return job_id
async def _execute_task_with_progress(job_id, duration):
try:
_tasks[job_id]["status"] = "running"
for i in range(duration):
await asyncio.sleep(1)
_tasks[job_id]["progress"] = i + 1
_tasks[job_id]["result"] = f"completed: {duration} 秒処理"
_tasks[job_id]["status"] = "finished"
except Exception as e:
_tasks[job_id]["status"] = "failed"
_tasks[job_id]["error"] = str(e)
# 途中経過含む結果取得リソース
@mcp.resource("result://{job_id}")
def get_result(job_id: str):
task = _tasks.get(job_id)
if not task:
return {"error": "not found"}
return {
"status": task["status"],
"progress": task["progress"],
"total": task["total"],
"result": task["result"],
"error": task["error"]
}
@mcp.resource("greeting://{name}")
def get_greeting(name: str) -> str:
return f"Hello, {name}!"
if __name__ == "__main__":
mcp.run(transport="stdio")
方法 | 特徴 | 利用シーン例 |
---|---|---|
1. 一回の要求で完結 | 実装が簡単だがタイムアウトリスクあり | 短時間の処理や簡易処理 |
2. 要求・結果分離 | タイムアウトリスク低減、進捗管理可能 | 長時間処理や重いバックグラウンド作業 |
長時間処理の実装は、環境のタイムアウト特性を理解し適した方針を選択することが重要です。非同期・分離型アプローチは安全で多くの現場で推奨されています。