返回资料库 实战:框架与工具

Agent 框架 2026 最新更新与实践指南

2026 年 Agent 框架生态迎来了重大更新,各大框架纷纷推出新版本,增强了协作能力、性能优化和企业级特性。本指南将详细介绍这些更新,并提供实践指导。

来源:LangGraph 官方文档 · CrewAI 文档 · AG2 文档 | 整理时间:2026-04-19


概述

2026 年 Agent 框架生态迎来了重大更新,各大框架纷纷推出新版本,增强了协作能力、性能优化和企业级特性。本指南将详细介绍这些更新,并提供实践指导。

框架版本更新概览

主要框架更新时间线

框架 最新版本 发布日期 主要更新
LangGraph v0.3.0 2026-03 多 Agent 协作优化
CrewAI v0.41.0 2026-02 企业级工作流支持
AG2 v2.5.0 2026-04 性能大幅提升
OpenManus v1.2.0 2026-01 中文支持完善
Semantic Kernel v1.20.0 2026-03 企业集成增强

LangGraph 2026 最新特性

1. 多 Agent 协作系统

# langgraph_multi_agent.py
from langgraph.graph import Graph, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated

class AgentState(TypedDict):
    messages: list
    current_agent: str
    shared_context: dict

def create_agent_graph():
    graph = Graph()
    
    # 定义 Agent
    researcher_agent = ChatOpenAI(model="gpt-4")
    writer_agent = ChatOpenAI(model="gpt-4")
    reviewer_agent = ChatOpenAI(model="gpt-4")
    
    # 添加节点
    graph.add_node("research", research_node)
    graph.add_node("write", write_node)
    graph.add_node("review", review_node)
    graph.add_node("decide", decide_node)
    
    # 添加工具节点
    tools = [web_search_tool, file_read_tool]
    tool_node = ToolNode(tools)
    graph.add_node("tools", tool_node)
    
    # 设置流程
    graph.add_edge("research", "write")
    graph.add_edge("write", "review")
    graph.add_edge("review", "decide")
    graph.add_conditional_edges(
        "decide",
        decide_condition,
        {
            "rewrite": "write",
            "end": END
        }
    )
    
    # 设置入口点
    graph.set_entry_point("research")
    
    # 配置检查点
    memory = MemorySaver()
    return graph.compile(checkpointer=memory, state_schema=AgentState)

def research_node(state: AgentState):
    """研究节点"""
    agent = ChatOpenAI(model="gpt-4")
    prompt = f"""
    你是研究专家。基于以下信息进行研究:
    
    研究主题:{state['research_topic']}
    共享上下文:{state.get('shared_context', {})}
    
    请提供详细的研究报告,包含:
    1. 相关背景信息
    2. 关键发现
    3. 数据支持
    4. 推荐阅读
    
    使用工具搜索最新信息。
    """
    
    response = agent.invoke([{"role": "user", "content": prompt}])
    return {
        "messages": [response],
        "current_agent": "researcher",
        "shared_context": {
            **state.get("shared_context", {}),
            "research_data": response.content
        }
    }

def write_node(state: AgentState):
    """写作节点"""
    agent = ChatOpenAI(model="gpt-4")
    prompt = f"""
    你是内容创作专家。基于研究报告撰写文章:
    
    研究数据:{state['shared_context']['research_data']}
    写作要求:专业、清晰、有逻辑性
    
    请生成一篇完整的文章,包含:
    1. 引言
    2. 正文(3-5个段落)
    3. 结论
    
    确保内容准确、引用可靠。
    """
    
    response = agent.invoke([{"role": "user", "content": prompt}])
    return {
        "messages": state["messages"] + [response],
        "current_agent": "writer",
        "shared_context": {
            **state["shared_context"],
            "draft_content": response.content
        }
    }

