📋 基于 AgentScope 官方文档:设计 OpenClaw 与 AgentScope 的集成方案,实现复杂任务的智能分解、多 Agent 协作和进度可视化。
AgentScope 是阿里巴巴开源的多 Agent 协作框架:
from agentscope.agents import Agent
from agentscope.message import Message
# 1. Agent - the agent base class
class MyAgent(Agent):
    """Minimal custom agent that answers a message with the model's output."""

    def __init__(self, name, model_config):
        """Forward ``name`` and ``model_config`` to the base ``Agent``."""
        super().__init__(name, model_config)

    def reply(self, message: Message) -> Message:
        """Build a reply to ``message``.

        The incoming message's ``content`` is passed to
        ``self.model.generate`` and the result is wrapped in a new
        ``Message`` carrying this agent's name.
        """
        generated = self.model.generate(message.content)
        answer = Message(name=self.name, content=generated)
        return answer
# 2. Message - the message format: sender name, text content, and a
# metadata mapping attached to the message.
message = Message(
    name="user",
    content="请帮我分析这个项目",
    metadata=dict(task_type="analysis"),
)
# 3. Model - model configuration passed to agents.
model_config = dict(
    model_type="dashscope_chat",
    model_name="qwen-max",
    api_key="your-api-key",
)
AgentScope 提供多种内置 Agent:
| Agent 类型 | 用途 | 适用场景 |
|---|---|---|
| DialogAgent | 对话 Agent | 用户交互、问答 |
| DictDialogAgent | 结构化对话 | 返回 JSON 格式结果 |
| ReActAgent | 推理 + 行动 | 需要工具调用的任务 |
| UserAgent | 用户代理 | 模拟用户输入 |
```text
┌─────────────────────────────────────────────────────────────────┐
│                           用户界面层                            │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                  React 前端 (任务看板)                  │   │
│   │     - 任务输入  - 进度展示  - 实时日志  - 结果查看      │   │
│   └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
                                 │ WebSocket
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                           API 服务层                            │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │  REST API   │  │  WebSocket  │  │  任务队列   │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                        AgentScope 编排层                        │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │               TaskPlannerAgent (任务规划)               │   │
│   │          - 任务分解  - 策略生成  - Agent 路由           │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                │                                │
│           ┌────────────────────┼────────────────────┐           │
│           ▼                    ▼                    ▼           │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │  CodeGenerator  │  │  ResearchAgent  │  │  ExecutorAgent  │  │
│  │   (代码生成)    │  │   (调研分析)    │  │   (任务执行)    │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                           执行引擎层                            │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │  OpenClaw   │  │ Claude Code │  │    Qwen     │              │
│  │  Executor   │  │   Runner    │  │    Code     │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
└─────────────────────────────────────────────────────────────────┘
```
from agentscope.agents import DictDialogAgent
from agentscope.message import Message
from typing import List, Dict
# 1. Task-planning agent
class TaskPlannerAgent(DictDialogAgent):
    """Task-planning agent - responsible for decomposing complex tasks.

    The system prompt asks the model to answer with a JSON object that
    names the task and lists its subtasks (id, name, description, target
    agent, estimated duration and dependencies).
    """

    def __init__(self, name: str, model_config: dict):
        # The prompt text is kept flush-left so the string sent to the
        # model is exactly the text as authored.
        super().__init__(
            name=name,
            model_config=model_config,
            sys_prompt="""你是一个专业的任务规划专家。
你的职责是:
1. 分析用户输入的复杂任务
2. 将任务分解为可执行的子任务
3. 为每个子任务分配合适的执行 Agent
4. 估算每个子任务的执行时间
请以 JSON 格式返回任务分解结果:
{
"task_name": "任务名称",
"subtasks": [
{
"id": "1",
"name": "子任务名称",
"description": "详细描述",
"agent": "openclaw|claude-code|qwen-code",
"estimated_duration": 300,
"dependencies": []
}
]
}
""",
        )

    @staticmethod
    def _parse_json(content):
        """Return ``content`` as a parsed JSON value.

        ``reply`` needs this helper; it was previously called without being
        defined in this class.

        Args:
            content: The reply content. A ``dict``/``list`` is returned
                unchanged; anything else is converted to ``str`` and parsed.

        Returns:
            The decoded JSON value.

        Raises:
            ValueError: If the text is not JSON and contains no ``{...}``
                span that parses as JSON.
        """
        import json  # function-scope import keeps this block self-contained

        if isinstance(content, (dict, list)):
            return content

        text = str(content).strip()
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            # Models often wrap JSON in prose or a Markdown fence; fall back
            # to the outermost brace-delimited span.
            start, end = text.find("{"), text.rfind("}")
            if start == -1 or end <= start:
                raise ValueError(
                    "planner response does not contain a JSON object"
                ) from None
            return json.loads(text[start:end + 1])

    def reply(self, message: Message) -> Message:
        """Ask the parent agent for a reply and return the parsed plan.

        Returns:
            A ``Message`` from this agent whose ``content`` is the parsed
            plan and whose metadata marks it as a planning result.
        """
        response = super().reply(message)
        # Parse the JSON response
        plan = self._parse_json(response.content)
        return Message(
            name=self.name,
            content=plan,
            metadata={"task_type": "planning"},
        )
