编辑
2026-03-23
系统设计
00

目录

AgentScope 集成方案 - 复杂任务执行系统设计
🎯 一、AgentScope 核心能力
1.1 AgentScope 简介
1.2 核心概念
1.3 内置 Agent 角色
🏗️ 二、集成架构设计
2.1 整体架构
2.2 Agent 角色设计
🔌 三、OpenClaw 集成实现
3.1 OpenClaw 工具封装
3.2 工具注册到 AgentScope
📊 四、任务进度跟踪
4.1 进度数据结构
4.2 进度跟踪器实现
4.3 实时日志流
🎨 五、前端页面设计
5.1 任务看板页面
5.2 进度条组件
5.3 子任务卡片
🔄 六、长时间运行任务支持
6.1 Claude Code tmux 集成
6.2 快捷键映射
📋 七、实施计划
7.1 阶段一:基础框架(1-2 周)
7.2 阶段二:任务规划(1 周)
7.3 阶段三:执行集成(1-2 周)
7.4 阶段四:前端看板(2 周)
7.5 阶段五:优化完善(1 周)
📚 八、参考资料

title: AgentScope 集成方案 - 复杂任务执行系统设计 date: 2026-03-23 tags: [AgentScope, OpenClaw, 多 Agent 协作,任务规划] category: 系统设计

AgentScope 集成方案 - 复杂任务执行系统设计

📋 基于 AgentScope 官方文档:设计 OpenClaw 与 AgentScope 的集成方案,实现复杂任务的智能分解、多 Agent 协作和进度可视化。


🎯 一、AgentScope 核心能力

1.1 AgentScope 简介

AgentScope 是阿里巴巴开源的多 Agent 协作框架:

1.2 核心概念

python
from agentscope.agents import Agent from agentscope.message import Message # 1. Agent - 智能体基类 class MyAgent(Agent): def __init__(self, name, model_config): super().__init__(name, model_config) def reply(self, message: Message) -> Message: # 处理消息并返回响应 response = self.model.generate(message.content) return Message(name=self.name, content=response) # 2. Message - 消息格式 message = Message( name="user", content="请帮我分析这个项目", metadata={"task_type": "analysis"} ) # 3. Model - 模型配置 model_config = { "model_type": "dashscope_chat", "model_name": "qwen-max", "api_key": "your-api-key" }

1.3 内置 Agent 角色

AgentScope 提供多种内置 Agent:

Agent 类型用途适用场景
DialogAgent对话 Agent用户交互、问答
DictDialogAgent结构化对话返回 JSON 格式结果
ReActAgent推理 + 行动需要工具调用的任务
UserAgent用户代理模拟用户输入

🏗️ 二、集成架构设计

2.1 整体架构

┌─────────────────────────────────────────────────────────────────┐ │ 用户界面层 │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ React 前端 (任务看板) │ │ │ │ - 任务输入 - 进度展示 - 实时日志 - 结果查看 │ │ │ └─────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ │ WebSocket ▼ ┌─────────────────────────────────────────────────────────────────┐ │ API 服务层 │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ REST API │ │ WebSocket │ │ 任务队列 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ AgentScope 编排层 │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ TaskPlannerAgent (任务规划) │ │ │ │ - 任务分解 - 策略生成 - Agent 路由 │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────┼───────────────┐ │ │ ▼ ▼ ▼ │ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │ │ CodeGenerator │ │ ResearchAgent │ │ ExecutorAgent │ │ │ │ (代码生成) │ │ (调研分析) │ │ (任务执行) │ │ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 执行引擎层 │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ OpenClaw │ │ Claude Code │ │ Qwen │ │ │ │ Executor │ │ Runner │ │ Code │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────────────┘

2.2 Agent 角色设计

