QQ BotOneBotNapCatNoneBot2AI AgentNbilityPlaywright

QQ群 AI Bot 进阶:群总结、生图、状态页截图

从 NapCat、OneBot v11、NoneBot2 到模型 API,拆解一个可维护的 QQ 群 AI Bot:每日群总结、图片生成、状态页截图、限流与排障。

QQ群 AI Bot 进阶:群总结、生图、状态页截图

QQ 群 AI Bot 进阶封面

很多 QQ 群机器人教程停在“能回复一句话”这一步。真正放到群里以后,问题会变得更具体:群消息很多,机器人要不要每句话都读?每日总结怎么避免胡编?生图失败怎么提示用户?状态页截图怎么防止被拿去访问内网地址?

这篇文章不追求把所有代码塞成一个大脚本,而是把一个进阶 QQ 群 AI Bot 拆成几层:协议接入、消息路由、群总结、图片生成、状态页截图、权限和成本控制。你可以把它当成一套工程化思路,再按自己的群规模改造成实际服务。

先说清楚:这不是“脚本能跑就行”的项目

QQ 群 Bot 一旦进群,就变成了一个线上服务。线上服务至少要考虑:

  • 连接是否稳定:NapCat、OneBot WebSocket、业务服务是否能自动恢复;
  • 权限是否清晰:谁能让机器人生图、谁能查状态页、谁能改配置;
  • 输出是否可控:群总结不能泄露不该公开的信息,生图失败要有明确反馈;
  • 成本是否可控:群聊全量消息如果都丢给大模型,token 会很快失控;
  • 日志是否可查:出现 403、断线、模型错误、图片发送失败时,能定位到哪一层。

QQ 群 AI Bot 进阶架构

推荐架构:NapCat + OneBot v11 + NoneBot2 + 模型 API

目前个人 QQ 群 Bot 的常见工程链路可以这样理解:

QQ 群消息
  -> NapCat / OneBot v11
  -> 反向 WebSocket
  -> NoneBot2 或自己的 WebSocket 服务
  -> 业务逻辑:答疑、总结、生图、状态页截图
  -> OpenAI-compatible 模型 API
  -> OneBot 消息段发送回 QQ 群

这里每一层负责的事情不同:

  • NapCat:负责登录 QQ、收发消息,并把能力通过 OneBot 的 HTTP / WebSocket 暴露出来。NapCat 文档中建议 NoneBot 场景使用反向 WebSocket,地址通常是 ws://127.0.0.1:8080/onebot/v11/ws;如果是 Docker 部署,需要把 127.0.0.1 换成容器名或实际可达地址。
  • OneBot v11:统一消息、事件、API 的协议标准。它源自 CQHTTP 生态,目标是让不同机器人框架之间尽量复用同一套消息和 API 约定。
  • NoneBot2:负责接收 OneBot 事件、写插件、做命令路由和异步处理。NoneBot 的 OneBot v11 适配器就是面向这类协议接入的。
  • 模型 API 层:负责聊天、总结、多模态理解、图片生成等 AI 能力。这里更推荐使用 OpenAI-compatible 的统一入口,后面换模型或做成本控制会轻松很多。

如果你已经有自己的后端,也可以不使用 NoneBot2,而是直接实现 OneBot WebSocket 客户端/服务端。但对大多数 Python 用户来说,NoneBot2 的插件机制会更省心。

基础接入:先让 QQ 消息稳定进入业务服务

一个最小工程目录可以这样放:

qq-ai-bot/
├── bot.py
├── plugins/
│   ├── __init__.py
│   └── ai_group.py
├── .env
└── requirements.txt

requirements.txt 示例:

nonebot2[fastapi]
nonebot-adapter-onebot
httpx
python-dotenv
playwright

安装依赖:

python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
playwright install chromium

bot.py 示例:

import nonebot
from nonebot.adapters.onebot.v11 import Adapter as ONEBOT_V11Adapter

nonebot.init()
driver = nonebot.get_driver()
driver.register_adapter(ONEBOT_V11Adapter)
nonebot.load_plugins("plugins")

if __name__ == "__main__":
    nonebot.run(host="0.0.0.0", port=8080)

.env 里至少建议放这些配置:

ONEBOT_ACCESS_TOKEN=[REDACTED]
NBILITY_API_KEY=[REDACTED]
NBILITY_BASE_URL=https://api.nbility.dev/v1
NBILITY_CHAT_MODEL=gpt-4o
NBILITY_IMAGE_MODEL=gpt-image-2
BOT_ADMIN_USERS=123456789,987654321
ALLOWED_GROUPS=1234567890

NapCat WebUI 中新增 WebSocket 客户端,URL 填:

ws://你的业务服务地址:8080/onebot/v11/ws