def review_node(state: AgentState):
    """审查节点"""
    agent = ChatOpenAI(model="gpt-4")
    prompt = f"""
    你是内容审查专家。请审查文章:
    
    文章内容:{state['shared_context']['draft_content']}
    
    请从以下方面进行审查:
    1. 事实准确性
    2. 逻辑一致性
    3. 语言表达
    4. 结构完整性
    
    提供修改建议。
    """
    
    response = agent.invoke([{"role": "user", "content": prompt}])
    return {
        "messages": state["messages"] + [response],
        "current_agent": "reviewer",
        "shared_context": {
            **state["shared_context"],
            "review_feedback": response.content
        }
    }

def decide_node(state: AgentState):
    """决策节点"""
    feedback = state["shared_context"].get("review_feedback", "")
    
    if "需要修改" in feedback or "改进" in feedback:
        return {"next_step": "rewrite"}
    else:
        return {"next_step": "end"}

def decide_condition(state: AgentState):
    """决策条件"""
    return state["next_step"]

2. 增强的状态管理

# enhanced_state_management.py
from langgraph.graph import Graph
from langgraph.checkpoint.postgres import PostgresSaver
from datetime import datetime
import json

class EnhancedState(TypedDict):
    messages: list
    workflow_state: str
    context_data: dict
    metadata: dict
    timestamps: dict

class PostgresStateStore:
    """PostgreSQL 状态存储"""
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.checkpointer = PostgresSaver(connection_string)
    
    def save_state(self, state: EnhancedState, thread_id: str):
        """保存状态到数据库"""
        state["timestamps"]["updated_at"] = datetime.now().isoformat()
        self.checkpointer.save_state(thread_id, state)
    
    def load_state(self, thread_id: str) -> EnhancedState:
        """从数据库加载状态"""
        return self.checkpointer.load_state(thread_id)

# 使用示例
def create_enhanced_workflow():
    graph = Graph()
    
    # 检查点配置
    state_store = PostgresStateStore("postgresql://user:pass@localhost/mydb")
    
    # 添加节点
    graph.add_node("init", initialize_state)
    graph.add_node("process", process_data)
    graph.add_node("validate", validate_data)
    graph.add_node("complete", complete_workflow)
    
    # 设置流程
    graph.add_edge("init", "process")
    graph.add_edge("process", "validate")
    graph.add_edge("validate", "complete")
    
    # 编译图
    workflow = graph.compile(
        checkpointer=state_store.checkpointer,
        state_schema=EnhancedState
    )
    
    return workflow

def initialize_state(state: EnhancedState):
    """初始化状态"""
    return {
        "messages": [],
        "workflow_state": "initialized",
        "context_data": {},
        "metadata": {
            "created_at": datetime.now().isoformat(),
            "version": "1.0.0"
        },
        "timestamps": {}
    }

3. 工具集成优化

# tool_integration.py
from langgraph.prebuilt import ToolNode
from langgraph.graph import Graph
from typing import Dict, Any

class ToolRegistry:
    """工具注册表"""
    def __init__(self):
        self.tools: Dict[str, Callable] = {}
    
    def register_tool(self, name: str, tool_func: Callable):
        """注册工具"""
        self.tools[name] = tool_func
    
    def get_tool(self, name: str) -> Callable:
        """获取工具"""
        return self.tools.get(name)
    
    def list_tools(self) -> list:
        """列出所有工具"""
        return list(self.tools.keys())

def create_tool_integration_graph():
    """创建工具集成图"""
    graph = Graph()
    tool_registry = ToolRegistry()
    
    # 注册工具
    tool_registry.register_tool("web_search", web_search_func)
    tool_registry.register_tool("file_write", file_write_func)
    tool_registry.register_tool("data_analysis", data_analysis_func)
    
    # 创建工具节点
    def tool_wrapper(state: Dict[str, Any]):
        """工具包装器"""
        tool_name = state.get("tool_name")
        tool_args = state.get("tool_args", {})
        
        tool_func = tool_registry.get_tool(tool_name)
        if tool_func:
            result = tool_func(**tool_args)
            return {"tool_result": result}
        else:
            return {"error": f"Tool {tool_name} not found"}
    
    graph.add_node("execute_tool", tool_wrapper)
    graph.add_edge("execute_tool", END)
    
    return graph.compile()

CrewAI 2026 企业级特性

