← 回總覽

【OpenClaw】通过 Nanobot 源码学习架构---(2)外层控制逻辑

📅 2026-04-01 21:16 罗西的思考 人工智能 27 分鐘 33132 字 評分: 86
Nanobot OpenClaw AI Agent 架构设计 源码分析
📌 一句话摘要 本文深入剖析了轻量级 AI Agent 框架 Nanobot 的外层控制架构,详细解读了 Commands、Gateway 和 Channel 三大核心组件的设计原理与协作机制。 📝 详细摘要 本文是 Nanobot 源码学习系列的第二篇,重点分析了系统的外层控制逻辑。作者通过拆解 Commands(CLI 入口)、Gateway(系统协调中心)和 Channel(多平台适配器)三个核心组件,展示了如何利用 Typer、asyncio 和消息总线(MessageBus)构建一个解耦、高效且易于扩展的 AI Agent 框架。文章不仅提供了架构层面的逻辑解析,还结合代码片段说

【OpenClaw】通过Nanobot源码学习架构---(2)外层控制逻辑

* 0x00 概要

* 0x01 Commands

* 1.1 逻辑意义

* 1.2 CLI 框架

* 1.3 主要命令类别

* 0x02 Gateway

* 2.1 核心作用

* 2.2 图例

* 2.3 代码

* 0x03 Channel

* 3.1 需求

* 3.2 OpenClaw

* 3.3 基类

* 3.4 QQChannel

* 0xFF 参考

0x00 概要

OpenClaw 应该有40万行代码,阅读理解起来难度过大,因此,本系列通过Nanobot来学习 OpenClaw 的特色。 Nanobot是由香港大学数据科学实验室(HKUDS)开源的超轻量级个人 AI 助手框架,定位为"Ultra-Lightweight OpenClaw"。非常适合学习Agent架构。

在 Nanobot 中,如下几个主要组件担任了外层控制架构,各组件职责为:

* Commands(commands.py):

* nanobot 应用程序的统一命令行入口

* Commands 调用 Agent 完成具体工作

* Commands 是手动触发器。可以将 Command 想象成手动变速箱,何时换挡由用户掌控。

* Gateway:

* 是 commands.py 中的一部分,由 Command 启动

* 系统入口,协调各组件启动和运行

* 启动MessageBus、AgentLoop、ChannelManager

* 协调 CronService 和 HeartbeatService

* Channel(如QQchannel):

* 接收外部消息

* 将消息发布到.MessageBus

* 发送响应消息回外部平台

* MessageBus:

* 解耦 Channel 和 AgentLoop

* 提供异步消息队列机制

我们本篇来分析这几个组件(MessageBus不做单独介绍,而是穿插在其它组件中)。

注:本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。

0x01 Commands

commands.py 是 nanobot 应用程序的统一命令行入口,它定义了所有可用的命令行命令和选项,使用户可以通过终端与 nanobot 进行交互,它集成了以下核心功能:

* 系统初始化:配置和工作区设置

* 服务管理:启动和管理整个 nanobot 系统

* 用户交互:提供直观的命令行和交互式界面

* 资源管理:定时任务、通信渠道和提供商管理

* 状态监控:系统健康状况检查

1.1 逻辑意义

Command 是用户通过输入 /command-name 手动触发的指令。

#### 1.1.1 Commands vs Agent

Commands 和 Agent 是系统中的不同角色,总体来说,Commands 实质上是“触发器”,决定某件事“何时(手动)”运行。Agents 则是拥有独立上下文和工具的“执行者”,决定了“做”什么。

* Commands = 手动触发(由用户决定何时执行),即刻显式运行Agents;

* Agents = 执行者(真正干活的),完成具体工作。Agents 是具备独立上下文、工具和指令的专业执行者。但是,Agent 不会自行启动,需要由 Command、Skill 或 LLM 根据需求来调用。

#### 1.1.2 Commands 运行场景

使用 Command 的场景为:

* 任务简单,且不需要隔离上下文。

* 用户希望对多步骤工作流保有手动控制权。

* 这个操作会产生某些后果,而用户希望在这些后果发生之前,先由用户自己确认。

* 用户会在特定时机反复执行的工作流程,而用户希望在自己认为合适的那一刻手动启动它。

Command 本身就是完整的工作流。整个系统的协作方式如下:

* 用户输入 /XXX(Command - 手动触发)

* Command 运行,Command 告知 Claude:“调用 XXX agent”

* Agent 加载自己的上下文和工具

* Agent 完成具体工作

* Agent 返回结果

* 用户获得可操作的报告

1.2 Typer 库

Nanobot中,Typer 库的作用如下:

* 应用程序初始化:使用 Typer 库创建命令行应用程序框架。Typer 框架的作用是提供一个结构化的命令行接口定义和参数解析系统,使开发者能够轻松创建具有多层命令结构的复杂 CLI 应用程序。所有命令通过共享的配置文件系统协同工作,不受终端会话限制。

* 全局命令选项:定义通用选项如版本信息(--version,-v)。