如果 NapCat 和业务服务都在同一台机器上,可以先用 127.0.0.1;如果在 Docker 里,通常要写容器名或宿主机 IP。token 也要保持一致,否则常见现象就是 403。

消息路由:不要让机器人吃掉全群聊天记录

群 Bot 最容易犯的错误,是把所有群消息都拿去调用模型。更稳的方式是分三类:

  1. 显式触发:@机器人、/ask/画图/状态 等命令;
  2. 低成本记录:只保存最近 N 条文本,用于当天群总结;
  3. 忽略消息:表情、刷屏、过短消息、重复消息、机器人自己的消息。

一个简化的 NoneBot 插件可以这样写:

# plugins/ai_group.py
import os
import time
from collections import defaultdict, deque

import httpx
from nonebot import on_command, on_message
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, Message, MessageSegment
from nonebot.params import CommandArg

BASE_URL = os.getenv("NBILITY_BASE_URL", "https://api.nbility.dev/v1")
API_KEY = os.getenv("NBILITY_API_KEY", "[REDACTED]")
CHAT_MODEL = os.getenv("NBILITY_CHAT_MODEL", "gpt-4o")
ALLOWED_GROUPS = {int(x) for x in os.getenv("ALLOWED_GROUPS", "").split(",") if x.strip()}

recent_messages: dict[int, deque[str]] = defaultdict(lambda: deque(maxlen=300))
last_call_at: dict[tuple[int, str], float] = {}

async def call_chat(messages, *, temperature=0.3):
    async with httpx.AsyncClient(timeout=60) as client:
        r = await client.post(
            f"{BASE_URL}/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
            json={"model": CHAT_MODEL, "messages": messages, "temperature": temperature},
        )
        r.raise_for_status()
        return r.json()["choices"][0]["message"]["content"]

def allowed(event: GroupMessageEvent) -> bool:
    return not ALLOWED_GROUPS or event.group_id in ALLOWED_GROUPS

def rate_limited(group_id: int, key: str, seconds: int = 10) -> bool:
    now = time.time()
    k = (group_id, key)
    if now - last_call_at.get(k, 0) < seconds:
        return True
    last_call_at[k] = now
    return False

collector = on_message(priority=99, block=False)

@collector.handle()
async def collect(event: GroupMessageEvent):
    if not allowed(event):
        return
    text = event.get_plaintext().strip()
    if len(text) >= 4 and not text.startswith("/"):
        recent_messages[event.group_id].append(f"用户{event.user_id}: {text}")

这段代码只做一件事:低成本保存最近消息。真正调用模型的地方放在命令里,不要在全量消息监听里直接请求大模型。

群答疑:只在明确触发时调用模型

ask = on_command("ask", aliases={"问", "提问"}, priority=10, block=True)

@ask.handle()
async def handle_ask(bot: Bot, event: GroupMessageEvent, args: Message = CommandArg()):
    if not allowed(event):
        return
    question = args.extract_plain_text().strip()
    if not question:
        await ask.finish("用法:/ask 你的问题")
    if rate_limited(event.group_id, f"ask:{event.user_id}", 8):
        await ask.finish("先等几秒,我还在处理上一条。")

    context = "
