来源:LangGraph 官方文档 · CrewAI 文档 · AG2 文档 | 整理时间:2026-04-19
概述
2026 年 Agent 框架生态迎来了重大更新,各大框架纷纷推出新版本,增强了协作能力、性能优化和企业级特性。本指南将详细介绍这些更新,并提供实践指导。
框架版本更新概览
主要框架更新时间线
| 框架 | 最新版本 | 发布日期 | 主要更新 |
|---|---|---|---|
| LangGraph | v0.3.0 | 2026-03 | 多 Agent 协作优化 |
| CrewAI | v0.41.0 | 2026-02 | 企业级工作流支持 |
| AG2 | v2.5.0 | 2026-04 | 性能大幅提升 |
| OpenManus | v1.2.0 | 2026-01 | 中文支持完善 |
| Semantic Kernel | v1.20.0 | 2026-03 | 企业集成增强 |
LangGraph 2026 最新特性
1. 多 Agent 协作系统
# langgraph_multi_agent.py
from langgraph.graph import Graph, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated
class AgentState(TypedDict):
messages: list
current_agent: str
shared_context: dict
def create_agent_graph():
graph = Graph()
# 定义 Agent
researcher_agent = ChatOpenAI(model="gpt-4")
writer_agent = ChatOpenAI(model="gpt-4")
reviewer_agent = ChatOpenAI(model="gpt-4")
# 添加节点
graph.add_node("research", research_node)
graph.add_node("write", write_node)
graph.add_node("review", review_node)
graph.add_node("decide", decide_node)
# 添加工具节点
tools = [web_search_tool, file_read_tool]
tool_node = ToolNode(tools)
graph.add_node("tools", tool_node)
# 设置流程
graph.add_edge("research", "write")
graph.add_edge("write", "review")
graph.add_edge("review", "decide")
graph.add_conditional_edges(
"decide",
decide_condition,
{
"rewrite": "write",
"end": END
}
)
# 设置入口点
graph.set_entry_point("research")
# 配置检查点
memory = MemorySaver()
return graph.compile(checkpointer=memory, state_schema=AgentState)
def research_node(state: AgentState):
"""研究节点"""
agent = ChatOpenAI(model="gpt-4")
prompt = f"""
你是研究专家。基于以下信息进行研究:
研究主题:{state['research_topic']}
共享上下文:{state.get('shared_context', {})}
请提供详细的研究报告,包含:
1. 相关背景信息
2. 关键发现
3. 数据支持
4. 推荐阅读
使用工具搜索最新信息。
"""
response = agent.invoke([{"role": "user", "content": prompt}])
return {
"messages": [response],
"current_agent": "researcher",
"shared_context": {
**state.get("shared_context", {}),
"research_data": response.content
}
}
def write_node(state: AgentState):
"""写作节点"""
agent = ChatOpenAI(model="gpt-4")
prompt = f"""
你是内容创作专家。基于研究报告撰写文章:
研究数据:{state['shared_context']['research_data']}
写作要求:专业、清晰、有逻辑性
请生成一篇完整的文章,包含:
1. 引言
2. 正文(3-5个段落)
3. 结论
确保内容准确、引用可靠。
"""
response = agent.invoke([{"role": "user", "content": prompt}])
return {
"messages": state["messages"] + [response],
"current_agent": "writer",
"shared_context": {
**state["shared_context"],
"draft_content": response.content
}
}
def review_node(state: AgentState):
"""审查节点"""
agent = ChatOpenAI(model="gpt-4")
prompt = f"""
你是内容审查专家。请审查文章:
文章内容:{state['shared_context']['draft_content']}
请从以下方面进行审查:
1. 事实准确性
2. 逻辑一致性
3. 语言表达
4. 结构完整性
提供修改建议。
"""
response = agent.invoke([{"role": "user", "content": prompt}])
return {
"messages": state["messages"] + [response],
"current_agent": "reviewer",
"shared_context": {
**state["shared_context"],
"review_feedback": response.content
}
}
def decide_node(state: AgentState):
"""决策节点"""
feedback = state["shared_context"].get("review_feedback", "")
if "需要修改" in feedback or "改进" in feedback:
return {"next_step": "rewrite"}
else:
return {"next_step": "end"}
def decide_condition(state: AgentState):
"""决策条件"""
return state["next_step"]
2. 增强的状态管理
# enhanced_state_management.py
from langgraph.graph import Graph
from langgraph.checkpoint.postgres import PostgresSaver
from datetime import datetime
import json
class EnhancedState(TypedDict):
messages: list
workflow_state: str
context_data: dict
metadata: dict
timestamps: dict
class PostgresStateStore:
"""PostgreSQL 状态存储"""
def __init__(self, connection_string: str):
self.connection_string = connection_string
self.checkpointer = PostgresSaver(connection_string)
def save_state(self, state: EnhancedState, thread_id: str):
"""保存状态到数据库"""
state["timestamps"]["updated_at"] = datetime.now().isoformat()
self.checkpointer.save_state(thread_id, state)
def load_state(self, thread_id: str) -> EnhancedState:
"""从数据库加载状态"""
return self.checkpointer.load_state(thread_id)
# 使用示例
def create_enhanced_workflow():
graph = Graph()
# 检查点配置
state_store = PostgresStateStore("postgresql://user:pass@localhost/mydb")
# 添加节点
graph.add_node("init", initialize_state)
graph.add_node("process", process_data)
graph.add_node("validate", validate_data)
graph.add_node("complete", complete_workflow)
# 设置流程
graph.add_edge("init", "process")
graph.add_edge("process", "validate")
graph.add_edge("validate", "complete")
# 编译图
workflow = graph.compile(
checkpointer=state_store.checkpointer,
state_schema=EnhancedState
)
return workflow
def initialize_state(state: EnhancedState):
"""初始化状态"""
return {
"messages": [],
"workflow_state": "initialized",
"context_data": {},
"metadata": {
"created_at": datetime.now().isoformat(),
"version": "1.0.0"
},
"timestamps": {}
}
3. 工具集成优化
# tool_integration.py
from langgraph.prebuilt import ToolNode
from langgraph.graph import Graph
from typing import Dict, Any
class ToolRegistry:
"""工具注册表"""
def __init__(self):
self.tools: Dict[str, Callable] = {}
def register_tool(self, name: str, tool_func: Callable):
"""注册工具"""
self.tools[name] = tool_func
def get_tool(self, name: str) -> Callable:
"""获取工具"""
return self.tools.get(name)
def list_tools(self) -> list:
"""列出所有工具"""
return list(self.tools.keys())
def create_tool_integration_graph():
"""创建工具集成图"""
graph = Graph()
tool_registry = ToolRegistry()
# 注册工具
tool_registry.register_tool("web_search", web_search_func)
tool_registry.register_tool("file_write", file_write_func)
tool_registry.register_tool("data_analysis", data_analysis_func)
# 创建工具节点
def tool_wrapper(state: Dict[str, Any]):
"""工具包装器"""
tool_name = state.get("tool_name")
tool_args = state.get("tool_args", {})
tool_func = tool_registry.get_tool(tool_name)
if tool_func:
result = tool_func(**tool_args)
return {"tool_result": result}
else:
return {"error": f"Tool {tool_name} not found"}
graph.add_node("execute_tool", tool_wrapper)
graph.add_edge("execute_tool", END)
return graph.compile()
CrewAI 2026 企业级特性
1. 企业工作流管理
# enterprise_crewai.py
from crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool
from typing import List, Dict, Any
import json
class EnterpriseWorkflow:
"""企业级工作流管理"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.agents: List[Agent] = []
self.tasks: List[Task] = []
self.crew: Crew = None
def setup_enterprise_agents(self):
"""设置企业级 Agent"""
# 研究专家
research_agent = Agent(
role="研究专家",
goal="进行深度研究和分析",
backstory="你有10年的研究经验,擅长数据分析和信息整合",
tools=[SearchTool(), AnalysisTool()],
verbose=True,
allow_delegation=True,
config=self.config.get("research_agent", {})
)
# 内容创作专家
content_agent = Agent(
role="内容创作专家",
goal="创建高质量的内容",
backstory="你是一名资深的内容创作者,擅长将复杂信息转化为易懂的内容",
tools=[ContentTool(), SEOTool()],
verbose=True,
allow_delegation=True,
config=self.config.get("content_agent", {})
)
# 质量保证专家
qa_agent = Agent(
role="质量保证专家",
goal="确保内容质量符合标准",
backstory="你负责内容质量检查和合规性验证",
tools=[QualityTool(), ComplianceTool()],
verbose=True,
allow_delegation=True,
config=self.config.get("qa_agent", {})
)
self.agents = [research_agent, content_agent, qa_agent]
def setup_enterprise_tasks(self):
"""设置企业级任务"""
research_task = Task(
description="研究行业趋势和竞争对手分析",
agent=self.agents[0],
expected_output="详细的研究报告",
config=self.config.get("research_task", {})
)
content_task = Task(
description="基于研究报告创建营销内容",
agent=self.agents[1],
expected_output="高质量的营销内容",
config=self.config.get("content_task", {})
)
qa_task = Task(
description="审查内容质量和合规性",
agent=self.agents[2],
expected_output="质量检查报告",
config=self.config.get("qa_task", {})
)
self.tasks = [research_task, content_task, qa_task]
def create_enterprise_crew(self):
"""创建企业级 Crew"""
self.crew = Crew(
agents=self.agents,
tasks=self.tasks,
process=Process.sequential,
verbose=True,
config=self.config.get("crew_config", {})
)
def run_workflow(self, inputs: Dict[str, Any]):
"""运行工作流"""
if not self.crew:
raise Exception("Crew not initialized")
result = self.crew.kickoff(inputs=inputs)
# 记录结果
self.log_workflow_result(result)
return result
def log_workflow_result(self, result):
"""记录工作流结果"""
log_data = {
"timestamp": datetime.now().isoformat(),
"result": str(result),
"config": self.config
}
with open("enterprise_workflow.log", "a", encoding="utf-8") as f:
f.write(json.dumps(log_data, ensure_ascii=False) + "\n")
2. 企业级工具集成
# enterprise_tools.py
from crewai.tools import BaseTool
from typing import Optional, Type
from pydantic import BaseModel, Field
import requests
class EnterpriseAPIWrapper(BaseTool):
"""企业 API 包装器"""
api_config: Dict[str, Any] = Field(default_factory=dict)
def _run(self, endpoint: str, method: str = "GET", **kwargs) -> str:
"""执行 API 调用"""
url = f"{self.api_config['base_url']}{endpoint}"
headers = {
"Authorization": f"Bearer {self.api_config['token']}",
"Content-Type": "application/json"
}
try:
if method.upper() == "GET":
response = requests.get(url, headers=headers, params=kwargs)
elif method.upper() == "POST":
response = requests.post(url, headers=headers, json=kwargs)
else:
raise ValueError(f"Unsupported method: {method}")
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return f"API Error: {str(e)}"
class CRMTool(EnterpriseAPIWrapper):
"""CRM 工具"""
name: str = "crm_tool"
description: str = "访问 CRM 系统的客户数据"
def _run(self, customer_id: str) -> str:
"""获取客户信息"""
return super()._run(f"/customers/{customer_id}", "GET")
class ERPTool(EnterpriseAPIWrapper):
"""ERP 工具"""
name: str = "erp_tool"
description: str = "访问 ERP 系统的业务数据"
def _run(self, order_id: str) -> str:
"""获取订单信息"""
return super()._run(f"/orders/{order_id}", "GET")
class ComplianceTool(BaseTool):
"""合规检查工具"""
name: str = "compliance_check"
description: str = "检查内容是否符合企业合规要求"
def _run(self, content: str) -> str:
"""执行合规检查"""
# 这里可以集成实际的合规检查服务
checks = {
"pii_check": self._check_pii(content),
"sensitive_words": self._check_sensitive_words(content),
"format_check": self._check_format(content)
}
return json.dumps(checks, ensure_ascii=False)
def _check_pii(self, content: str) -> Dict[str, Any]:
"""检查个人信息"""
# 简单的 PII 检查实现
pii_patterns = [
r'\b\d{3}-\d{2}-\d{4}\b', # SSN
r'\b\d{10}\b', # Phone
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' # Email
]
found_pii = []
for pattern in pii_patterns:
matches = re.findall(pattern, content)
if matches:
found_pii.extend(matches)
return {"has_pii": len(found_pii) > 0, "count": len(found_pii)}
def _check_sensitive_words(self, content: str) -> Dict[str, Any]:
"""检查敏感词"""
sensitive_words = ["confidential", "secret", "internal"]
found_words = []
for word in sensitive_words:
if word.lower() in content.lower():
found_words.append(word)
return {"has_sensitive": len(found_words) > 0, "words": found_words}
def _check_format(self, content: str) -> Dict[str, Any]:
"""检查格式"""
# 检查基本格式要求
has_title = len(content.split('\n')[0].strip()) > 0
has_paragraphs = len(content.split('\n\n')) > 1
word_count = len(content.split())
return {
"has_title": has_title,
"has_paragraphs": has_paragraphs,
"word_count": word_count,
"meets_requirements": has_title and has_paragraphs and word_count > 100
}
AG2 2026 性能优化
1. 新的性能特性
# ag2_performance.py
from autogen import Agent, UserProxyAgent, GroupChat, GroupChatManager
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
class OptimizedAG2Group:
"""优化的 AG2 群组"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.agents = {}
self.group_chat = None
self.executor = ThreadPoolExecutor(max_workers=config.get("max_workers", 4))
def create_optimized_agents(self):
"""创建优化的 Agent"""
# 使用异步代理
research_agent = Agent(
name="research_agent",
system_message="You are a research assistant. Provide comprehensive analysis.",
llm_config={"config_list": self.config["llm_config"]}
)
analysis_agent = Agent(
name="analysis_agent",
system_message="You are an data analyst. Provide data-driven insights.",
llm_config={"config_list": self.config["llm_config"]}
)
writing_agent = Agent(
name="writing_agent",
system_message="You are a content writer. Create engaging content.",
llm_config={"config_list": self.config["llm_config"]}
)
self.agents = {
"research": research_agent,
"analysis": analysis_agent,
"writing": writing_agent
}
def create_group_chat(self):
"""创建群组聊天"""
self.group_chat = GroupChat(
agents=list(self.agents.values()),
messages=[],
max_round=self.config.get("max_round", 10),
speaker_selection_method=self.config.get("speaker_selection", "auto")
)
async def async_conversation(self, topic: str):
"""异步对话"""
# 创建用户代理
user_proxy = UserProxyAgent(
name="user_proxy",
human_input_mode="NEVER",
max_consecutive_auto_reply=10,
code_execution_config=False
)
# 启动对话
tasks = []
for agent in self.agents.values():
task = asyncio.create_task(
user_proxy.initiate_chat(
agent,
message=f"Please analyze {topic} and provide insights.",
clear_history=False
)
)
tasks.append(task)
# 等待所有任务完成
await asyncio.gather(*tasks)
return user_proxy.chat_messages
def batch_process(self, tasks: List[str]):
"""批量处理任务"""
results = []
def process_task(task):
# 创建用户代理
user_proxy = UserProxyAgent(
name="user_proxy",
human_input_mode="NEVER",
max_consecutive_auto_reply=10,
code_execution_config=False
)
# 执行任务
for agent in self.agents.values():
user_proxy.initiate_chat(
agent,
message=f"Process: {task}",
clear_history=True
)
return user_proxy.chat_messages
# 使用线程池并行处理
future_to_task = {
self.executor.submit(process_task, task): task
for task in tasks
}
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
result = future.result()
results.append((task, result))
except Exception as e:
results.append((task, f"Error: {str(e)}"))
return results
def performance_monitor(self):
"""性能监控"""
start_time = time.time()
# 执行任务
result = self.batch_process(["task1", "task2", "task3"])
end_time = time.time()
duration = end_time - start_time
# 生成性能报告
performance_report = {
"total_tasks": len(result),
"duration_seconds": duration,
"avg_time_per_task": duration / len(result),
"throughput": len(result) / duration,
"agents_used": len(self.agents),
"max_workers": self.config.get("max_workers", 4)
}
return performance_report
2. 内存优化
# memory_optimization.py
from autogen import Agent, UserProxyAgent
from autogen.agentchat import ConversableAgent
from typing import Dict, List, Any
import pickle
import os
class MemoryOptimizedAgent(ConversableAgent):
"""内存优化的 Agent"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.message_cache = {}
self.cache_size = kwargs.get("cache_size", 100)
self.memory_file = kwargs.get("memory_file", "agent_memory.pkl")
def cache_message(self, message: Dict[str, Any]):
"""缓存消息"""
message_id = hash(str(message))
# LRU 缓存
if len(self.message_cache) >= self.cache_size:
oldest_key = next(iter(self.message_cache))
del self.message_cache[oldest_key]
self.message_cache[message_id] = message
def get_cached_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""获取缓存的消息"""
message_id = hash(str(message))
return self.message_cache.get(message_id)
def save_memory_to_disk(self):
"""保存记忆到磁盘"""
memory_data = {
"message_cache": self.message_cache,
"cache_size": self.cache_size
}
with open(self.memory_file, 'wb') as f:
pickle.dump(memory_data, f)
def load_memory_from_disk(self):
"""从磁盘加载记忆"""
if os.path.exists(self.memory_file):
with open(self.memory_file, 'rb') as f:
memory_data = pickle.load(f)
self.message_cache = memory_data["message_cache"]
self.cache_size = memory_data["cache_size"]
def compress_conversation(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""压缩对话历史"""
# 使用滑动窗口压缩
window_size = 5
compressed_messages = []
for i in range(0, len(messages), window_size):
window = messages[i:i + window_size]
# 合并相同类型的消息
if len(window) > 1:
combined = {
"role": "combined",
"content": f"[Combined {len(window)} messages]",
"timestamp": window[-1]["timestamp"],
"original_count": len(window)
}
compressed_messages.append(combined)
else:
compressed_messages.append(window[0])
return compressed_messages
OpenManus 中文支持增强
1. 中文 Agent 配置
# chinese_openmanus.py
from openmanus import Agent, Team
from typing import Dict, List, Any
import json
class ChineseAgent(Agent):
"""中文 Agent 基类"""
def __init__(self, name: str, role: str, goal: str, **kwargs):
super().__init__(name, role, goal, **kwargs)
self.language = "zh-CN"
self.cultural_context = kwargs.get("cultural_context", {})
def generate_prompt(self, task: str) -> str:
"""生成中文提示"""
prompt_template = f"""
你是{self.role},你的目标是{self.goal}。
任务:{task}
请使用中文回答,遵循以下要求:
1. 使用专业且易懂的语言
2. 考虑中文表达习惯
3. 适当使用成语和俗语
4. 保持逻辑清晰
"""
return prompt_template
def process_response(self, response: str) -> str:
"""处理 Agent 响应"""
# 中文优化处理
processed = response
# 修正常见的中文表达问题
processed = self._correct_chinese_expression(processed)
# 添加适当的标点符号
processed = self._add_punctuation(processed)
return processed
def _correct_chinese_expression(self, text: str) -> str:
"""修正中文表达"""
# 这里可以添加中文表达修正逻辑
corrections = {
"的": "得", # 语法修正
"在": "再", # 词性修正
}
for wrong, correct in corrections.items():
text = text.replace(wrong, correct)
return text
def _add_punctuation(self, text: str) -> str:
"""添加标点符号"""
# 简单的中文标点添加逻辑
if not text.endswith(。!?,、;:"''""()【】《》):
text += "。"
return text
class ChineseResearchAgent(ChineseAgent):
"""中文研究 Agent"""
def __init__(self, **kwargs):
super().__init__(
name="research_agent",
role="研究专家",
goal="进行深度研究和分析",
**kwargs
)
def research_topic(self, topic: str) -> Dict[str, Any]:
"""研究主题"""
prompt = self.generate_prompt(f"请研究以下主题:{topic}")
# 这里可以调用实际的 LLM
response = self.llm.invoke(prompt)
processed_response = self.process_response(response.content)
return {
"topic": topic,
"research_data": processed_response,
"language": "zh-CN",
"timestamp": datetime.now().isoformat()
}
class ChineseContentAgent(ChineseAgent):
"""中文内容创作 Agent"""
def __init__(self, **kwargs):
super().__init__(
name="content_agent",
role="内容创作者",
goal="创建高质量的中文内容",
**kwargs
)
def create_content(self, topic: str, style: str = "正式") -> str:
"""创建中文内容"""
prompt = self.generate_prompt(
f"请创建关于{topic}的{style}风格内容"
)
response = self.llm.invoke(prompt)
return self.process_response(response.content)
class ChineseTeam(Team):
"""中文团队"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.agents = []
def setup_chinese_agents(self):
"""设置中文 Agent"""
research_agent = ChineseResearchAgent(
llm_config=self.config.get("llm_config", {})
)
content_agent = ChineseContentAgent(
llm_config=self.config.get("llm_config", {})
)
self.agents = [research_agent, content_agent]
async async_process_topic(self, topic: str):
"""异步处理主题"""
# 研究阶段
research_result = await self.agents[0].research_topic(topic)
# 内容创作阶段
content = await self.agents[1].create_content(
topic=research_result["topic"],
style="专业"
)
return {
"research": research_result,
"content": content,
"language": "zh-CN"
}
实际应用案例
1. 企业内容创作系统
# enterprise_content_system.py
from langgraph.graph import Graph
from crewai import Agent, Task, Crew
from typing import Dict, Any
class EnterpriseContentSystem:
"""企业内容创作系统"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.langgraph_workflow = None
self.crewai_crew = None
def setup_workflow(self):
"""设置工作流"""
# LangGraph 工作流
langgraph = self.create_langgraph_workflow()
# CrewAI 团队
crew = self.create_crewai_team()
self.langgraph_workflow = langgraph
self.crewai_crew = crew
def create_langgraph_workflow(self):
"""创建 LangGraph 工作流"""
graph = Graph()
# 定义节点
graph.add_node("research", self.research_node)
graph.add_node("outline", self.outline_node)
graph.add_node("draft", self.draft_node)
graph.add_node("review", self.review_node)
graph.add_node("publish", self.publish_node)
# 设置流程
graph.add_edge("research", "outline")
graph.add_edge("outline", "draft")
graph.add_edge("draft", "review")
graph.add_edge("review", "publish")
return graph.compile()
def create_crewai_team(self):
"""创建 CrewAI 团队"""
# 定义 Agent
researcher = Agent(
role="研究专家",
goal="深入研究主题并提供数据支持",
backstory="你有10年的行业研究经验",
tools=[SearchTool(), DataAnalysisTool()],
verbose=True
)
writer = Agent(
role="内容创作者",
goal="创建高质量、引人入胜的内容",
backstory="你是一名资深的内容创作者",
tools=[ContentTool(), SEOTool()],
verbose=True
)
editor = Agent(
role="编辑",
goal="确保内容质量和符合标准",
backstory="你是一名专业的内容编辑",
tools=[QualityTool(), ComplianceTool()],
verbose=True
)
# 定义任务
research_task = Task(
description="研究主题并收集相关数据",
agent=researcher,
expected_output="研究报告"
)
writing_task = Task(
description="基于研究报告创建内容",
agent=writer,
expected_output="初稿"
)
editing_task = Task(
description="编辑和优化内容",
agent=editor,
expected_output="最终版本"
)
return Crew(
agents=[researcher, writer, editor],
tasks=[research_task, writing_task, editing_task],
process=Process.sequential,
verbose=True
)
def generate_content(self, request: Dict[str, Any]):
"""生成内容"""
# 使用 LangGraph 工作流
langgraph_result = self.langgraph_workflow.invoke({
"topic": request["topic"],
"audience": request["audience"],
"format": request["format"]
})
# 使用 CrewAI 进行精细处理
crew_result = self.crewai_crew.kickoff({
"topic": request["topic"],
"research_data": langgraph_result["research_data"]
})
return {
"langgraph_result": langgraph_result,
"crewai_result": crew_result,
"combined": self.combine_results(langgraph_result, crew_result)
}
def combine_results(self, langgraph: Dict, crewai: Dict) -> Dict:
"""合并结果"""
return {
"content": crewai["content"],
"metadata": {
"research_source": langgraph["research_data"],
"created_by": "EnterpriseContentSystem",
"timestamp": datetime.now().isoformat()
}
}
2. 智能客服系统
# intelligent_customer_service.py
from ag2 import Agent, UserProxyAgent
from langgraph.graph import Graph
from typing import Dict, List, Any
class IntelligentCustomerService:
"""智能客服系统"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.agents = {}
self.workflow = None
self.knowledge_base = {}
def setup_agents(self):
"""设置 Agent"""
# 初级客服
junior_agent = Agent(
name="junior_agent",
system_message="你是初级客服,处理简单问题",
llm_config=self.config["llm_config"]
)
# 高级客服
senior_agent = Agent(
name="senior_agent",
system_message="你是高级客服,处理复杂问题",
llm_config=self.config["llm_config"]
)
# 技术支持
tech_agent = Agent(
name="tech_agent",
system_message="你是技术支持,处理技术问题",
llm_config=self.config["llm_config"]
)
self.agents = {
"junior": junior_agent,
"senior": senior_agent,
"tech": tech_agent
}
def create_workflow(self):
"""创建工作流"""
graph = Graph()
# 添加节点
graph.add_node("classify", self.classify_intent)
graph.add_node("handle_junior", self.handle_junior_query)
graph.add_node("handle_senior", self.handle_senior_query)
graph.add_node("handle_tech", self.handle_tech_query)
graph.add_node("escalate", self.escalate_query)
graph.add_node("close", self.close_ticket)
# 设置流程
graph.add_edge("classify", "handle_junior")
graph.add_conditional_edges(
"handle_junior",
self.junior_decision,
{
"escalate": "handle_senior",
"tech": "handle_tech",
"close": "close"
}
)
return graph.compile()
def classify_intent(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""分类用户意图"""
message = state["message"]
# 意图分类逻辑
if any(keyword in message for keyword in ["登录", "账号", "密码"]):
return {"intent": "account", "agent": "junior"}
elif any(keyword in message for keyword in ["退款", "订单", "支付"]):
return {"intent": "order", "agent": "junior"}
elif any(keyword in message for keyword in ["错误", "故障", "无法"]):
return {"intent": "technical", "agent": "tech"}
else:
return {"intent": "general", "agent": "junior"}
def handle_junior_query(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""处理初级客服查询"""
user_proxy = UserProxyAgent(name="user_proxy", human_input_mode="NEVER")
agent = self.agents["junior"]
response = user_proxy.initiate_chat(
agent,
message=state["message"],
clear_history=True
)
return {"response": response, "resolved": self.is_resolved(response)}
def escalate_query(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""升级查询"""
user_proxy = UserProxyAgent(name="user_proxy", human_input_mode="NEVER")
agent = self.agents["senior"]
response = user_proxy.initiate_chat(
agent,
message=f"升级查询:{state['message']}",
clear_history=True
)
return {"response": response, "resolved": self.is_resolved(response)}
def is_resolved(self, response: str) -> bool:
"""判断是否已解决"""
positive_indicators = ["已解决", "完成", "处理完毕", "满意"]
return any(indicator in response for indicator in positive_indicators)
总结
2026 年 Agent 框架的更新带来了以下重要进展:
- LangGraph:多 Agent 协作和状态管理增强
- CrewAI:企业级工作流和工具集成
- AG2:性能优化和内存管理
- OpenManus:中文支持完善
选择建议
| 场景 | 推荐框架 | 理由 |
|---|---|---|
| 复杂工作流 | LangGraph | 强大的状态管理和协作能力 |
| 企业应用 | CrewAI | 企业级特性和工具集成 |
| 高性能需求 | AG2 | 优化的性能和内存管理 |
| 中文项目 | OpenManus | 优秀的中文支持 |
| 快速开发 | LangChain | 丰富的预构建组件 |
最佳实践
- 模块化设计:使用 Agent 的组合而非单体
- 状态管理:合理使用检查点和状态存储
- 工具集成:将现有服务集成为工具
- 错误处理:实现完善的错误处理和恢复机制
- 性能监控:持续监控和优化性能
记住,选择框架时要考虑具体需求、团队技能和长期维护成本。最佳实践是从小项目开始,逐步扩展功能。