* 独立进程模型:每个cLI命令都在独立的操作系统进程中运行,nanobot onboard、nanobot gateway、nanobot agent分别对应不同的进程。每个命令都是独立的入口点,通过命令行参数触发,执行不同命令的运行模式。

* 命令分类

* 初始化类命令(如onboard):执行一次性配置任务:创建配置目录、生成默认配置文件、初始化工作区;执行完成后立即退出,不启动任何长期运行的服务; 示例:nanobot onboard → 创建~/.nan obot/config·json → 程序退出

* 服务类命令(如gateway):启动长期运行的主服务进程; 初始化并启动所有核心组件:MessageBus、Agen tLoop、ChannelManager等;持续运行直到手动停止,处理来自各渠道的消息;示例:nanobot gateway → 启动webSocket连接 → 持续监听消息 → 长期运行

* 交互类命令(如agent):启动独立的CLI交互会话,创建独立的AgentLoop实例,在当前终端提供交互式对话,可与gateway并行运行 app = typer.Typer( name="nanobot", help=f"{__logo__} nanobot - Personal AI Assistant", no_args_is_help=True, )

1.3 主要命令

#### 1.3.1 系统设置命令 @app.command("onboard")

其功能如下:

* 创建配置文件(~/.nanobot/config.json)

* 初始化工作区目录

* 创建默认模板文件(AGENTS.md, SOUL.md, USER.md, TOOLS.md, IDENTITY.md)

* 创建内存目录结构

#### 1.3.2 服务启动命令 @app.command("gateway")

其功能如下:

* 启动网关服务

* 启动消息总线(MessageBus)

* 创建 LLM 提供商实例

* 初始化 AgentLoop

* 设置 CronService(定时任务)

* 配置 HeartbeatService(心跳服务)

* 启动所有启用的通信渠道 ,连接多个通道处理逻辑

* 运行异步事件循环

#### 1.3.3 直接交互命令 @app.command("agent")

agent()是CLI的核心实现,是nanobotagent命令的具体函数实现,是nanobot系统的交互式对话入口点。

##### 处理逻辑

* 创建 MessageBus 和 AgentLoop 实例

* 提供直接的命令行交互界面,与Agent直接交互参数:

message:发送给代理的消息

session_id:会话ID

markdown:是否染为Markdown

logs:是否显示运行日志 * 根据是否有消息参数决定是单次调用还是交互模式

* 单次模式:直接调用agent_loop.process_direct(),即直接发送消息并接收回复

* 交互模式:通过MessageBus与AgentLoop通信,提供持续对话界面

* 使用 prompt_toolkit 处理输入

* 支持历史记录和多行粘贴

* 渲染 Markdown 格式的回复

##### 与Gateway模式的区别

* 交互方式

* CLI模式:直接终端输入/输出,单用户交互

* Gateway模式:通过多平台Channe1接 收消息,支持多用户并发

* 消息路由

* CLI模式:绕过MessageBus的标准路由机制,直接本地处理

* Gateway模式:严格遵循Channel→MessageBus→AgentLoop→MessageBus→Channel的完整流程总结

#### 1.3.4 通信渠道管理命令 @channels_app.command("status")

其功能如下:

* 显示所有渠道的状态(WhatsApp, Discord, Feishu, Mochat, Telegram, Slack, DingTalk, QQ, Email)

* 显示配置详情

* 指示是否启用

#### 1.3.5 通道登录命令 @channels_app.command("login")

其功能如下:

* 设置并启动设备桥接(bridge)

* 处理二维码扫描认证流程

* 支持 WhatsApp 等需要设备连接的服务

#### 1.3.6 定时任务管理命令 @cron_app.command("list")

* 显示所有定时任务

* 显示任务ID、名称、调度计划、状态和下次运行时间

#### 1.3.7 添加命令 @cron_app.command("add")

* 添加新的定时任务

* 支持多种调度模式(every, cron, at)

* 验证时区设置

#### 1.3.8 删除命令 @cron_app.command("remove")

* 删除指定ID的定时任务

#### 1.3.9 启用/或禁用命令 @cron_app.command("enable/disable")

* 启用或禁用定时任务

#### 1.3.10 手动执行命令 @cron_app.command("run")

* 手动执行指定的定时任务

#### 1.3.11 状态查询命令 @app.command("status")

* 检查配置文件是否存在

* 验证工作区路径

* 显示当前模型配置

* 检查各提供商的API密钥状态

#### 1.3.12 OAuth 认证命令 @provider_app.command("login") * 支持 OpenAI Codex OAuth 登录

* 支持 GitHub Copilot 设备流认证

* 动态注册和处理不同提供商的认证流程

0x02 Gateway

对于 Nanobot 来说,gateway()函数是 Nanobot 网关的启动入口,负责初始化并串联所有核心组件(消息总线、Agent 循环、通道管理、定时任务、心跳服务等),仅通过轻量化的代码组织就完成了 OpenClaw 同等核心的 “多通道交互 + 定时任务 + 心跳检测” 能力,体现了 Nanobot “3000 行 Python 实现同等核心能力” 的极致轻量化设计理念。