".join(list(recent_messages[event.group_id])[-30:])
    answer = await call_chat([
        {"role": "system", "content": "你是 QQ 群里的技术助手。回答要简洁、准确;不确定时说明不确定。"},
        {"role": "user", "content": f"最近群聊上下文:
{context}

用户问题:{question}"},
    ])
    await ask.finish(answer[:1800])

这里有几个细节:

  • 只取最近 30 条上下文,避免 token 爆炸;
  • 每个用户加简单冷却;
  • 输出截断到 QQ 群里比较容易阅读的长度;
  • system prompt 里要求“不确定就说不确定”,减少乱答。

每日群总结:不要把流水账当总结

群总结的关键不是“把所有消息都塞给模型”,而是先做筛选:

  • 太短的水群消息可以丢掉;
  • 重复内容只保留一次;
  • 总结里要区分“事实”“待办”“争议/未确认信息”;
  • 如果当天有效消息太少,就不要硬发总结。
summary = on_command("summary", aliases={"总结", "群总结"}, priority=10, block=True)

@summary.handle()
async def handle_summary(event: GroupMessageEvent):
    if not allowed(event):
        return
    lines = list(recent_messages[event.group_id])
    useful = [x for x in lines if len(x) >= 10]
    if len(useful) < 8:
        await summary.finish("今天可总结的信息还不多,先不生成流水账。")

    content = "
".join(useful[-180:])
    result = await call_chat([
        {"role": "system", "content": "你负责生成 QQ 群每日总结。只根据输入内容总结,不要补充外部事实。"},
        {"role": "user", "content": f"请按以下结构输出:
1. 今日重点
2. 已确认信息
3. 待办事项
4. 争议或未确认内容
5. 值得回看的问题

群聊记录:
{content}"},
    ], temperature=0.2)
    await summary.finish(result[:2500])

如果要做真正的每日自动推送,可以把 recent_messages 换成 SQLite / PostgreSQL,并用 APScheduler、cron 或系统定时器在每天固定时间调用总结逻辑。生产环境不建议只放在内存里,因为服务重启会丢消息。

AI 生图:把失败处理写在第一版里

群里的生图功能很容易变成“最受欢迎,也最容易出问题”的功能。建议一开始就做四件事:

  • 命令触发:例如 /画图 一只在服务器旁工作的猫
  • 每人冷却:比如 30–60 秒;
  • 提示词安全检查:拒绝明显违规或涉及隐私的内容;
  • 失败返回原因:不要只说“出错了”。

调用 OpenAI-compatible 图片接口时,建议把图片模型也放在统一配置里。如果你已经用 Nbility 作为模型入口,聊天和生图可以共用同一套 API Key、Base URL、用量记录和错误处理逻辑:

async def create_image(prompt: str) -> str:
    image_model = os.getenv("NBILITY_IMAGE_MODEL", "gpt-image-2")
    async with httpx.AsyncClient(timeout=180) as client:
        r = await client.post(
            f"{BASE_URL}/images/generations",
            headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
            json={"model": image_model, "prompt": prompt, "size": "1024x1024"},
        )
        if r.status_code >= 400:
            raise RuntimeError(f"image api failed: {r.status_code} {r.text[:300]}")
        data = r.json()
        return data["data"][0].get("url") or data["data"][0].get("b64_json")

命令处理:

draw = on_command("draw", aliases={"画图", "生图"}, priority=10, block=True)

@draw.handle()
async def handle_draw(event: GroupMessageEvent, args: Message = CommandArg()):
    if not allowed(event):
        return
    prompt = args.extract_plain_text().strip()
    if not prompt:
        await draw.finish("用法:/画图 一张橙黑色科技感猫咪机器人海报")
    if len(prompt) > 300:
        await draw.finish("提示词太长了,先控制在 300 字以内。")
    if rate_limited(event.group_id, f"draw:{event.user_id}", 45):
        await draw.finish("生图比较耗时,先等一会儿再试。")

    try:
        url_or_b64 = await create_image(prompt)
    except Exception as e:
        await draw.finish(f"这次图片生成失败:{str(e)[:160]}
可以换一种更具体、少敏感词的描述再试。")

    if url_or_b64.startswith("http"):
        await draw.finish(MessageSegment.image(url_or_b64))
    await draw.finish("图片已生成,但当前示例没有处理 base64 落盘逻辑;生产环境请保存为本地文件后发送。")

实际部署时,图片发送可能受网络、文件大小、协议实现影响。更稳的做法是:图片生成后先落盘或上传到可访问的 CDN,再通过 OneBot 图片消息发送;同时把原图链接也返回给用户,避免 QQ 图片转发失败时用户拿不到结果。

状态页截图:Playwright 很好用,但要限制 URL

“帮我截一下状态页”是群 Bot 很实用的功能,比如服务器状态、API 状态、服务健康页。Playwright 官方文档里提供了 page.screenshot(path="screenshot.png")full_page=True 的截图方式,Python 里可以直接使用。

但这个功能必须加 URL 白名单,否则用户可能让机器人访问内网地址、云元数据地址或管理后台。推荐只允许固定域名:

from pathlib import Path
from urllib.parse import urlparse
from playwright.async_api import async_playwright

ALLOWED_STATUS_HOSTS = {"status.example.com", "status.nbility.dev"}
SCREENSHOT_DIR = Path("/tmp/qq-bot-screenshots")
SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)

async def screenshot_status(url: str) -> Path:
    parsed = urlparse(url)
    if parsed.scheme not in {"https", "http"} or parsed.hostname not in ALLOWED_STATUS_HOSTS:
        raise ValueError("这个 URL 不在允许截图的白名单里。")

    out = SCREENSHOT_DIR / f"status-{int(time.time())}.png"
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page(viewport={"width": 1440, "height": 1000})
        await page.goto(url, wait_until="networkidle", timeout=20000)
        await page.screenshot(path=str(out), full_page=True)
        await browser.close()
    return out

命令示例:

status = on_command("status", aliases={"状态页", "api状态"}, priority=10, block=True)

@status.handle()
async def handle_status(event: GroupMessageEvent, args: Message = CommandArg()):
    url = args.extract_plain_text().strip() or "https://status.nbility.dev"
    try:
        path = await screenshot_status(url)
    except Exception as e:
        await status.finish(f"截图失败:{str(e)[:180]}")
    await status.finish(MessageSegment.image(path.as_uri()))

有些 OneBot 实现对 file://、本地路径、HTTP URL 的支持细节不同。如果本地文件发不出去,可以改成:先上传到自己的静态目录,再发送 https://.../status.png。同时建议给截图结果加 30–120 秒缓存,不要每个用户一问就打开浏览器截图。

systemd:让 Bot 像服务一样运行

生产环境不要靠 SSH 窗口长期挂着。可以用 systemd:

# /etc/systemd/system/qq-ai-bot.service
[Unit]
Description=QQ AI Bot Service
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
WorkingDirectory=/opt/qq-ai-bot
EnvironmentFile=/etc/qq-ai-bot.env
ExecStart=/opt/qq-ai-bot/.venv/bin/python bot.py
Restart=always
RestartSec=5
User=qqbot
Group=qqbot

[Install]
WantedBy=multi-user.target

启动:

sudo systemctl daemon-reload
sudo systemctl enable --now qq-ai-bot
sudo systemctl status qq-ai-bot --no-pager
journalctl -u qq-ai-bot -f

如果 NapCat 也是独立服务,也要同样设置自动重启。Bot 进阶功能越多,越应该把“自动恢复”和“日志可查”放到第一优先级。

QQ 群 Bot 上线检查清单

常见问题排查

1. NapCat 连接 NoneBot 返回 403

优先检查 token。NapCat 和 NoneBot 的 ONEBOT_ACCESS_TOKEN 必须一致;如果一边为空、一边不为空,也会失败。还要确认 NapCat 配的是反向 WebSocket 地址,路径一般是 /onebot/v11/ws

2. Docker 里写 127.0.0.1 连接不上

容器里的 127.0.0.1 指向容器自己,不是宿主机,也不是另一个容器。把地址改成同一 Docker 网络下的服务名,例如:

ws://qq-ai-bot:8080/onebot/v11/ws

或者使用宿主机实际内网 IP。

3. QQ 群图片发不出去

先确认 OneBot 实现支持的图片消息格式。有些场景支持 URL,有些更适合本地文件,有些需要先上传到可公网访问的静态目录。建议日志里记录图片来源、文件大小、发送 API 响应,避免只看到“发送失败”。

4. 群总结看起来像流水账

通常是输入太脏或 prompt 太泛。先过滤短消息、重复消息和表情,再让模型按“重点、事实、待办、争议、问题”结构输出。消息太少时宁可不发。

5. 生图失败率高

常见原因包括提示词过长、内容触发安全策略、图片模型暂时繁忙、网络超时。建议:

  • 用户侧提示“换一种更具体、少敏感词的描述”;
  • 服务侧保留错误码和响应摘要;
  • 同一请求最多重试一次;
  • 给每个用户和群设置冷却时间。

6. 模型调用费用不可控

优先做这几件事:

  • 默认只响应 @ 和命令,不处理所有闲聊;
  • 群总结先压缩上下文,再调用大模型;
  • 生图单独设置更长冷却;
  • 按群、按用户、按功能记录调用次数;
  • 使用统一模型网关查看用量和错误日志。

一个更稳的上线顺序

如果你是第一次做 QQ 群 AI Bot,不建议一口气打开所有功能。更稳的顺序是:

  1. 先跑通 NapCat 和 NoneBot2 的连接,只做 /ping
  2. /ask,只允许管理员或测试群使用;
  3. 加群消息缓存和手动 /summary
  4. 总结质量稳定后,再做每日定时推送;
  5. /status 截图,但只允许白名单 URL;
  6. 最后加 /draw 生图,并设置冷却、失败提示和用量统计。

这样每一层出问题都能单独定位,不会变成一个巨大的黑盒。

小结

QQ群 AI Bot 的进阶点,不在于“多接一个模型”,而在于把群聊场景当作真实线上服务来设计:协议层稳定连接,业务层明确命令和权限,模型层统一管理聊天、视觉和生图能力,运维层保留日志、限流和兜底。

NapCat、OneBot v11、NoneBot2 负责把 QQ 群消息可靠地送到你的业务逻辑里;Playwright 可以补上状态页截图这类实用能力;像 Nbility 这样的 OpenAI-compatible API 入口,则适合放在模型层,统一处理聊天、生图、用量和后续换模型的需求。先把这些边界拆清楚,再往群里加功能,Bot 才会更像一个可维护的产品,而不是一个随时会坏的玩具脚本。

参考链接

相关文章

用 Nbility 跑通你的 Agent 工作流

获取 API Key,统一接入 OpenAI 兼容模型和开发工具。

管理 API Key