python
from agentscope.agents import DictDialogAgent from agentscope.message import Message from typing import List, Dict # 1. 任务规划 Agent class TaskPlannerAgent(DictDialogAgent): """任务规划 Agent - 负责分解复杂任务""" def __init__(self, name: str, model_config: dict): super().__init__( name=name, model_config=model_config, sys_prompt="""你是一个专业的任务规划专家。 你的职责是: 1. 分析用户输入的复杂任务 2. 将任务分解为可执行的子任务 3. 为每个子任务分配合适的执行 Agent 4. 估算每个子任务的执行时间 请以 JSON 格式返回任务分解结果: { "task_name": "任务名称", "subtasks": [ { "id": "1", "name": "子任务名称", "description": "详细描述", "agent": "openclaw|claude-code|qwen-code", "estimated_duration": 300, "dependencies": [] } ] } """ ) def reply(self, message: Message) -> Message: response = super().reply(message) # 解析 JSON 响应 plan = self._parse_json(response.content) return Message( name=self.name, content=plan, metadata={"task_type": "planning"} ) # 2. 代码生成 Agent class CodeGeneratorAgent(DictDialogAgent): """代码生成 Agent - 负责代码编写""" def __init__(self, name: str, model_config: dict): super().__init__( name=name, model_config=model_config, sys_prompt="""你是一个资深软件工程师。 你的职责是: 1. 根据需求生成高质量的代码 2. 遵循最佳实践和编码规范 3. 添加必要的注释和文档 4. 考虑错误处理和边界情况 请以 JSON 格式返回: { "files": [ { "path": "文件路径", "content": "文件内容", "language": "python|javascript|etc" } ], "summary": "代码说明" } """ ) # 3. 执行器 Agent class ExecutorAgent(DictDialogAgent): """执行器 Agent - 负责调用 OpenClaw 执行任务""" def __init__(self, name: str, model_config: dict): super().__init__( name=name, model_config=model_config, sys_prompt="""你是一个任务执行专家。 你的职责是: 1. 调用 OpenClaw 工具执行具体任务 2. 监控执行进度 3. 处理执行异常 4. 返回执行结果 请以 JSON 格式返回: { "status": "running|completed|failed", "progress": 0-100, "result": "执行结果", "logs": ["日志列表"] } """ )

🔌 三、OpenClaw 集成实现

3.1 OpenClaw 工具封装

python
import subprocess import json from typing import Dict, Any class OpenClawTool: """OpenClaw 工具封装""" def __init__(self, agent_id: str = "main"): self.agent_id = agent_id def execute(self, task: str, timeout: int = 600) -> Dict[str, Any]: """ 执行 OpenClaw 任务 Args: task: 任务描述 timeout: 超时时间(秒) Returns: 执行结果 """ cmd = [ "openclaw", "agent", "--agent", self.agent_id, "--message", task, "--json", "--timeout", str(timeout) ] result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout + 30 ) if result.returncode != 0: return { "success": False, "error": result.stderr, "tool_calls": 0 } try: response = json.loads(result.stdout) return { "success": True, "result": response.get("result", ""), "tool_calls": response.get("tool_calls", 0), "tokens_used": response.get("usage", {}) } except json.JSONDecodeError: return { "success": True, "result": result.stdout, "tool_calls": 0 } def get_session_history(self, session_id: str) -> list: """获取会话历史""" cmd = [ "openclaw", "sessions", "history", "--session-id", session_id, "--json" ] result = subprocess.run( cmd, capture_output=True, text=True ) return json.loads(result.stdout) if result.returncode == 0 else []

3.2 工具注册到 AgentScope

python
from agentscope.tools import Tool # 定义 OpenClaw 工具 openclaw_execute_tool = Tool( name="openclaw_execute", description="执行 OpenClaw 任务", function=OpenClawTool().execute, parameters={ "type": "object", "properties": { "task": { "type": "string", "description": "任务描述" }, "timeout": { "type": "integer", "description": "超时时间(秒)", "default": 600 } }, "required": ["task"] } ) # 注册到 Agent class OpenClawAgent(DictDialogAgent): def __init__(self, name: str, model_config: dict): super().__init__(name, model_config) self.tools = [openclaw_execute_tool] def reply(self, message: Message) -> Message: # 使用 ReAct 模式:推理 -> 行动 -> 观察 return super().reply(message)

📊 四、任务进度跟踪

4.1 进度数据结构

python
from datetime import datetime from enum import Enum from typing import List, Optional from pydantic import BaseModel class TaskStatus(str, Enum): PENDING = "pending" PLANNING = "planning" EXECUTING = "executing" COMPLETED = "completed" FAILED = "failed" class SubTaskStatus(str, Enum): PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" class SubTaskProgress(BaseModel): id: str name: str status: SubTaskStatus progress: int # 0-100 agent: str started_at: Optional[datetime] = None completed_at: Optional[datetime] = None result: Optional[dict] = None error: Optional[str] = None class TaskProgress(BaseModel): task_id: str name: str status: TaskStatus overall_progress: int # 0-100 subtasks: List[SubTaskProgress] # 执行详情 current_agent: Optional[str] = None current_step: Optional[str] = None elapsed_time: int = 0 # 秒 estimated_remaining: int = 0 # 秒 tool_calls: int = 0 tokens_used: int = 0 # 日志 logs: List[dict] = [] # 时间戳 created_at: datetime = datetime.now() updated_at: datetime = datetime.now()

4.2 进度跟踪器实现