Gateway 并不是主要响应用户命令的组件,它的作用更像是一个消息路由中心和协调器,Gateway 启动和协调所有服务,不直接处理业务逻辑,而是启动整个系统; 所有的消息 (包括 CLI 输入和第三方应用消息)都通过MessageBus路由到统一的AgentLoop进行处理。

以下是OpenClaw 的 Gateway架构。

!Image 1: gateway-架构

gateway-架构

以下是 Nanbobot 的整体架构。可以看出来 Gateway 的作用。

!Image 2: 总体架构-1

总体架构-1

2.1 核心作用

#### 2.1.1 整体作用 gateway函数是 Nanobot 的 “中枢神经”,通过初始化并串联AgentLoop(核心执行)、MessageBus(通信)、CronService(定时)、HeartbeatService(心跳)、ChannelManager(通道)等组件,完成了从 “配置加载” 到 “组件启停” 的全生命周期管理。

* 初始化全局配置、消息总线、AI 模型提供商、会话管理器等基础组件;

* 构建 Agent 核心执行循环(AgentLoop),集成定时任务(CronService)、Shell 执行、MCP 服务等核心能力;

* 配置定时任务回调逻辑,让 CronJob 能通过 Agent 执行并推送结果;

* 初始化通道管理器(ChannelManager),支持多渠道(CLI / 第三方平台)交互;

* 实现心跳服务(HeartbeatService),定期执行预设任务并推送结果;

* 统一管理所有组件的启动 / 停止生命周期,保证系统优雅启停。

#### 2.1.2 核心特色

gateway 的特色如下:

  • 组件化解耦 + 按需串联:所有核心能力(Agent、Cron、Heartbeat、Channels)均为独立组件,通过消息总线(MessageBus)和回调函数解耦,仅在网关层完成串联,既保证模块化,又简化交互逻辑。
  • 轻量化生命周期管理:通过 asyncio 协程统一管理所有异步组件的启动 / 停止,捕获 KeyboardInterrupt 实现优雅关机,无冗余的生命周期框架,符合 3000 行代码的轻量化定位。
  • 灵活的任务分发机制:Cron 任务和心跳任务均通过 Agent 统一执行,结果可推送至指定通道(CLI / 第三方平台),兼顾 “通用执行逻辑” 和 “个性化推送”。
  • 容错与降级设计:心跳任务的通道选择有明确的降级逻辑(优先最近会话→兜底 CLI),避免因通道不可用导致服务异常,提升系统稳定性。
  • 配置驱动:所有核心参数(端口、模型、超时、心跳间隔)均从配置文件加载,无需硬编码,适配不同部署场景。

2.2 图例

#### async run() 服务启动顺序

run是gateway的主运行函数。 async def run(): try: await cron.start() await heartbeat.start() await asyncio.gather( agent.run(), channels.start_all(), ) except KeyboardInterrupt: console.print("\nShutting down...") finally: await agent.close_mcp() heartbeat.stop() cron.stop() agent.stop() await channels.stop_all()

具体图例如下:

!Image 3: gateway-启动

gateway-启动

#### gateway() 内部组件依赖关系

!Image 4: gateway() 内部组件依赖关系

gateway() 内部组件依赖关系

2.3 代码