1. 企业工作流管理

# enterprise_crewai.py
from crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool
from typing import List, Dict, Any
import json

class EnterpriseWorkflow:
    """企业级工作流管理"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.agents: List[Agent] = []
        self.tasks: List[Task] = []
        self.crew: Crew = None
    
    def setup_enterprise_agents(self):
        """设置企业级 Agent"""
        # 研究专家
        research_agent = Agent(
            role="研究专家",
            goal="进行深度研究和分析",
            backstory="你有10年的研究经验,擅长数据分析和信息整合",
            tools=[SearchTool(), AnalysisTool()],
            verbose=True,
            allow_delegation=True,
            config=self.config.get("research_agent", {})
        )
        
        # 内容创作专家
        content_agent = Agent(
            role="内容创作专家",
            goal="创建高质量的内容",
            backstory="你是一名资深的内容创作者,擅长将复杂信息转化为易懂的内容",
            tools=[ContentTool(), SEOTool()],
            verbose=True,
            allow_delegation=True,
            config=self.config.get("content_agent", {})
        )
        
        # 质量保证专家
        qa_agent = Agent(
            role="质量保证专家",
            goal="确保内容质量符合标准",
            backstory="你负责内容质量检查和合规性验证",
            tools=[QualityTool(), ComplianceTool()],
            verbose=True,
            allow_delegation=True,
            config=self.config.get("qa_agent", {})
        )
        
        self.agents = [research_agent, content_agent, qa_agent]
    
    def setup_enterprise_tasks(self):
        """设置企业级任务"""
        research_task = Task(
            description="研究行业趋势和竞争对手分析",
            agent=self.agents[0],
            expected_output="详细的研究报告",
            config=self.config.get("research_task", {})
        )
        
        content_task = Task(
            description="基于研究报告创建营销内容",
            agent=self.agents[1],
            expected_output="高质量的营销内容",
            config=self.config.get("content_task", {})
        )
        
        qa_task = Task(
            description="审查内容质量和合规性",
            agent=self.agents[2],
            expected_output="质量检查报告",
            config=self.config.get("qa_task", {})
        )
        
        self.tasks = [research_task, content_task, qa_task]
    
    def create_enterprise_crew(self):
        """创建企业级 Crew"""
        self.crew = Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
            config=self.config.get("crew_config", {})
        )
    
    def run_workflow(self, inputs: Dict[str, Any]):
        """运行工作流"""
        if not self.crew:
            raise Exception("Crew not initialized")
        
        result = self.crew.kickoff(inputs=inputs)
        
        # 记录结果
        self.log_workflow_result(result)
        
        return result
    
    def log_workflow_result(self, result):
        """记录工作流结果"""
        log_data = {
            "timestamp": datetime.now().isoformat(),
            "result": str(result),
            "config": self.config
        }
        
        with open("enterprise_workflow.log", "a", encoding="utf-8") as f:
            f.write(json.dumps(log_data, ensure_ascii=False) + "\n")

2. 企业级工具集成

# enterprise_tools.py
from crewai.tools import BaseTool
from typing import Optional, Type
from pydantic import BaseModel, Field
import requests

class EnterpriseAPIWrapper(BaseTool):
    """企业 API 包装器"""
    
    api_config: Dict[str, Any] = Field(default_factory=dict)
    
    def _run(self, endpoint: str, method: str = "GET", **kwargs) -> str:
        """执行 API 调用"""
        url = f"{self.api_config['base_url']}{endpoint}"
        
        headers = {
            "Authorization": f"Bearer {self.api_config['token']}",
            "Content-Type": "application/json"
        }
        
        try:
            if method.upper() == "GET":
                response = requests.get(url, headers=headers, params=kwargs)
            elif method.upper() == "POST":
                response = requests.post(url, headers=headers, json=kwargs)
            else:
                raise ValueError(f"Unsupported method: {method}")
            
            response.raise_for_status()
            return response.json()
            
        except requests.exceptions.RequestException as e:
            return f"API Error: {str(e)}"

class CRMTool(EnterpriseAPIWrapper):
    """CRM 工具"""
    
    name: str = "crm_tool"
    description: str = "访问 CRM 系统的客户数据"
    
    def _run(self, customer_id: str) -> str:
        """获取客户信息"""
        return super()._run(f"/customers/{customer_id}", "GET")

class ERPTool(EnterpriseAPIWrapper):
    """ERP 工具"""
    
    name: str = "erp_tool"
    description: str = "访问 ERP 系统的业务数据"
    
    def _run(self, order_id: str) -> str:
        """获取订单信息"""
        return super()._run(f"/orders/{order_id}", "GET")

class ComplianceTool(BaseTool):
    """合规检查工具"""
    
    name: str = "compliance_check"
    description: str = "检查内容是否符合企业合规要求"
    
    def _run(self, content: str) -> str:
        """执行合规检查"""
        # 这里可以集成实际的合规检查服务
        checks = {
            "pii_check": self._check_pii(content),
            "sensitive_words": self._check_sensitive_words(content),
            "format_check": self._check_format(content)
        }
        
        return json.dumps(checks, ensure_ascii=False)
    
    def _check_pii(self, content: str) -> Dict[str, Any]:
        """检查个人信息"""
        # 简单的 PII 检查实现
        pii_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b\d{10}\b',             # Phone
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'  # Email
        ]
        
        found_pii = []
        for pattern in pii_patterns:
            matches = re.findall(pattern, content)
            if matches:
                found_pii.extend(matches)
        
        return {"has_pii": len(found_pii) > 0, "count": len(found_pii)}
    
    def _check_sensitive_words(self, content: str) -> Dict[str, Any]:
        """检查敏感词"""
        sensitive_words = ["confidential", "secret", "internal"]
        found_words = []
        
        for word in sensitive_words:
            if word.lower() in content.lower():
                found_words.append(word)
        
        return {"has_sensitive": len(found_words) > 0, "words": found_words}
    
    def _check_format(self, content: str) -> Dict[str, Any]:
        """检查格式"""
        # 检查基本格式要求
        has_title = len(content.split('\n')[0].strip()) > 0
        has_paragraphs = len(content.split('\n\n')) > 1
        word_count = len(content.split())
        
        return {
            "has_title": has_title,
            "has_paragraphs": has_paragraphs,
            "word_count": word_count,
            "meets_requirements": has_title and has_paragraphs and word_count > 100
        }

AG2 2026 性能优化

1. 新的性能特性

# ag2_performance.py
from autogen import Agent, UserProxyAgent, GroupChat, GroupChatManager
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

class OptimizedAG2Group:
    """优化的 AG2 群组"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.agents = {}
        self.group_chat = None
        self.executor = ThreadPoolExecutor(max_workers=config.get("max_workers", 4))
    
    def create_optimized_agents(self):
        """创建优化的 Agent"""
        # 使用异步代理
        research_agent = Agent(
            name="research_agent",
            system_message="You are a research assistant. Provide comprehensive analysis.",
            llm_config={"config_list": self.config["llm_config"]}
        )
        
        analysis_agent = Agent(
            name="analysis_agent",
            system_message="You are an data analyst. Provide data-driven insights.",
            llm_config={"config_list": self.config["llm_config"]}
        )
        
        writing_agent = Agent(
            name="writing_agent",
            system_message="You are a content writer. Create engaging content.",
            llm_config={"config_list": self.config["llm_config"]}
        )
        
        self.agents = {
            "research": research_agent,
            "analysis": analysis_agent,
            "writing": writing_agent
        }
    
    def create_group_chat(self):
        """创建群组聊天"""
        self.group_chat = GroupChat(
            agents=list(self.agents.values()),
            messages=[],
            max_round=self.config.get("max_round", 10),
            speaker_selection_method=self.config.get("speaker_selection", "auto")
        )
    
    async def async_conversation(self, topic: str):
        """异步对话"""
        # 创建用户代理
        user_proxy = UserProxyAgent(
            name="user_proxy",
            human_input_mode="NEVER",
            max_consecutive_auto_reply=10,
            code_execution_config=False
        )
        
        # 启动对话
        tasks = []
        for agent in self.agents.values():
            task = asyncio.create_task(
                user_proxy.initiate_chat(
                    agent,
                    message=f"Please analyze {topic} and provide insights.",
                    clear_history=False
                )
            )
            tasks.append(task)
        
        # 等待所有任务完成
        await asyncio.gather(*tasks)
        
        return user_proxy.chat_messages
    
    def batch_process(self, tasks: List[str]):
        """批量处理任务"""
        results = []
        
        def process_task(task):
            # 创建用户代理
            user_proxy = UserProxyAgent(
                name="user_proxy",
                human_input_mode="NEVER",
                max_consecutive_auto_reply=10,
                code_execution_config=False
            )
            
            # 执行任务
            for agent in self.agents.values():
                user_proxy.initiate_chat(
                    agent,
                    message=f"Process: {task}",
                    clear_history=True
                )
            
            return user_proxy.chat_messages
        
        # 使用线程池并行处理
        future_to_task = {
            self.executor.submit(process_task, task): task
            for task in tasks
        }
        
        for future in concurrent.futures.as_completed(future_to_task):
            task = future_to_task[future]
            try:
                result = future.result()
                results.append((task, result))
            except Exception as e:
                results.append((task, f"Error: {str(e)}"))
        
        return results
    
    def performance_monitor(self):
        """性能监控"""
        start_time = time.time()
        
        # 执行任务
        result = self.batch_process(["task1", "task2", "task3"])
        
        end_time = time.time()
        duration = end_time - start_time
        
        # 生成性能报告
        performance_report = {
            "total_tasks": len(result),
            "duration_seconds": duration,
            "avg_time_per_task": duration / len(result),
            "throughput": len(result) / duration,
            "agents_used": len(self.agents),
            "max_workers": self.config.get("max_workers", 4)
        }
        
        return performance_report