python
import asyncio from datetime import datetime from typing import List, Callable class ProgressTracker: """进度跟踪器""" def __init__(self, task_id: str): self.task_id = task_id self.progress = TaskProgress( task_id=task_id, name="", status=TaskStatus.PENDING, overall_progress=0, subtasks=[] ) self.ws_clients: List[WebSocket] = [] self.callbacks: List[Callable] = [] async def start(self): """开始跟踪""" self.progress.status = TaskStatus.PLANNING self.progress.started_at = datetime.now() await self._broadcast() async def update_subtask(self, subtask_id: str, **kwargs): """更新子任务进度""" for subtask in self.progress.subtasks: if subtask.id == subtask_id: for key, value in kwargs.items(): if hasattr(subtask, key): setattr(subtask, key, value) # 计算总体进度 self._calculate_overall_progress() self.progress.updated_at = datetime.now() await self._broadcast() break async def add_log(self, event_type: str, agent: str, content: str): """添加日志""" log_entry = { "timestamp": datetime.now().isoformat(), "event_type": event_type, "agent": agent, "content": content } self.progress.logs.append(log_entry) await self._broadcast() def _calculate_overall_progress(self): """计算总体进度""" if not self.progress.subtasks: return total = sum(st.progress for st in self.progress.subtasks) self.progress.overall_progress = total // len(self.progress.subtasks) async def _broadcast(self): """推送给所有客户端""" # 推送给 WebSocket 客户端 for client in self.ws_clients: try: await client.send_json(self.progress.dict()) except: pass # 调用回调函数 for callback in self.callbacks: try: callback(self.progress) except: pass def on_progress_update(self, callback: Callable): """注册进度更新回调""" self.callbacks.append(callback)

4.3 实时日志流