gateway函数是 Nanobot 的核心启动入口,承担 “组件初始化 + 生命周期管理 + 核心能力串联” 的核心职责。 @app.command() def gateway( port: int = typer.Option(18790, "--port", "-p", help="Gateway port"), verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"), ): """Start the nanobot gateway.""" # 延迟导入核心模块(避免启动时加载冗余依赖,提升启动速度) from nanobot.config.loader import load_config, get_data_dir  # 配置加载/数据目录工具 from nanobot.bus.queue import MessageBus  # 消息总线:组件间通信核心 from nanobot.agent.loop import AgentLoop  # Agent核心执行循环:处理用户指令/工具调用 from nanobot.channels.manager import ChannelManager  # 通道管理器:管理CLI/第三方平台等交互通道 from nanobot.session.manager import SessionManager  # 会话管理器:存储/管理用户会话 from nanobot.cron.service import CronService  # 定时任务服务:管理CronJob的调度与执行 from nanobot.cron.types import CronJob  # CronJob数据类型:定义定时任务结构 from nanobot.heartbeat.service import HeartbeatService  # 心跳服务:定期执行预设任务 # 若开启详细输出模式,配置日志为DEBUG级别(便于调试) if verbose: import logging logging.basicConfig(level=logging.DEBUG) # 在控制台打印启动信息,包含logo和网关端口 console.print(f"{__logo__} Starting nanobot gateway on port {port}...") # 加载全局配置文件(所有组件的配置均从这里读取) config = load_config() # 初始化消息总线:作为所有组件间异步通信的核心枢纽 bus = MessageBus() # 基于配置创建AI模型提供商(如OpenAI/火山方舟等,适配不同LLM) provider = _make_provider(config) # 初始化会话管理器:基于配置的工作目录存储用户会话数据 session_manager = SessionManager(config.workspace_path) # 先创建定时任务服务(回调函数需在Agent创建后设置,此处先初始化存储) # 定义cron任务的持久化存储路径:数据目录/cron/jobs.json cron_store_path = get_data_dir() / "cron" / "jobs.json" # 初始化CronService,指定任务存储路径 cron = CronService(cron_store_path) # 创建Agent核心执行循环:集成所有核心能力的核心组件 agent = AgentLoop( bus=bus,  # 关联消息总线,用于接收/发送事件 provider=provider,  # 关联AI模型提供商,用于LLM推理 workspace=config.workspace_path,  # 指定Agent的工作目录(存储文件/任务数据) model=config.agents.defaults.model,  # 默认使用的LLM模型(如doubao-seed-lite) temperature=config.agents.defaults.temperature,  # LLM温度参数(控制生成随机性) max_tokens=config.agents.defaults.max_tokens,  # LLM生成的最大token数 max_iterations=config.agents.defaults.max_tool_iterations,  # 工具调用的最大迭代次数(防止死循环) memory_window=config.agents.defaults.memory_window,  # Agent记忆窗口(保留最近的会话轮次) brave_api_key=config.tools.web.search.api_key or None,  # 网页搜索工具的API Key(可选) exec_config=config.tools.exec,  # Shell执行工具的配置(超时/安全规则等) cron_service=cron,  # 关联定时任务服务,让Agent能访问定时任务 restrict_to_workspace=config.tools.restrict_to_workspace,  # 是否限制文件操作仅在工作目录(安全管控) session_manager=session_manager,  # 关联会话管理器,用于读写用户会话 mcp_servers=config.tools.mcp_servers,  # MCP服务配置(多模态/扩展工具) channels_config=config.channels,  # 通道配置(支持的交互渠道) ) # 设置定时任务的执行回调函数(需在Agent创建后,因依赖Agent执行逻辑) async def on_cron_job(job: CronJob) -> str | None: """Execute a cron job through the agent. 核心逻辑:将定时任务的消息传递给Agent执行,支持结果推送至指定通道 """ # 调用Agent的直接处理方法执行定时任务消息 response = await agent.process_direct( job.payload.message,  # 定时任务的核心消息(如"检查GitHub星数") session_key=f"cron:{job.id}", # 为定时任务创建独立会话Key(避免干扰用户会话) channel=job.payload.channel or "cli",  # 任务关联的通道(默认CLI) chat_id=job.payload.to or "direct",  # 任务推送的目标ChatID(默认direct) ) # 若任务配置了"推送结果"且指定了目标,则通过消息总线推送执行结果 if job.payload.deliver and job.payload.to: from nanobot.bus.events import OutboundMessage  # 导入出站消息事件类型 await bus.publish_outbound(OutboundMessage( channel=job.payload.channel or "cli",  # 推送的目标通道 chat_id=job.payload.to,  # 推送的目标ChatID content=response or ""  # 推送的内容(Agent执行结果) )) # 返回Agent执行结果(供CronService记录) return response # 将回调函数绑定到CronService,定时任务触发时自动执行 cron.on_job = on_cron_job # 初始化通道管理器:管理所有启用的交互通道(CLI/微信/钉钉等) channels = ChannelManager(config, bus) def _pick_heartbeat_target() -> tuple[str, str]: """Pick a routable channel/chat target for heartbeat-triggered messages. 核心逻辑:为心跳任务选择合适的推送通道(优先最近活跃的非内部会话) """ # 获取所有启用的通道集合 enabled = set(channels.enabled_channels) # 优先选择最近更新的非内部会话对应的启用通道(保证推送至活跃用户) for item in session_manager.list_sessions(): key = item.get("key") or "" if ":" not in key:  # 会话Key格式应为"channel:chat_id",无分隔符则跳过 continue channel, chat_id = key.split(":", 1)  # 拆分通道和ChatID if channel in {"cli", "system"}:  # 跳过CLI/系统内部通道(非用户交互通道) continue if channel in enabled and chat_id:  # 通道已启用且有有效ChatID return channel, chat_id # 降级策略:无合适通道时兜底使用CLI通道(保持原有行为,逻辑显式化) return "cli", "direct" # 初始化心跳服务的核心回调函数:执行心跳任务 async def on_heartbeat_execute(tasks: str) -> str: """Phase 2: execute heartbeat tasks through the full agent loop. 核心逻辑:通过Agent完整执行循环处理心跳任务,选择合适的通道执行 """ # 选择心跳任务的执行/推送目标通道 channel, chat_id = _pick_heartbeat_target() # 定义空的进度回调函数(心跳任务无需输出进度) async def _silent(*_args, **_kwargs): pass # 调用Agent直接处理方法执行心跳任务 return await agent.process_direct( tasks,  # 心跳任务的具体内容(如"检查系统状态") session_key="heartbeat",  # 心跳任务专属会话Key channel=channel,  # 执行任务的通道 chat_id=chat_id,  # 执行任务的ChatID on_progress=_silent,  # 禁用进度回调 ) # 初始化心跳服务的通知回调函数:推送心跳任务执行结果 async def on_heartbeat_notify(response: str) -> None: """Deliver a heartbeat response to the user's channel. 核心逻辑:将心跳任务执行结果推送至用户的活跃通道(跳过CLI通道) """ from nanobot.bus.events import OutboundMessage  # 导入出站消息事件类型 # 选择推送目标通道 channel, chat_id = _pick_heartbeat_target() if channel == "cli": return  # CLI通道无外部用户,无需推送 # 通过消息总线推送心跳任务结果 await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response)) # 读取心跳服务配置 hb_cfg = config.gateway.heartbeat # 初始化心跳服务 heartbeat = HeartbeatService( workspace=config.workspace_path,  # 工作目录(存储心跳任务配置) provider=provider,  # AI模型提供商(用于生成/执行心跳任务) model=agent.model,  # 使用与Agent相同的LLM模型 on_execute=on_heartbeat_execute,  # 心跳任务执行回调 on_notify=on_heartbeat_notify,  # 心跳任务结果推送回调 interval_s=hb_cfg.interval_s,  # 心跳任务执行间隔(秒) enabled=hb_cfg.enabled,  # 是否启用心跳服务 ) # 打印通道启用状态(可视化启动信息) if channels.enabled_channels: console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}") else: console.print("[yellow]Warning: No channels enabled[/yellow]") # 打印定时任务状态(可视化启动信息) cron_status = cron.status() if cron_status["jobs"] > 0: console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs") # 打印心跳服务状态(可视化启动信息) console.print(f"[green]✓[/green] Heartbeat: every {hb_cfg.interval_s}s") # 定义网关核心运行函数:统一启动/停止所有异步组件 async def run(): try: # 启动定时任务服务 await cron.start() # 启动心跳服务 await heartbeat.start() # 并发启动Agent核心循环和所有启用的通道 await asyncio.gather( agent.run(),  # 启动Agent执行循环(处理指令/工具调用) channels.start_all(),  # 启动所有启用的通道(监听用户输入) ) except KeyboardInterrupt: # 捕获用户中断(Ctrl+C),打印关机提示 console.print("\nShutting down...") finally: # 优雅关闭所有组件(无论正常退出还是异常) await agent.close_mcp()  # 关闭MCP服务连接 heartbeat.stop()  # 停止心跳服务 cron.stop()  # 停止定时任务服务 agent.stop()  # 停止Agent执行循环 await channels.stop_all()  # 停止所有通道 # 启动网关主循环(阻塞直到退出) asyncio.run(run())

0x03 Channel

3.1 需求

让大语言模型能调用工具确实很强大,但如果只能通过命令行和它交流,那使用场景就太受限了。理想的情况是:你能直接用语音跟它说话,它也能用自然的方式回应你,并且整个对话就发生在你平时常用的聊天软件里——比如 Telegram、WhatsApp、Discord 或 Slack。这就是 Channel 要解决的问题。

Channel 本质上是个适配器。它把你的智能代理(Agent)连接到各种即时通讯平台。不同平台的消息格式各不相同,Channel 的作用就是把这些五花八门的格式统一转换成 Agent 能理解的标准输入;等 Agent 生成回复后,再把标准输出转回对应平台能接受的格式发出去。

为了不让消息处理拖慢接收速度,Channel 和 Agent 之间通常会加一个消息总线(Message Bus)——说白了就是一个队列。新消息进来先存进队列,Agent 慢慢从队列里取走处理。这样即使 LLM 思考得慢,也不会阻塞新消息的接收。 用户输入 → Channel → MessageBus → AgentLoop → ContextBuilder → LLM Tool Execution MessageBus → Channel → 用户输出

3.2 OpenClaw

我们首先看看 OpenClaw 的 Channel。

用户可以通过不同的IM软件(飞书、Whatsapp、Telegram等)发送消息给OpenClaw,其中每个IM软件都视为一个Channel。

入站消息怎么找到对应 agent:通道收到消息后,会调用路由层(src/routing/resolve-route.ts):

  • 输入维度:channel + accountId + peer(群/私聊) + teamId/guildId
  • 匹配bindings
  • 命中后返回agentId + sessionKey
  • 没命中走默认 agent
出站消息统一发送的流程如下:
  • 先拿到目标通道插件getChannelPlugin(channel)
  • 检查该插件是否实现outbound.sendText/sendMedia
  • 统一做切片/分段(超长文本)
  • 最终调用具体通道的发送实现

3.3 基类

nanobot 会在 gateway 中同时启动多个通道,通过 ChannelManager 统一管理所有通道。其他通道继承相同的基类 BaseChannel,实现相同的生命周期方法,使用统一的消息格式。 class BaseChannel(ABC): """ Abstract base class for chat channel implementations. Each channel (Telegram, Discord, etc.) should implement this interface to integrate with the nanobot message bus. """ name: str = "base" def __init__(self, config: Any, bus: MessageBus): """ Initialize the channel. Args: config: Channel-specific configuration. bus: The message bus for communication. """ self.config = config self.bus = bus self._running = False @abstractmethod async def start(self) -> None: """ Start the channel and begin listening for messages. This should be a long-running async task that: 1. Connects to the chat platform 2. Listens for incoming messages 3. Forwards messages to the bus via _handle_message() """ pass @abstractmethod async def stop(self) -> None: """Stop the channel and clean up resources.""" pass @abstractmethod async def send(self, msg: OutboundMessage) -> None: """ Send a message through this channel. Args: msg: The message to send. """ pass def is_allowed(self, sender_id: str) -> bool: """ Check if a sender is allowed to use this bot. Args: sender_id: The sender's identifier. Returns: True if allowed, False otherwise. """ allow_list = getattr(self.config, "allow_from", []) # If no allow list, allow everyone if not allow_list: return True sender_str = str(sender_id) if sender_str in allow_list: return True if "|" in sender_str: for part in sender_str.split("|"): if part and part in allow_list: return True return False async def _handle_message( self, sender_id: str, chat_id: str, content: str, media: list[str] | None = None, metadata: dict[str, Any] | None = None, session_key: str | None = None, ) -> None: """ Handle an incoming message from the chat platform. This method checks permissions and forwards to the bus. Args: sender_id: The sender's identifier. chat_id: The chat/channel identifier. content: Message text content. media: Optional list of media URLs. metadata: Optional channel-specific metadata. session_key: Optional session key override (e.g. thread-scoped sessions). """ if not self.is_allowed(sender_id): logger.warning( "Access denied for sender {} on channel {}. " "Add them to allowFrom list in config to grant access.", sender_id, self.name, ) return msg = InboundMessage( channel=self.name, sender_id=str(sender_id), chat_id=str(chat_id), content=content, media=media or [], metadata=metadata or {}, session_key_override=session_key, ) await self.bus.publish_inbound(msg) @property def is_running(self) -> bool: """Check if the channel is running.""" return self._running

3.4 QQChannel

QQChannel 是 nanobot 与 QQ 平台进行通信的桥梁,实现了 QQ 私聊消息的接收和发送功能,使用 botpy SDK 通过 WebSocket 连接 QQ 服务器。即,通过标准化接口与其他组件协作,使得 nanobot 能够通过 QQ 与用户交互。

#### 3.4.1 架构设计

实际上,QQChannel 实现了一个QQ机器人,QQChannel 继承 BaseChannel,依赖 botpy SDK 实现 QQ 通信,通过 MessageBus 与 AgentLoop 交互(利用 MessageBus 进行异步消息传递),是「QQ 平台 ↔ Nanobot 核心」的桥梁。

QQChannel 既不是独立的线程也不是独立的进程,而是在主进程中通过异步事件循环协作运行的一个组件。

* 单进程异步模型:QQchannel与其他组件共享同一个进程和事件循环非阻塞协作:通过async/await实现协作式并发

* 事件驱动:botpySDK通过回调机制处理QQ消息事件

* 消息总线:通过MessageBus解耦通道和代理处理逻辑

* 继承结构:继承自 BaseChannel,遵循统一的通道接口,实现了标准的 start、stop、send 方法

* 自动重连机制:WebSocket 连接异常断开时,自动等待 5 秒后重连,保证 QQ 机器人的长连接稳定性;

* 消息去重处理:通过固定长度的双端队列记录已处理消息 ID,避免重复处理相同消息;

* 鲁棒的异常处理:对消息收发、连接管理等关键流程做异常捕获,记录日志且不影响整体服务运行;

* 轻量的资源管理:使用deque限制已处理消息 ID 的存储长度(最大 1000),避免内存泄漏;

* 配置校验前置:启动前校验 SDK 是否安装、app_id/secret 是否配置,提前暴露配置问题;

* C2C 消息适配:专注处理 QQ 单聊(C2C)消息,适配个人用户与机器人的交互场景。

##### 与 MessageBus 交互

MessageBus的解耦作用如下:

* Inbound 流向:QQChannel.publish_inbound() → MessageBus.inbound_queue → AgentLoop.consume_inbound()

* Outbound流向:AgentLoop.publish_outbound() → MessageBus.outbound_queue → Channel.send() await self._handle_message( sender_id=user_id, # QQ 用户 ID chat_id=user_id, # 对话 ID(私聊中与用户 ID 相同) content=content, # 消息内容 metadata={"message_id": data.id}, # 元数据 )

##### 与 AgentLoop 交互

上下文传递

* 通过 channel 名称("qq")和 chat_id(QQ 用户 ID)识别会话

* 保持会话状态和上下文连续性

##### 与 SessionManager 交互

会话管理

* 通过 channel:chat_id 格式创建会话键(qq:user_openid)

* 每个 QQ 用户拥有独立的会话历史,每个会话通过session_key(通常是channel:chat_id)标识

* 利用 SessionManager 管理会话状态,保证同一用户的连续对话体验

会话键格式

* QQ 会话使用 "qq:openid" 格式作为唯一标识

* 每个 QQ 用户拥有独立的会话历史和上下文

工具上下文设置

* AgentLoop._set_tool_context() 设置当前会话的上下文

* 确保 MessageTool、CronTool 等工具知道当前的 channel 和 chat_id

##### 生命周期管理

统一管理

* ChannelManager.start_all() 和 stop_all() 方法统一控制所有通道

* Gateway 关闭时确保 QQ 通道正确清理资源

资源清理

* QQChannel.stop() 方法关闭 WebSocket 连接

* 清理内部状态和缓存

#### 3.4.2 消息交互机制

QQChannel 的功能为:

##### 消息通道

* 处理 QQ 用户发送的消息

* 将消息转换为内部格式并发送到消息总线

* 发送 nanobot 的回复到指定 QQ 用户

* 在 _run_bot 方法中实现自动重连机制

* 异常时等待 5 秒后重新连接

其消息流转路径为 用户发送 QQ 消息 → QQChannel接收 → MessageBus 入站队列 → AgentLoop 处理  → AI 处理 →  发布响应  → MessageBus 出站队列 → QQChannel 发送 → 用户接收

消息流转机制具体如下:

##### 启动流程

Gateway 启动流程如下,在 gateway 命令中 async def run() 会启动各个通道。 await cron.start() await heartbeat.start() await asyncio.gather(agent.run(),channels.start_all) # 会启动所有配置的通道

ChannelManager 会启动 QQChannel

* 从配置文件加载 QQ 通道配置(QQConfig)

* 验证 App ID 和密钥是否配置正确

* ChannelManager 创建 QQChannel 实例

* 传入共享的 MessageBus 实例 await qq_channel.start() # 调用 QQChannel.start()

最终如下: 主进程(gateway) ├─ 主事件循环(asyncio) │ ├─ AgentLoop.run() #主代理循环 │   ├─ ChannelManager.start_all() │   │    ├─ QQchannel.start() #QQ通道启动 │   │    │ └─ _run_bot() # botpy SDK连接循环 │   │    ├─ 其他通道.start()#如 Telegram,Discord 等#定时任务服务 │   │    └─ CronService.start() # 定时任务服务 │   └─ HeartbeatService.start)#心跳服务

##### 消息接收流程

* QQ 用户发送消息到机器人

* botpy SDK 接收到消息事件

* 触发 _on_message 回调方法

在消息接收流程中,消息处理与转发细节如下:

* 消息过滤:_on_message 验证消息唯一性,防止重复处理

* 使用 deque 维护最近处理的消息 ID

* 最大保留 1000 条记录防止重复处理

* 检查消息 ID 避免重复处理

* 验证消息内容是否为空

* 提取发送者信息和内容

* 将 QQ 消息转换为 InboundMessage 格式,设置正确的 channel("qq")和 chat_id(用户 ID) # 消息总线传输 await self._handle_message( sender_id=user_id, # QQ 用户 ID chat_id=user_id, # 对话 ID(私聊中与用户 ID 相同) content=content, # 消息内容 metadata={"message_id": data.id}, # 元数据 )

消息总线传输

* BaseChannel._handle_message 将消息发布到 MessageBus 的入站队列

* MessageBus 负责异步消息传递

##### 消息处理流程

Agent 接收消息

* AgentLoop.run() 从 MessageBus 消费消息

* 调用 _dispatch() 处理消息

会话管理

* 根据 channel="qq" 和 chat_id(QQ 用户 ID)创建唯一的会话键

* 使用 SessionManager 管理对话历史

消息处理

* AgentLoop._process_message() 执行完整的 AI 处理循环

* 调用 LLM,执行工具,生成响应

##### 响应发送流程

Agent处理完消息之后,响应发送流程如下:

响应生成

* AgentLoop 生成响应后,将其包装为 OutboundMessage

* 发布到 MessageBus 的出站队列

消息路由

* ChannelManager 监听出站消息队列

* 根据 channel 字段将消息路由到对应的通道

QQ 消息发送

* QQChannel.send() 方法被调用

* 使用 botpy API 将消息发送给 QQ 用户: await self._client.api.post_c2c_message( openid=msg.chat_id, # QQ 用户 ID msg_type=0, # 消息类型 content=msg.content, # 消息内容 )

##### 特殊处理

特殊处理如下:

* 停止:标记状态 → 关闭连接 → 日志记录;

* 任务取消

* 支持/stop命令中断当前处理任务

* 通过asyncio.Task.cancel()实现优雅停止

* 错误恢复

* Channel连接断开后自动重连

* 消息处理失败时返回错误信息而非崩溃

#### 3.4.3 QQChannel 核心模块逻辑关系图

!Image 5: Channel-1

Channel-1

#### 3.4.4 QQChannel 核心流程流程图

!Image 6: Channel-2

Channel-2

#### 3.4.5 代码 def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": """Create a botpy Client subclass bound to the given channel.""" intents = botpy.Intents(public_messages=True, direct_message=True) class _Bot(botpy.Client): def __init__(self): super().__init__(intents=intents) async def on_ready(self): logger.info("QQ bot ready: {}", self.robot.name) async def on_c2c_message_create(self, message: "C2CMessage"): await channel._on_message(message) async def on_direct_message_create(self, message): await channel._on_message(message) return _Bot class QQChannel(BaseChannel): """QQ channel using botpy SDK with WebSocket connection.""" # 渠道名称(唯一标识,供框架识别) name = "qq" def __init__(self, config: QQConfig, bus: MessageBus): # 调用父类初始化(配置+消息总线) super().__init__(config, bus) # 类型注解:限定config为QQConfig类型(便于类型检查) self.config: QQConfig = config # QQ机器人客户端实例(懒初始化,start时创建) self._client: "BotClient | None" = None # 已处理消息ID队列:双端队列,最大长度1000(用于消息去重) self._processed_ids: deque = deque(maxlen=1000) async def start(self) -> None: """Start the QQ bot.""" # 校验1:QQ SDK是否安装(未安装则记录错误并返回) if not QQ_AVAILABLE: logger.error("QQ SDK not installed. Run: pip install qq-botpy") return # 校验2:配置是否完整(app_id/secret是必填项) if not self.config.app_id or not self.config.secret: logger.error("QQ app_id and secret not configured") return # 标记渠道为运行状态 self._running = True # 创建自定义Bot类(绑定消息回调到当前QQChannel实例) BotClass = _make_bot_class(self) # 初始化Bot客户端实例 self._client = BotClass() # 记录启动日志(标识支持C2C私聊) logger.info("QQ bot started (C2C private message)") # 启动Bot连接(带自动重连) await self._run_bot() async def _run_bot(self) -> None: """Run the bot connection with auto-reconnect.""" # 运行循环:只要渠道处于运行状态,就持续尝试连接 while self._running: try: # 启动Bot WebSocket连接(使用配置的app_id/secret) await self._client.start(appid=self.config.app_id, secret=self.config.secret) except Exception as e: # 连接异常时记录警告日志(不终止循环,后续重连) logger.warning("QQ bot error: {}", e) # 渠道仍在运行时,等待5秒后重连 if self._running: logger.info("Reconnecting QQ bot in 5 seconds...") await asyncio.sleep(5) async def stop(self) -> None: """Stop the QQ bot.""" # 标记渠道为停止状态(终止重连循环) self._running = False # 关闭Bot客户端连接(若已初始化) if self._client: try: await self._client.close() except Exception: # 关闭异常时静默处理(避免影响服务停止) pass # 记录停止日志 logger.info("QQ bot stopped") async def send(self, msg: OutboundMessage) -> None: """Send a message through QQ.""" # 客户端未初始化时记录警告并返回 if not self._client: logger.warning("QQ client not initialized") return try: # 调用QQ API发送C2C私聊消息 await self._client.api.post_c2c_message( openid=msg.chat_id,  # 接收者openid(对应QQ用户ID) msg_type=0,          # 消息类型:0=文本消息 content=msg.content, # 消息内容 ) except Exception as e: # 发送失败时记录错误日志 logger.error("Error sending QQ message: {}", e) async def _on_message(self, data: "C2CMessage") -> None: """Handle incoming message from QQ.""" try: # 消息去重:已处理的消息ID直接返回 if data.id in self._processed_ids: return # 将当前消息ID加入已处理队列(自动淘汰超1000条的旧ID) self._processed_ids.append(data.id) # 提取消息发送者信息(兼容不同版本SDK的字段名) author = data.author user_id = str(getattr(author, 'id', None) or getattr(author, 'user_openid', 'unknown')) # 提取消息内容并去除首尾空白符 content = (data.content or "").strip() # 空消息直接忽略 if not content: return # 调用父类的消息处理方法(转发到消息总线/AgentLoop) await self._handle_message( sender_id=user_id,          # 发送者ID chat_id=user_id,            # 聊天ID(C2C场景下等于发送者ID) content=content,            # 消息内容 metadata={"message_id": data.id},  # 附加元数据(消息ID) ) except Exception: # 消息处理异常时记录完整堆栈(便于排查问题) logger.exception("Error handling QQ message")

0xFF 参考

打造你的 Claw 帝国:OpenClaw底层原理揭秘! OpenClaw 之后,AI 应用该怎么设计? 李宏毅OpenClaw技术全面解析:System Prompt驱动的身份构建 + 工具链递归调用 + HEARTBEAT主动心跳,AI Agent架构运作原理深度拆解 我给 OpenClaw 杀了 47 次僵尸进程,终于想明白了一些事 万字】带你实现一个Agent(上),从Tools、MCP到Skills 3500 行代码打造轻量级AI Agent:Nanobot 架构深度解析 Kimi Agent产品很厉害,然后呢?

OpenClaw真完整解说:架构与智能体内核 https://github.com/shareAI-lab/learn-claude-code 深入理解OpenClaw技术架构与实现原理(上) 深度解析:一张图拆解OpenClaw的Agent核心设计 OpenClaw小龙虾架构全面解析 OpenClaw架构-Agent Runtime 运行时深度拆解 OpenClaw 架构详解 · 第一部分:控制平面、会话管理与事件循环 从回答问题到替你做事,AI Agent 为什么突然火了?

查看原文 → 發佈: 2026-04-01 21:16:00 收錄: 2026-04-02 00:00:34

🤖 問 AI

針對這篇文章提問,AI 會根據文章內容回答。按 Ctrl+Enter 送出。