# 2. Code-generation agent
class CodeGeneratorAgent(DictDialogAgent):
    """Code-generation agent - responsible for writing code.

    Its system prompt asks for a JSON object with a ``files`` list
    (path, content, language) and a ``summary``.
    """

    def __init__(self, name: str, model_config: dict):
        # Prompt lines stay flush-left so the string is exactly as authored.
        instructions = """你是一个资深软件工程师。
你的职责是:
1. 根据需求生成高质量的代码
2. 遵循最佳实践和编码规范
3. 添加必要的注释和文档
4. 考虑错误处理和边界情况
请以 JSON 格式返回:
{
"files": [
{
"path": "文件路径",
"content": "文件内容",
"language": "python|javascript|etc"
}
],
"summary": "代码说明"
}
"""
        super().__init__(
            name=name,
            model_config=model_config,
            sys_prompt=instructions,
        )
# 3. Executor agent
class ExecutorAgent(DictDialogAgent):
    """Executor agent - responsible for invoking OpenClaw to run tasks.

    Its system prompt asks for a JSON object with ``status``, ``progress``,
    ``result`` and ``logs``.
    """

    def __init__(self, name: str, model_config: dict):
        # Prompt lines stay flush-left so the string is exactly as authored.
        instructions = """你是一个任务执行专家。
你的职责是:
1. 调用 OpenClaw 工具执行具体任务
2. 监控执行进度
3. 处理执行异常
4. 返回执行结果
请以 JSON 格式返回:
{
"status": "running|completed|failed",
"progress": 0-100,
"result": "执行结果",
"logs": ["日志列表"]
}
"""
        super().__init__(
            name=name,
            model_config=model_config,
            sys_prompt=instructions,
        )