2. 内存优化

# memory_optimization.py
from autogen import Agent, UserProxyAgent
from autogen.agentchat import ConversableAgent
from typing import Dict, List, Any
import pickle
import os

class MemoryOptimizedAgent(ConversableAgent):
    """内存优化的 Agent"""
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.message_cache = {}
        self.cache_size = kwargs.get("cache_size", 100)
        self.memory_file = kwargs.get("memory_file", "agent_memory.pkl")
    
    def cache_message(self, message: Dict[str, Any]):
        """缓存消息"""
        message_id = hash(str(message))
        
        # LRU 缓存
        if len(self.message_cache) >= self.cache_size:
            oldest_key = next(iter(self.message_cache))
            del self.message_cache[oldest_key]
        
        self.message_cache[message_id] = message
    
    def get_cached_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """获取缓存的消息"""
        message_id = hash(str(message))
        return self.message_cache.get(message_id)
    
    def save_memory_to_disk(self):
        """保存记忆到磁盘"""
        memory_data = {
            "message_cache": self.message_cache,
            "cache_size": self.cache_size
        }
        
        with open(self.memory_file, 'wb') as f:
            pickle.dump(memory_data, f)
    
    def load_memory_from_disk(self):
        """从磁盘加载记忆"""
        if os.path.exists(self.memory_file):
            with open(self.memory_file, 'rb') as f:
                memory_data = pickle.load(f)
                self.message_cache = memory_data["message_cache"]
                self.cache_size = memory_data["cache_size"]
    
    def compress_conversation(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """压缩对话历史"""
        # 使用滑动窗口压缩
        window_size = 5
        compressed_messages = []
        
        for i in range(0, len(messages), window_size):
            window = messages[i:i + window_size]
            
            # 合并相同类型的消息
            if len(window) > 1:
                combined = {
                    "role": "combined",
                    "content": f"[Combined {len(window)} messages]",
                    "timestamp": window[-1]["timestamp"],
                    "original_count": len(window)
                }
                compressed_messages.append(combined)
            else:
                compressed_messages.append(window[0])
        
        return compressed_messages

OpenManus 中文支持增强

1. 中文 Agent 配置

# chinese_openmanus.py
from openmanus import Agent, Team
from typing import Dict, List, Any
import json

class ChineseAgent(Agent):
    """中文 Agent 基类"""
    
    def __init__(self, name: str, role: str, goal: str, **kwargs):
        super().__init__(name, role, goal, **kwargs)
        self.language = "zh-CN"
        self.cultural_context = kwargs.get("cultural_context", {})
    
    def generate_prompt(self, task: str) -> str:
        """生成中文提示"""
        prompt_template = f"""
        你是{self.role},你的目标是{self.goal}。
        
        任务:{task}
        
        请使用中文回答,遵循以下要求:
        1. 使用专业且易懂的语言
        2. 考虑中文表达习惯
        3. 适当使用成语和俗语
        4. 保持逻辑清晰
        """
        
        return prompt_template
    
    def process_response(self, response: str) -> str:
        """处理 Agent 响应"""
        # 中文优化处理
        processed = response
        
        # 修正常见的中文表达问题
        processed = self._correct_chinese_expression(processed)
        
        # 添加适当的标点符号
        processed = self._add_punctuation(processed)
        
        return processed
    
    def _correct_chinese_expression(self, text: str) -> str:
        """修正中文表达"""
        # 这里可以添加中文表达修正逻辑
        corrections = {
            "的": "得",  # 语法修正
            "在": "再",  # 词性修正
        }
        
        for wrong, correct in corrections.items():
            text = text.replace(wrong, correct)
        
        return text
    
    def _add_punctuation(self, text: str) -> str:
        """添加标点符号"""
        # 简单的中文标点添加逻辑
        if not text.endswith(。!?,、;:"''""()【】《》):
            text += "。"
        
        return text

class ChineseResearchAgent(ChineseAgent):
    """中文研究 Agent"""
    
    def __init__(self, **kwargs):
        super().__init__(
            name="research_agent",
            role="研究专家",
            goal="进行深度研究和分析",
            **kwargs
        )
    
    def research_topic(self, topic: str) -> Dict[str, Any]:
        """研究主题"""
        prompt = self.generate_prompt(f"请研究以下主题:{topic}")
        
        # 这里可以调用实际的 LLM
        response = self.llm.invoke(prompt)
        
        processed_response = self.process_response(response.content)
        
        return {
            "topic": topic,
            "research_data": processed_response,
            "language": "zh-CN",
            "timestamp": datetime.now().isoformat()
        }

class ChineseContentAgent(ChineseAgent):
    """中文内容创作 Agent"""
    
    def __init__(self, **kwargs):
        super().__init__(
            name="content_agent",
            role="内容创作者",
            goal="创建高质量的中文内容",
            **kwargs
        )
    
    def create_content(self, topic: str, style: str = "正式") -> str:
        """创建中文内容"""
        prompt = self.generate_prompt(
            f"请创建关于{topic}的{style}风格内容"
        )
        
        response = self.llm.invoke(prompt)
        return self.process_response(response.content)

class ChineseTeam(Team):
    """中文团队"""
    
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.agents = []
    
    def setup_chinese_agents(self):
        """设置中文 Agent"""
        research_agent = ChineseResearchAgent(
            llm_config=self.config.get("llm_config", {})
        )
        
        content_agent = ChineseContentAgent(
            llm_config=self.config.get("llm_config", {})
        )
        
        self.agents = [research_agent, content_agent]
    
    async async_process_topic(self, topic: str):
        """异步处理主题"""
        # 研究阶段
        research_result = await self.agents[0].research_topic(topic)
        
        # 内容创作阶段
        content = await self.agents[1].create_content(
            topic=research_result["topic"],
            style="专业"
        )
        
        return {
            "research": research_result,
            "content": content,
            "language": "zh-CN"
        }

实际应用案例

1. 企业内容创作系统

# enterprise_content_system.py
from langgraph.graph import Graph
from crewai import Agent, Task, Crew
from typing import Dict, Any

class EnterpriseContentSystem:
    """企业内容创作系统"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.langgraph_workflow = None
        self.crewai_crew = None
    
    def setup_workflow(self):
        """设置工作流"""
        # LangGraph 工作流
        langgraph = self.create_langgraph_workflow()
        
        # CrewAI 团队
        crew = self.create_crewai_team()
        
        self.langgraph_workflow = langgraph
        self.crewai_crew = crew
    
    def create_langgraph_workflow(self):
        """创建 LangGraph 工作流"""
        graph = Graph()
        
        # 定义节点
        graph.add_node("research", self.research_node)
        graph.add_node("outline", self.outline_node)
        graph.add_node("draft", self.draft_node)
        graph.add_node("review", self.review_node)
        graph.add_node("publish", self.publish_node)
        
        # 设置流程
        graph.add_edge("research", "outline")
        graph.add_edge("outline", "draft")
        graph.add_edge("draft", "review")
        graph.add_edge("review", "publish")
        
        return graph.compile()
    
    def create_crewai_team(self):
        """创建 CrewAI 团队"""
        # 定义 Agent
        researcher = Agent(
            role="研究专家",
            goal="深入研究主题并提供数据支持",
            backstory="你有10年的行业研究经验",
            tools=[SearchTool(), DataAnalysisTool()],
            verbose=True
        )
        
        writer = Agent(
            role="内容创作者",
            goal="创建高质量、引人入胜的内容",
            backstory="你是一名资深的内容创作者",
            tools=[ContentTool(), SEOTool()],
            verbose=True
        )
        
        editor = Agent(
            role="编辑",
            goal="确保内容质量和符合标准",
            backstory="你是一名专业的内容编辑",
            tools=[QualityTool(), ComplianceTool()],
            verbose=True
        )
        
        # 定义任务
        research_task = Task(
            description="研究主题并收集相关数据",
            agent=researcher,
            expected_output="研究报告"
        )
        
        writing_task = Task(
            description="基于研究报告创建内容",
            agent=writer,
            expected_output="初稿"
        )
        
        editing_task = Task(
            description="编辑和优化内容",
            agent=editor,
            expected_output="最终版本"
        )
        
        return Crew(
            agents=[researcher, writer, editor],
            tasks=[research_task, writing_task, editing_task],
            process=Process.sequential,
            verbose=True
        )
    
    def generate_content(self, request: Dict[str, Any]):
        """生成内容"""
        # 使用 LangGraph 工作流
        langgraph_result = self.langgraph_workflow.invoke({
            "topic": request["topic"],
            "audience": request["audience"],
            "format": request["format"]
        })
        
        # 使用 CrewAI 进行精细处理
        crew_result = self.crewai_crew.kickoff({
            "topic": request["topic"],
            "research_data": langgraph_result["research_data"]
        })
        
        return {
            "langgraph_result": langgraph_result,
            "crewai_result": crew_result,
            "combined": self.combine_results(langgraph_result, crew_result)
        }
    
    def combine_results(self, langgraph: Dict, crewai: Dict) -> Dict:
        """合并结果"""
        return {
            "content": crewai["content"],
            "metadata": {
                "research_source": langgraph["research_data"],
                "created_by": "EnterpriseContentSystem",
                "timestamp": datetime.now().isoformat()
            }
        }

2. 智能客服系统

# intelligent_customer_service.py
from ag2 import Agent, UserProxyAgent
from langgraph.graph import Graph
from typing import Dict, List, Any

class IntelligentCustomerService:
    """智能客服系统"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.agents = {}
        self.workflow = None
        self.knowledge_base = {}
    
    def setup_agents(self):
        """设置 Agent"""
        # 初级客服
        junior_agent = Agent(
            name="junior_agent",
            system_message="你是初级客服,处理简单问题",
            llm_config=self.config["llm_config"]
        )
        
        # 高级客服
        senior_agent = Agent(
            name="senior_agent", 
            system_message="你是高级客服,处理复杂问题",
            llm_config=self.config["llm_config"]
        )
        
        # 技术支持
        tech_agent = Agent(
            name="tech_agent",
            system_message="你是技术支持,处理技术问题",
            llm_config=self.config["llm_config"]
        )
        
        self.agents = {
            "junior": junior_agent,
            "senior": senior_agent,
            "tech": tech_agent
        }
    
    def create_workflow(self):
        """创建工作流"""
        graph = Graph()
        
        # 添加节点
        graph.add_node("classify", self.classify_intent)
        graph.add_node("handle_junior", self.handle_junior_query)
        graph.add_node("handle_senior", self.handle_senior_query)
        graph.add_node("handle_tech", self.handle_tech_query)
        graph.add_node("escalate", self.escalate_query)
        graph.add_node("close", self.close_ticket)
        
        # 设置流程
        graph.add_edge("classify", "handle_junior")
        graph.add_conditional_edges(
            "handle_junior",
            self.junior_decision,
            {
                "escalate": "handle_senior",
                "tech": "handle_tech",
                "close": "close"
            }
        )
        
        return graph.compile()
    
    def classify_intent(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """分类用户意图"""
        message = state["message"]
        
        # 意图分类逻辑
        if any(keyword in message for keyword in ["登录", "账号", "密码"]):
            return {"intent": "account", "agent": "junior"}
        elif any(keyword in message for keyword in ["退款", "订单", "支付"]):
            return {"intent": "order", "agent": "junior"}
        elif any(keyword in message for keyword in ["错误", "故障", "无法"]):
            return {"intent": "technical", "agent": "tech"}
        else:
            return {"intent": "general", "agent": "junior"}
    
    def handle_junior_query(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """处理初级客服查询"""
        user_proxy = UserProxyAgent(name="user_proxy", human_input_mode="NEVER")
        agent = self.agents["junior"]
        
        response = user_proxy.initiate_chat(
            agent,
            message=state["message"],
            clear_history=True
        )
        
        return {"response": response, "resolved": self.is_resolved(response)}
    
    def escalate_query(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """升级查询"""
        user_proxy = UserProxyAgent(name="user_proxy", human_input_mode="NEVER")
        agent = self.agents["senior"]
        
        response = user_proxy.initiate_chat(
            agent,
            message=f"升级查询:{state['message']}",
            clear_history=True
        )
        
        return {"response": response, "resolved": self.is_resolved(response)}
    
    def is_resolved(self, response: str) -> bool:
        """判断是否已解决"""
        positive_indicators = ["已解决", "完成", "处理完毕", "满意"]
        return any(indicator in response for indicator in positive_indicators)

总结

2026 年 Agent 框架的更新带来了以下重要进展:

  1. LangGraph:多 Agent 协作和状态管理增强
  2. CrewAI:企业级工作流和工具集成
  3. AG2:性能优化和内存管理
  4. OpenManus:中文支持完善

选择建议

场景 推荐框架 理由
复杂工作流 LangGraph 强大的状态管理和协作能力
企业应用 CrewAI 企业级特性和工具集成
高性能需求 AG2 优化的性能和内存管理
中文项目 OpenManus 优秀的中文支持
快速开发 LangChain 丰富的预构建组件

最佳实践

  1. 模块化设计:使用 Agent 的组合而非单体
  2. 状态管理:合理使用检查点和状态存储
  3. 工具集成:将现有服务集成为工具
  4. 错误处理:实现完善的错误处理和恢复机制
  5. 性能监控:持续监控和优化性能

记住,选择框架时要考虑具体需求、团队技能和长期维护成本。最佳实践是从小项目开始,逐步扩展功能。

相关链接

常见问题

Agent 框架 2026 最新更新与实践指南 适合什么读者?

想搞清楚 实战 这一块的人,尤其是从概念跨到能动手做的过渡阶段。页面带摘要、同主题延伸和来源链接,你不用再去满网找。

阅读 Agent 框架 2026 最新更新与实践指南 需要多久?

预计 51 分钟。看不下去就跳到结论部分,把感兴趣的小节做个标记,后面再回来。

怎么把 Agent 框架 2026 最新更新与实践指南 用在自己的项目里?

挑一个文章里的最小例子先跑通,跑通之后再拿到自己的项目里。卡住的地方对照来源链接核验细节,部署和评估的部分可以从同主题其他文章补。