python
class LogStream: """日志流处理器""" def __init__(self, tracker: ProgressTracker): self.tracker = tracker self.buffer = [] async def on_tool_call(self, tool_name: str, args: dict): """监听 tool call""" await self.tracker.add_log( event_type="tool_call", agent="openclaw", content=f"调用工具:{tool_name}", metadata={"tool": tool_name, "args": args} ) async def on_agent_message(self, agent: str, message: str): """监听 Agent 消息""" await self.tracker.add_log( event_type="agent_message", agent=agent, content=message[:500] # 限制长度 ) async def on_status_change(self, old_status: str, new_status: str): """监听状态变化""" await self.tracker.add_log( event_type="status_change", agent="system", content=f"状态变化:{old_status}{new_status}" )

🎨 五、前端页面设计

5.1 任务看板页面

tsx
// TaskDashboard.tsx import React, { useState, useEffect } from 'react'; import { TaskProgress, SubTaskProgress } from './types'; export const TaskDashboard: React.FC<{ taskId: string }> = ({ taskId }) => { const [progress, setProgress] = useState<TaskProgress | null>(null); const [logs, setLogs] = useState([]); useEffect(() => { // 连接 WebSocket const ws = new WebSocket(`ws://localhost:8080/ws/tasks/${taskId}`); ws.onmessage = (event) => { const data = JSON.parse(event.data); setProgress(data); }; return () => ws.close(); }, [taskId]); if (!progress) return <div>加载中...</div>; return ( <div className="task-dashboard"> {/* 总体进度 */} <OverallProgress progress={progress.overall_progress} status={progress.status} /> {/* 子任务列表 */} <SubTaskList subtasks={progress.subtasks} /> {/* 实时日志 */} <LogStream logs={progress.logs} /> {/* 执行详情 */} <ExecutionDetails currentAgent={progress.current_agent} elapsedTime={progress.elapsed_time} toolCalls={progress.tool_calls} /> </div> ); };

5.2 进度条组件

tsx
// OverallProgress.tsx export const OverallProgress: React.FC<{ progress: number; status: string; }> = ({ progress, status }) => { const getStatusColor = () => { switch (status) { case 'completed': return 'green'; case 'failed': return 'red'; case 'executing': return 'blue'; default: return 'gray'; } }; return ( <div className="overall-progress"> <div className="progress-header"> <span>总体进度</span> <span>{progress}%</span> </div> <div className="progress-bar"> <div className="progress-fill" style={{ width: `${progress}%`, backgroundColor: getStatusColor() }} /> </div> <div className="status-badge">{status}</div> </div> ); };

5.3 子任务卡片

tsx
// SubTaskCard.tsx export const SubTaskCard: React.FC<{ subtask: SubTaskProgress }> = ({ subtask }) => { const getStatusIcon = () => { switch (subtask.status) { case 'completed': return '✅'; case 'running': return '🔄'; case 'failed': return '❌'; default: return '⏳'; } }; return ( <div className="subtask-card"> <div className="card-header"> <span>{getStatusIcon()}</span> <span>{subtask.name}</span> <span>{subtask.progress}%</span> </div> <div className="card-body"> <div>Agent: {subtask.agent}</div> {subtask.started_at && ( <div>开始时间:{subtask.started_at.toLocaleString()}</div> )} {subtask.error && ( <div className="error">{subtask.error}</div> )} </div> <div className="progress-bar"> <div className="fill" style={{ width: `${subtask.progress}%` }} /> </div> </div> ); };

🔄 六、长时间运行任务支持

6.1 Claude Code tmux 集成

python
import subprocess import pty import os from typing import List, Optional class ClaudeCodeSession: """Claude Code 会话管理""" def __init__(self, project_dir: str): self.project_dir = project_dir self.session_id = None self.tmux_session = None def start(self, task: str) -> str: """启动 Claude Code 会话""" import uuid self.session_id = f"claude-{uuid.uuid4()}" self.tmux_session = self.session_id # 创建 tmux 会话 subprocess.run([ 'tmux', 'new-session', '-d', '-s', self.tmux_session, f'cd {self.project_dir} && claude' ]) # 等待启动 import time time.sleep(2) # 发送任务 self.send_command(task) return self.session_id def send_command(self, command: str): """发送命令""" subprocess.run([ 'tmux', 'send-keys', '-t', self.tmux_session, command, 'Enter' ]) def get_output(self, lines: int = 100) -> List[str]: """获取输出""" result = subprocess.run([ 'tmux', 'capture-pane', '-t', self.tmux_session, '-p', '-S', f'-{lines}' ], capture_output=True, text=True) return result.stdout.split('\n') def parse_status(self, output: List[str]) -> dict: """解析状态""" status = { "is_running": False, "current_file": None, "progress_estimate": 0 } # 分析最后几行 for line in output[-10:]: if "Thinking" in line or "Writing" in line: status["is_running"] = True if "file" in line.lower() and "." in line: status["current_file"] = line.strip() return status def reconnect(self, session_id: str): """重新连接""" self.tmux_session = session_id # 验证会话存在 result = subprocess.run([ 'tmux', 'has-session', '-t', session_id ]) if result.returncode != 0: raise Exception(f"会话 {session_id} 不存在") def close(self): """关闭会话""" if self.tmux_session: subprocess.run([ 'tmux', 'kill-session', '-t', self.tmux_session ])

6.2 快捷键映射

python
CLAUDE_SHORTCUTS = { # 基本控制 "pause": {"keys": ["C-c"], "description": "暂停当前操作"}, "resume": {"keys": ["Enter"], "description": "继续执行"}, "quit": {"keys": ["C-d"], "description": "退出"}, # 模型切换 "switch_model": {"keys": ["/", "model"], "description": "切换模型"}, # 信息获取 "help": {"keys": ["/", "help"], "description": "帮助"}, "stats": {"keys": ["/", "stats"], "description": "统计"}, # 文件操作 "add_file": {"keys": ["/", "add"], "description": "添加文件"}, # 其他 "clear": {"keys": ["/", "clear"], "description": "清屏"}, } def send_shortcut(session_id: str, shortcut_name: str): """发送快捷键""" if shortcut_name not in CLAUDE_SHORTCUTS: raise ValueError(f"未知快捷键:{shortcut_name}") shortcut = CLAUDE_SHORTCUTS[shortcut_name] for key in shortcut["keys"]: subprocess.run([ 'tmux', 'send-keys', '-t', session_id, key ])

📋 七、实施计划

7.1 阶段一:基础框架(1-2 周)

  • 安装 AgentScope: pip install agentscope
  • 创建基础 Agent 类
  • 实现 OpenClaw 工具封装
  • 实现进度跟踪器

7.2 阶段二:任务规划(1 周)

  • 实现 TaskPlannerAgent
  • 设计任务分解 prompt
  • 测试任务分解效果

7.3 阶段三:执行集成(1-2 周)

  • 集成 OpenClaw
  • 集成 Claude Code (tmux)
  • 实现执行监控

7.4 阶段四:前端看板(2 周)

  • 创建 React 项目
  • 实现任务列表页面
  • 实现进度展示组件
  • 实现 WebSocket 实时更新

7.5 阶段五:优化完善(1 周)

  • 性能优化
  • 错误处理
  • 文档编写
  • 测试用例

📚 八、参考资料


本文档基于 AgentScope 官方文档和 OpenClaw 实际使用情况设计 创建时间:2026-03-23

本文作者:lazyyoun

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!