import subprocess
import json
from typing import Dict, Any
class OpenClawTool:
    """Wrapper around the ``openclaw`` command-line tool."""

    def __init__(self, agent_id: str = "main"):
        self.agent_id = agent_id

    def execute(self, task: str, timeout: int = 600) -> Dict[str, Any]:
        """Run an OpenClaw task and report the outcome as a dict.

        Failures are always reported through the returned dict rather than
        by raising, so callers can branch on ``result["success"]``.

        Args:
            task: Task description, passed as ``--message``.
            timeout: Timeout in seconds, passed as ``--timeout``. The
                subprocess itself is given 30 extra seconds before it is
                abandoned.

        Returns:
            On failure (non-zero exit status, timeout, or the command could
            not be started): ``{"success": False, "error": str,
            "tool_calls": 0}``.
            On success with a JSON object on stdout: ``success``,
            ``result``, ``tool_calls`` and ``tokens_used`` taken from it.
            On success with any other output: the raw stdout as ``result``.
        """
        cmd = [
            "openclaw", "agent",
            "--agent", self.agent_id,
            "--message", task,
            "--json",
            "--timeout", str(timeout),
        ]
        hard_limit = timeout + 30
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=hard_limit,
            )
        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "error": f"openclaw did not finish within {hard_limit} seconds",
                "tool_calls": 0,
            }
        except OSError as exc:
            # E.g. the openclaw executable is not installed or not on PATH.
            return {
                "success": False,
                "error": str(exc),
                "tool_calls": 0,
            }

        if result.returncode != 0:
            return {
                "success": False,
                "error": result.stderr,
                "tool_calls": 0,
            }

        try:
            response = json.loads(result.stdout)
        except json.JSONDecodeError:
            response = None

        if not isinstance(response, dict):
            # Plain-text (or non-object JSON) output is passed through as-is.
            return {
                "success": True,
                "result": result.stdout,
                "tool_calls": 0,
            }

        return {
            "success": True,
            "result": response.get("result", ""),
            "tool_calls": response.get("tool_calls", 0),
            "tokens_used": response.get("usage", {}),
        }

    def get_session_history(self, session_id: str) -> list:
        """Fetch a session's history.

        Returns:
            The decoded JSON printed by ``openclaw sessions history``, or an
            empty list when the command cannot be started, exits non-zero,
            or prints output that is not valid JSON.
        """
        cmd = [
            "openclaw", "sessions", "history",
            "--session-id", session_id,
            "--json",
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except OSError:
            return []

        if result.returncode != 0:
            return []
        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError:
            return []
from agentscope.tools import Tool
# Define the OpenClaw tool: a JSON-schema style parameter description
# ("task" is required, "timeout" defaults to 600) bound to the execute
# method of a default OpenClawTool instance.
openclaw_execute_tool = Tool(
    name="openclaw_execute",
    description="执行 OpenClaw 任务",
    function=OpenClawTool().execute,
    parameters=dict(
        type="object",
        properties=dict(
            task=dict(type="string", description="任务描述"),
            timeout=dict(
                type="integer",
                description="超时时间(秒)",
                default=600,
            ),
        ),
        required=["task"],
    ),
)
# Register the tool on an agent
class OpenClawAgent(DictDialogAgent):
    """Dialog agent that exposes the OpenClaw execute tool via ``self.tools``."""

    def __init__(self, name: str, model_config: dict):
        super().__init__(name, model_config)
        available_tools = [openclaw_execute_tool]
        self.tools = available_tools

    def reply(self, message: Message) -> Message:
        """Delegate to the parent class's ``reply``.

        The original note describes this as the ReAct pattern
        (reason -> act -> observe).
        NOTE(review): nothing in this override consults ``self.tools``;
        confirm the parent class performs the tool loop.
        """
        answer = super().reply(message)
        return answer
from datetime import datetime
from enum import Enum
from typing import List, Optional

from pydantic import BaseModel, Field
class TaskStatus(str, Enum):
    """Lifecycle states of a whole task.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    PENDING = "pending"
    PLANNING = "planning"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
class SubTaskStatus(str, Enum):
    """Lifecycle states of a single subtask.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
class SubTaskProgress(BaseModel):
    """Progress record for one subtask."""

    # Required identification and state
    id: str
    name: str
    status: SubTaskStatus
    progress: int  # 0-100
    agent: str

    # Filled in as the subtask runs; all default to None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    result: Optional[dict] = None
    error: Optional[str] = None
class TaskProgress(BaseModel):
    """Progress snapshot of a whole task, including its subtasks and logs."""

    task_id: str
    name: str
    status: TaskStatus
    overall_progress: int  # 0-100
    subtasks: List[SubTaskProgress]

    # Execution details
    current_agent: Optional[str] = None
    current_step: Optional[str] = None
    elapsed_time: int = 0  # seconds
    estimated_remaining: int = 0  # seconds
    tool_calls: int = 0
    tokens_used: int = 0

    # Logs (a factory gives each instance its own list)
    logs: List[dict] = Field(default_factory=list)

    # Timestamps
    # started_at is assigned by ProgressTracker.start(); it has to be a
    # declared field because pydantic models reject unknown attributes.
    started_at: Optional[datetime] = None
    # default_factory evaluates datetime.now() per instance. A plain
    # `= datetime.now()` default is evaluated once, when the class is
    # defined, so every instance would share that timestamp.
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)
import asyncio
from datetime import datetime
from typing import List, Callable
class ProgressTracker:
    """Progress tracker.

    Holds the ``TaskProgress`` snapshot for one task and pushes it to the
    registered WebSocket clients and callbacks whenever it changes.
    """

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.progress = TaskProgress(
            task_id=task_id,
            name="",
            status=TaskStatus.PENDING,
            overall_progress=0,
            subtasks=[]
        )
        self.ws_clients: List[WebSocket] = []
        self.callbacks: List[Callable] = []

    async def start(self):
        """Start tracking: switch to PLANNING, stamp the start time, broadcast.

        NOTE(review): assumes the progress model declares a ``started_at``
        field - verify against ``TaskProgress``.
        """
        self.progress.status = TaskStatus.PLANNING
        self.progress.started_at = datetime.now()
        await self._broadcast()

    async def update_subtask(self, subtask_id: str, **kwargs):
        """Update a subtask's progress.

        Only keyword arguments naming an existing attribute of the subtask
        are applied. If no subtask has ``subtask_id`` nothing happens and
        nothing is broadcast.
        """
        target = next(
            (st for st in self.progress.subtasks if st.id == subtask_id),
            None,
        )
        if target is None:
            return

        for key, value in kwargs.items():
            if hasattr(target, key):
                setattr(target, key, value)

        # Recompute the overall progress
        self._calculate_overall_progress()
        self.progress.updated_at = datetime.now()
        await self._broadcast()

    async def add_log(
        self,
        event_type: str,
        agent: str,
        content: str,
        metadata: Optional[dict] = None,
    ):
        """Append a log entry and broadcast the updated progress.

        Args:
            event_type: Kind of event, e.g. ``"tool_call"``.
            agent: Name of the agent the entry belongs to.
            content: Human-readable text of the entry.
            metadata: Optional structured details. Stored under the
                ``"metadata"`` key only when given, so entries logged
                without it keep their previous shape.
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "agent": agent,
            "content": content
        }
        if metadata is not None:
            log_entry["metadata"] = metadata
        self.progress.logs.append(log_entry)
        await self._broadcast()

    def _calculate_overall_progress(self):
        """Set the overall progress to the integer mean of subtask progress.

        Leaves the value untouched when there are no subtasks.
        """
        subtasks = self.progress.subtasks
        if not subtasks:
            return
        total = sum(st.progress for st in subtasks)
        self.progress.overall_progress = total // len(subtasks)

    async def _broadcast(self):
        """Push the current progress to all clients, then call the callbacks.

        Delivery is best-effort: a failing client or callback is logged and
        skipped so the others still get the update. Only ``Exception`` is
        caught, so task cancellation is no longer swallowed.
        """
        import json
        import logging

        log = logging.getLogger(__name__)

        # Push to the WebSocket clients
        if self.ws_clients:
            try:
                # Round-trip through the model's JSON encoder so datetime
                # values become strings a plain JSON dump can send.
                payload = json.loads(self.progress.json())
            except Exception:
                log.warning(
                    "could not serialise progress for task %s",
                    self.task_id, exc_info=True,
                )
                payload = None
            if payload is not None:
                for client in self.ws_clients:
                    try:
                        await client.send_json(payload)
                    except Exception:
                        log.warning(
                            "failed to push progress for task %s to a client",
                            self.task_id, exc_info=True,
                        )

        # Invoke the callbacks
        for callback in self.callbacks:
            try:
                callback(self.progress)
            except Exception:
                log.warning(
                    "progress callback failed for task %s",
                    self.task_id, exc_info=True,
                )

    def on_progress_update(self, callback: Callable):
        """Register a callback invoked with the progress on every broadcast."""
        self.callbacks.append(callback)
class LogStream:
    """Log stream handler: turns runtime events into tracker log entries."""

    def __init__(self, tracker: ProgressTracker):
        self.tracker = tracker
        self.buffer = []

    async def on_tool_call(self, tool_name: str, args: dict):
        """Record a tool call, with the tool name and arguments as metadata.

        NOTE(review): requires ``tracker.add_log`` to accept a ``metadata``
        keyword - confirm against the tracker's signature.
        """
        details = {"tool": tool_name, "args": args}
        await self.tracker.add_log(
            event_type="tool_call",
            agent="openclaw",
            content=f"调用工具:{tool_name}",
            metadata=details,
        )

    async def on_agent_message(self, agent: str, message: str):
        """Record an agent message, keeping at most its first 500 characters."""
        truncated = message[:500]
        await self.tracker.add_log(
            event_type="agent_message",
            agent=agent,
            content=truncated,
        )

    async def on_status_change(self, old_status: str, new_status: str):
        """Record a status transition as a ``system`` log entry."""
        transition = f"状态变化:{old_status} → {new_status}"
        await self.tracker.add_log(
            event_type="status_change",
            agent="system",
            content=transition,
        )
// TaskDashboard.tsx
import React, { useState, useEffect } from 'react';
import { TaskProgress, SubTaskProgress } from './types';
/**
 * Live dashboard for a single task.
 *
 * Opens a WebSocket for `taskId`, stores every pushed progress snapshot in
 * state, and renders the overall progress, subtask list, log stream and
 * execution details. The socket is closed when `taskId` changes or the
 * component unmounts.
 */
export const TaskDashboard: React.FC<{ taskId: string }> = ({ taskId }) => {
  const [progress, setProgress] = useState<TaskProgress | null>(null);

  useEffect(() => {
    // Connect the WebSocket
    const ws = new WebSocket(`ws://localhost:8080/ws/tasks/${taskId}`);
    ws.onmessage = (event) => {
      try {
        setProgress(JSON.parse(event.data) as TaskProgress);
      } catch (err) {
        // A malformed frame must not throw inside the socket handler;
        // keep showing the last good snapshot.
        console.error('TaskDashboard: ignoring non-JSON progress message', err);
      }
    };
    return () => ws.close();
  }, [taskId]);

  if (!progress) return <div>加载中...</div>;

  return (
    <div className="task-dashboard">
      {/* Overall progress */}
      <OverallProgress
        progress={progress.overall_progress}
        status={progress.status}
      />
      {/* Subtask list */}
      <SubTaskList subtasks={progress.subtasks} />
      {/* Live logs */}
      <LogStream logs={progress.logs} />
      {/* Execution details */}
      <ExecutionDetails
        currentAgent={progress.current_agent}
        elapsedTime={progress.elapsed_time}
        toolCalls={progress.tool_calls}
      />
    </div>
  );
};
// OverallProgress.tsx
/**
 * Overall progress bar with a percentage label and a status badge.
 * The fill colour depends on `status`; unknown statuses render gray.
 */
export const OverallProgress: React.FC<{
  progress: number;
  status: string;
}> = ({ progress, status }) => {
  // A Map (not an object literal) so that only the listed statuses match.
  const colorByStatus = new Map<string, string>([
    ['completed', 'green'],
    ['failed', 'red'],
    ['executing', 'blue'],
  ]);
  const fillColor = colorByStatus.get(status) || 'gray';
  const fillStyle = {
    width: `${progress}%`,
    backgroundColor: fillColor,
  };

  return (
    <div className="overall-progress">
      <div className="progress-header">
        <span>总体进度</span>
        <span>{progress}%</span>
      </div>
      <div className="progress-bar">
        <div className="progress-fill" style={fillStyle} />
      </div>
      <div className="status-badge">{status}</div>
    </div>
  );
};
// SubTaskCard.tsx
/**
 * Card for one subtask: status icon, name, percentage, owning agent,
 * optional start time and error text, and a progress bar.
 */
export const SubTaskCard: React.FC<{ subtask: SubTaskProgress }> = ({
  subtask
}) => {
  const getStatusIcon = () => {
    switch (subtask.status) {
      case 'completed': return '✅';
      case 'running': return '🔄';
      case 'failed': return '❌';
      default: return '⏳';
    }
  };

  return (
    <div className="subtask-card">
      <div className="card-header">
        <span>{getStatusIcon()}</span>
        <span>{subtask.name}</span>
        <span>{subtask.progress}%</span>
      </div>
      <div className="card-body">
        <div>Agent: {subtask.agent}</div>
        {subtask.started_at && (
          // Values parsed from JSON are strings, not Date objects; wrapping
          // in Date makes the localised formatting apply to both.
          <div>开始时间:{new Date(subtask.started_at).toLocaleString()}</div>
        )}
        {subtask.error && (
          <div className="error">{subtask.error}</div>
        )}
      </div>
      <div className="progress-bar">
        <div
          className="fill"
          style={{ width: `${subtask.progress}%` }}
        />
      </div>
    </div>
  );
};
import subprocess
import pty
import os
from typing import List, Optional
class ClaudeCodeSession:
    """Claude Code session management.

    Runs the ``claude`` CLI inside a detached tmux session and drives it by
    sending keys to, and capturing output from, that session.
    """

    def __init__(self, project_dir: str):
        self.project_dir = project_dir
        self.session_id = None
        self.tmux_session = None

    def _require_session(self) -> str:
        """Return the active tmux session name or raise if there is none."""
        if not self.tmux_session:
            raise RuntimeError(
                "no tmux session: call start() or reconnect() first"
            )
        return self.tmux_session

    def start(self, task: str) -> str:
        """Start a Claude Code session and send it ``task``.

        Returns:
            The generated session id (also used as the tmux session name).

        Raises:
            subprocess.CalledProcessError: If tmux fails to create the
                session.
        """
        import time
        import uuid

        new_id = f"claude-{uuid.uuid4()}"
        # Create the tmux session. The working directory is passed with -c
        # rather than spliced into a `cd ... &&` shell string, so paths with
        # spaces or shell metacharacters are handled safely.
        subprocess.run(
            [
                'tmux', 'new-session', '-d', '-s', new_id,
                '-c', self.project_dir,
                'claude',
            ],
            check=True,
        )
        # Record the ids only once the session really exists.
        self.session_id = new_id
        self.tmux_session = new_id

        # Wait for startup
        time.sleep(2)

        # Send the task
        self.send_command(task)
        return self.session_id

    def send_command(self, command: str):
        """Type ``command`` into the session and press Enter.

        Raises:
            RuntimeError: If no session has been started or reconnected.
        """
        target = self._require_session()
        # -l sends the text literally, so a command such as "Enter" or "C-c"
        # is typed instead of being interpreted as a key name; "--" stops a
        # leading "-" from being parsed as an option.
        subprocess.run(['tmux', 'send-keys', '-t', target, '-l', '--', command])
        subprocess.run(['tmux', 'send-keys', '-t', target, 'Enter'])

    def get_output(self, lines: int = 100) -> List[str]:
        """Capture the pane, starting ``lines`` lines back, as a list of lines.

        Raises:
            RuntimeError: If no session has been started or reconnected.
        """
        target = self._require_session()
        result = subprocess.run(
            ['tmux', 'capture-pane', '-t', target, '-p', '-S', f'-{lines}'],
            capture_output=True,
            text=True,
        )
        return result.stdout.split('\n')

    def parse_status(self, output: List[str]) -> dict:
        """Derive a status dict from the last ten lines of captured output.

        ``is_running`` becomes True when a line contains "Thinking" or
        "Writing"; ``current_file`` is the last line that mentions "file"
        (case-insensitive) and contains a dot; ``progress_estimate`` is
        always 0.
        """
        status = {
            "is_running": False,
            "current_file": None,
            "progress_estimate": 0
        }
        # Analyse the last few lines
        for line in output[-10:]:
            if "Thinking" in line or "Writing" in line:
                status["is_running"] = True
            if "file" in line.lower() and "." in line:
                status["current_file"] = line.strip()
        return status

    def reconnect(self, session_id: str):
        """Attach this object to an existing tmux session.

        The session is verified first, so a failed reconnect leaves the
        object's current session untouched.

        Raises:
            RuntimeError: If tmux reports that the session does not exist.
        """
        # Verify that the session exists
        probe = subprocess.run(
            ['tmux', 'has-session', '-t', session_id],
            capture_output=True,
        )
        if probe.returncode != 0:
            raise RuntimeError(f"会话 {session_id} 不存在")
        self.tmux_session = session_id
        self.session_id = session_id

    def close(self):
        """Kill the tmux session, if any, and forget it."""
        if self.tmux_session:
            subprocess.run([
                'tmux', 'kill-session', '-t', self.tmux_session
            ])
            # Later calls then fail with a clear error instead of
            # addressing a dead session.
            self.tmux_session = None
# Shortcut name -> tmux key sequence and a description, built from a compact
# (name, keys, description) table.
CLAUDE_SHORTCUTS = {
    shortcut: {"keys": key_sequence, "description": summary}
    for shortcut, key_sequence, summary in (
        # Basic control
        ("pause", ["C-c"], "暂停当前操作"),
        ("resume", ["Enter"], "继续执行"),
        ("quit", ["C-d"], "退出"),
        # Model switching
        ("switch_model", ["/", "model"], "切换模型"),
        # Information
        ("help", ["/", "help"], "帮助"),
        ("stats", ["/", "stats"], "统计"),
        # File operations
        ("add_file", ["/", "add"], "添加文件"),
        # Miscellaneous
        ("clear", ["/", "clear"], "清屏"),
    )
}
def send_shortcut(session_id: str, shortcut_name: str):
    """Send a named shortcut to a tmux session.

    Each entry of the shortcut's ``keys`` list is sent with its own
    ``tmux send-keys`` call, in order.

    Raises:
        ValueError: If ``shortcut_name`` is not a key of ``CLAUDE_SHORTCUTS``.
    """
    if shortcut_name in CLAUDE_SHORTCUTS:
        key_sequence = CLAUDE_SHORTCUTS[shortcut_name]["keys"]
        for key in key_sequence:
            subprocess.run(['tmux', 'send-keys', '-t', session_id, key])
        return
    raise ValueError(f"未知快捷键:{shortcut_name}")
```bash
pip install agentscope
```

本文档基于 AgentScope 官方文档和 OpenClaw 实际使用情况设计。

创建时间:2026-03-23
本文作者:lazyyoun
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!