📋 需求背景:针对 OpenClaw 执行复杂任务时的轮次限制、长时间运行任务监控、任务进度可视化等需求,调研 AgentScope 等方案并设计集成架构。
OpenClaw 执行轮次限制
长时间运行任务监控
任务进度可视化
多 Agent 协同
| 目标 | 说明 | 优先级 |
|---|---|---|
| 智能任务分解 | 复杂任务自动分解为可执行子任务 | P0 |
| 进度可视化 | 实时展示任务执行进度 | P0 |
| 长时间运行支持 | 支持 30 分钟 + 任务执行 | P1 |
| 多 Agent 协同 | OpenClaw + AgentScope + Claude Code | P1 |
| 任务接管 | 支持暂停、恢复、重新连接 | P2 |
根据源码分析:
| 限制类型 | 配置项 | 默认值 | 说明 |
|---|---|---|---|
| 最大并发 | maxConcurrent | 4 | 同时运行的 Agent 数量 |
| 子 Agent 并发 | subagents.maxConcurrent | 8 | 子 Agent 最大并发数 |
| Agent 对话轮次 | agentToAgent.maxPingPongTurns | 5 | Agent 之间对话轮次 |
| 会话数限制 | maxSessions | - | 最大会话数量 |
| Token 限制 | - | 模型决定 | LLM 上下文窗口 |
| 超时时间 | timeout | 600s | 默认 10 分钟超时 |
javascript// OpenClaw 源码中的停止原因
const stopReason = payload.stopReason === "max_tokens" ? "max_tokens" : "end_turn";
常见停止原因:
max_tokens - 达到 token 限制end_turn - Agent 主动结束timeout - 超时error - 执行错误核心原因:
Token 累积
超时限制
设计理念
成本考虑
AgentScope 是阿里云开源的多 Agent 协作框架:
pip install agentscopepythonfrom agentscope import Agent, Message, Pipeline
# 1. Agent 定义
class TaskPlannerAgent(Agent):
def __init__(self, name, model_config):
super().__init__(name, model_config)
def reply(self, message: Message) -> Message:
# 任务分解逻辑
plan = self.decompose_task(message.content)
return Message(
name=self.name,
content=plan,
metadata={"task_type": "planning"}
)
# 2. 工作流编排
pipeline = Pipeline([
TaskPlannerAgent("planner", model_config),
CodeGeneratorAgent("coder", model_config),
TaskExecutorAgent("executor", openclaw_config)
])
# 3. 执行
result = pipeline.run("复杂任务描述")
┌─────────────────────────────────────────────────────────┐ │ 用户界面层 │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 任务输入 │ │ 进度展示 │ │ 结果查看 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ AgentScope 规划层 │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 任务分解 │ │ 策略生成 │ │ 进度跟踪 │ │ │ │ (Planner) │ │ (Strategy) │ │ (Tracker) │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ 执行引擎层 │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ OpenClaw │ │ Claude Code │ │ Qwen │ │ │ │ Agent │ │ Runner │ │ Code │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ 工具技能层 │ │ local-project-scanner | mysql-writer | oss-uploader │ └─────────────────────────────────────────────────────────┘
pythonclass TaskDecompositionStrategy:
"""任务分解策略"""
def decompose(self, task: str) -> List[SubTask]:
"""
将复杂任务分解为可执行的子任务
返回:
[
SubTask(
id="1",
name="需求分析",
agent="planner",
estimated_time=300, # 秒
dependencies=[]
),
SubTask(
id="2",
name="代码生成",
agent="claude-code",
estimated_time=900,
dependencies=["1"]
),
SubTask(
id="3",
name="测试验证",
agent="openclaw",
estimated_time=600,
dependencies=["2"]
)
]
"""
pass
typescriptinterface TaskProgress {
// 任务基本信息
taskId: string;
name: string;
description: string;
// 进度信息
status: 'pending' | 'planning' | 'executing' | 'completed' | 'failed';
overallProgress: number; // 0-100
// 子任务进度
subtasks: SubTaskProgress[];
// 执行详情
executionDetails: {
currentAgent: string;
currentStep: string;
elapsedTime: number; // 秒
estimatedRemaining: number; // 秒
toolCalls: number;
tokensUsed: number;
};
// 日志和输出
logs: TaskLog[];
artifacts: Artifact[];
}
interface SubTaskProgress {
id: string;
name: string;
status: 'pending' | 'running' | 'completed' | 'failed';
progress: number; // 0-100
agent: string;
startTime?: number;
endTime?: number;
result?: any;
error?: string;
}
tsx// 任务进度看板
<TaskProgressDashboard taskId="xxx">
{/* 总体进度条 */}
<OverallProgress progress={67} />
{/* 子任务列表 */}
<SubTaskList>
<SubTaskCard
name="需求分析"
status="completed"
progress={100}
/>
<SubTaskCard
name="代码生成"
status="running"
progress={45}
agent="claude-code"
elapsedTime="12:34"
/>
<SubTaskCard
name="测试验证"
status="pending"
progress={0}
/>
</SubTaskList>
{/* 实时日志 */}
<LiveLogStream />
{/* 生成的文件 */}
<GeneratedArtifacts />
</TaskProgressDashboard>
pythonclass ProgressTracker:
"""进度跟踪器"""
def __init__(self, task_id: str):
self.task_id = task_id
self.ws_clients = [] # WebSocket 客户端
async def update_progress(self, progress: TaskProgress):
"""更新进度并推送给所有客户端"""
# 保存到数据库
await self.db.save_progress(progress)
# 推送给 WebSocket 客户端
for client in self.ws_clients:
await client.send_json(progress.dict())
async def on_tool_call(self, tool_name: str, args: dict):
"""监听 tool call 事件"""
await self.update_progress({
"event": "tool_call",
"tool": tool_name,
"args": args,
"timestamp": time.time()
})
async def on_agent_message(self, agent: str, message: str):
"""监听 Agent 消息"""
await self.update_progress({
"event": "agent_message",
"agent": agent,
"message": message,
"timestamp": time.time()
})
pythonclass ClaudeCodeRunner:
"""Claude Code 运行器"""
def __init__(self, project_dir: str):
self.project_dir = project_dir
self.process = None
self.tmux_session = None
async def start(self, task: str) -> str:
"""
在 tmux 中启动 Claude Code
返回:session_id
"""
session_id = f"claude-{uuid.uuid4()}"
# 创建 tmux 会话
self.tmux_session = session_id
subprocess.run([
'tmux', 'new-session', '-d', '-s', session_id,
f'claude --project {self.project_dir}'
])
# 发送任务
await self.send_command(task)
return session_id
async def send_command(self, command: str):
"""发送命令到 Claude Code"""
subprocess.run([
'tmux', 'send-keys', '-t', self.tmux_session,
command, 'Enter'
])
async def get_output(self, lines: int = 50) -> List[str]:
"""获取输出"""
result = subprocess.run([
'tmux', 'capture-pane', '-t', self.tmux_session,
'-p', '-S', f'-{lines}'
], capture_output=True, text=True)
return result.stdout.split('\n')
async def get_status(self) -> dict:
"""获取任务状态"""
output = await self.get_output()
# 分析输出,判断状态
return {
"status": self._parse_status(output),
"current_file": self._parse_current_file(output),
"progress": self._estimate_progress(output)
}
async def reconnect(self, session_id: str):
"""重新连接到已有会话"""
self.tmux_session = session_id
# 验证会话是否存在
result = subprocess.run([
'tmux', 'has-session', '-t', session_id
])
if result.returncode != 0:
raise Exception(f"Session {session_id} not found")
pythonCLAUDE_SHORTCUTS = {
# 基本控制
"pause": "Ctrl+C", # 暂停当前操作
"resume": "Enter", # 继续
"quit": "Ctrl+D", # 退出
# 模型切换
"switch_model": "/model", # 切换模型
"model_claude": "/model claude-sonnet",
"model_opus": "/model claude-opus",
# 信息获取
"help": "/help", # 帮助
"stats": "/stats", # 统计信息
"config": "/config", # 配置
# 文件操作
"add_file": "/add <file>", # 添加文件
"remove_file": "/remove <file>",
# 其他
"clear": "/clear", # 清屏
"exit": "/exit" # 退出
}
pythonclass HybridExecutor:
"""混合执行器:OpenClaw + Claude Code"""
def __init__(self):
self.openclaw = OpenClawAgent()
self.claude = ClaudeCodeRunner()
self.tracker = ProgressTracker()
async def execute_complex_task(self, task: str):
"""执行复杂任务"""
# 步骤 1: 使用 OpenClaw 规划
await self.tracker.update_status("planning")
plan = await self.openclaw.run(
f"分析以下任务并生成详细执行计划:{task}"
)
# 步骤 2: 分解任务
subtasks = self._decompose_plan(plan)
# 步骤 3: 执行子任务
for i, subtask in enumerate(subtasks):
await self.tracker.update_subtask(i, "running")
if subtask.type == "code_generation":
# 使用 Claude Code 生成代码
await self.claude.start(subtask.description)
# 定期获取进度
while not self._is_complete(await self.claude.get_status()):
status = await self.claude.get_status()
await self.tracker.update_progress({
"subtask": i,
"status": status
})
await asyncio.sleep(30) # 每 30 秒更新一次
# 获取生成的代码
code = await self.claude.get_output()
elif subtask.type == "research":
# 使用 OpenClaw 调研
result = await self.openclaw.run(subtask.description)
elif subtask.type == "testing":
# 使用 OpenClaw 测试
result = await self.openclaw.run(subtask.description)
await self.tracker.update_subtask(i, "completed")
# 步骤 4: 生成报告
report = await self.openclaw.run("生成任务完成报告")
return report
┌─────────────────────────────────────────────────────────────────┐ │ 用户界面层 │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ React 前端 (任务看板) │ │ │ │ - 任务输入 - 进度展示 - 日志流 - 结果查看 │ │ │ └─────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ │ WebSocket ▼ ┌─────────────────────────────────────────────────────────────────┐ │ API 网关层 │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ REST API │ │ WebSocket │ │ GraphQL │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 任务调度层 │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ AgentScope 任务规划引擎 │ │ │ │ - 任务分解 - 策略生成 - Agent 路由 - 进度跟踪 │ │ │ └─────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ │ ┌───────────────┼───────────────┐ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ OpenClaw │ │ Claude Code │ │ Qwen Code │ │ Agent │ │ Runner │ │ Runner │ │ │ │ │ │ │ │ - 工具调用 │ │ - 代码生成 │ │ - 代码生成 │ │ - 技能执行 │ │ - 文件修改 │ │ - 代码审查 │ │ - 会话管理 │ │ - 长时间运行 │ │ - 快速响应 │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 数据存储层 │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ MySQL │ │ Redis │ │ OSS │ │ │ │ 任务/进度 │ │ 缓存 │ │ 文件/报告 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────────────┘
python# 任务模型
class Task(BaseModel):
id: str
name: str
description: str
status: TaskStatus
priority: int = 1
# 执行配置
agent_strategy: AgentStrategy # 执行策略
max_duration: int = 3600 # 最大执行时间(秒)
auto_retry: bool = True
# 进度跟踪
progress: int = 0
current_subtask: Optional[str] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
# 执行历史
execution_log: List[ExecutionLog] = []
artifacts: List[Artifact] = []
# 子任务模型
class SubTask(BaseModel):
id: str
parent_task_id: str
name: str
description: str
# 执行配置
agent: str # openclaw | claude-code | qwen-code
estimated_duration: int # 预计执行时间(秒)
dependencies: List[str] = [] # 依赖的子任务 ID
# 执行状态
status: SubTaskStatus = 'pending'
progress: int = 0
result: Optional[Any] = None
error: Optional[str] = None
# 时间戳
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
# 执行日志
class ExecutionLog(BaseModel):
timestamp: datetime
event_type: str # tool_call | agent_message | status_change | error
agent: str
content: str
metadata: dict = {}
python# REST API
@app.post("/api/v1/tasks")
async def create_task(task: TaskCreateRequest) -> TaskResponse:
"""创建任务"""
pass
@app.get("/api/v1/tasks/{task_id}")
async def get_task(task_id: str) -> TaskResponse:
"""获取任务详情"""
pass
@app.get("/api/v1/tasks/{task_id}/progress")
async def get_task_progress(task_id: str) -> ProgressResponse:
"""获取任务进度"""
pass
@app.post("/api/v1/tasks/{task_id}/pause")
async def pause_task(task_id: str) -> TaskResponse:
"""暂停任务"""
pass
@app.post("/api/v1/tasks/{task_id}/resume")
async def resume_task(task_id: str) -> TaskResponse:
"""恢复任务"""
pass
@app.delete("/api/v1/tasks/{task_id}")
async def cancel_task(task_id: str) -> TaskResponse:
"""取消任务"""
pass
# WebSocket
@app.websocket("/ws/tasks/{task_id}")
async def task_progress_websocket(websocket: WebSocket, task_id: str):
"""任务进度实时推送"""
await websocket.accept()
tracker = ProgressTracker(task_id)
tracker.ws_clients.append(websocket)
try:
while True:
# 保持连接
await websocket.receive_text()
except:
tracker.ws_clients.remove(websocket)
| 对比项 | AgentScope | 原生 OpenClaw |
|---|---|---|
| 任务分解 | ✅ 内置支持 | ❌ 需自定义 |
| 多 Agent 协同 | ✅ 原生支持 | ⚠️ 有限支持 |
| 工作流编排 | ✅ Pipeline | ❌ 无 |
| 进度跟踪 | ✅ 内置 | ❌ 需自定义 |
| 学习曲线 | 中 | 低 |
| 事件类型 | 更新频率 | 说明 |
|---|---|---|
| Tool Call | 实时 | 每次调用立即更新 |
| Agent 消息 | 实时 | 每条消息更新 |
| 进度百分比 | 每 30 秒 | 避免过于频繁 |
| 长时间任务 | 每 60 秒 | Claude Code 等 |
本文档为初步设计方案,具体实施时可能需要调整 创建时间:2026-03-23
本文作者:lazyyoun
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!