患者 AI 消息预警平台 — 技术方案

· v1.0 · 2026-06-30
把「采集库里的患者体征 → 丢给 AI 研判 → 把风险预警实时推给医护」这条链路,做成一个独立、可靠、可追溯的后台平台。支持多条产品线,一期接入**重症(ICU)和麻醉(ANES)**两条产品线。
文中关键参数(查库周期、AI 并发/超时、容量等)附默认值和测算过程。

一、为什么单独建一个项目

体征数据已经由采集系统写进了数据库。我们要做的是中间这段:定时取数 → 调 AI → 拿结果 → 推送前端,并保证全程不丢、可查、可统计。

平台不做两件事:不做体征采集入库(采集系统已有),不做 AI 模型(AI 团队提供 HTTP 接口)。

采集库由其他部门维护,我们只有只读权限,不能建表、写数据。研判时读采集库取体征;推送时只读本地业务库——Worker 写 alert_record 时把姓名、床号、room/ward 等字段快照进去,WebSocket/REST 不再查采集库。患者主数据不做全量同步。

为什么单独建项目,而不是塞进现有系统:

理由 说明
职责干净 只负责「预警编排 + 推送」,边界清晰
故障隔离 定时查库、调慢 AI、推送都在独立进程,不拖累业务系统
独立伸缩 患者多、AI 慢时,单独给本平台加实例即可
松耦合 和采集库(只读)、AI(HTTP)、前端(WebSocket/REST)对接,各方独立升级、互不改代码
可复用 重症、麻醉共用一套,后续急诊等产品线按同模式接入

二、整体架构:平台在链路里的角色

整个平台夹在采集域和 AI/前端之间,只负责中间的编排和推送

   采集域(别人的)              本平台(我们做的)                 外部能力(别人的)
 ┌──────────────┐   ①只读查库    ┌──────────────────────────┐
 │ 采集系统      │ ───────────▶  │  ②查库 → ③编排 → ④调AI    │  ④HTTP   ┌──────────┐
 │ 写入体征      │              │  → ⑤落库 → ⑥推送          │ ───────▶ │ AI 服务   │
 └──────┬───────┘              │                          │ ◀─────── │ (重症/麻醉)│
        │ 写入                 │  全程写库:pipeline/日志/  │   返回    └──────────┘
        ▼                      │  预警/通知 → 可追溯        │
 ┌──────────────┐  平台主动查    │                          │  ⑥WS+REST ┌──────────┐
 │ MySQL 采集库  │ ◀──────────  │                          │ ───────▶ │ 业务前端  │
 │ (ICU/ANES)   │              └──────────────────────────┘          │ 弹窗/列表 │
 └──────────────┘                                                     └──────────┘

前端通过 WebSocket(实时)+ REST(兜底拉取)拿预警。

平台内部分五个环节,状态都落库,便于排查和补数据:

环节 干什么 关键技术
②查库取数 定时按水位增量查活跃患者的新体征 XXL-Job + MyBatis-Plus + 只读多数据源
③任务编排 把研判任务可靠地放进队列 Outbox + RabbitMQ
④AI 研判 取一段时间体征调 AI,拿风险结果 Worker 消费 + WebClient
⑤预警落库 写预警记录、通知记录 MySQL(本地事务)
⑥消息推送 实时推医护,断线可补 WebSocket + REST

三、一条预警是怎么跑完的

1. 采集系统把体征写进采集库                                (平台不参与)
2. XXL-Job 定时触发(默认每 3 分钟,可配)
3. 查库:窗口 = [last_watermark, now](上限 window_minutes),只查新增体征
   —— 若该患者 AI 正在跑(running 锁)→ 跳过入队,标记 pending_reanalysis
   —— 否则同一事务写 pipeline + 日志 + outbox 并提交(❌ 不更新水位)
4. 投递:Outbox 把任务发到 RabbitMQ(ICU / ANES 各自的队列)
5. 研判:Worker 取任务,汇总体征,HTTP 调 AI(带 requestId=pipelineId,5~30 秒)
6. 落库:AI 成功 → 同事务写 record + notice + 更新水位 + pipeline 完成
7. 推送:WebSocket 按 push_route_key(room/ward)广播;REST + wsToken 兜底
8. 兜底:AI 失败水位不变自动重试 → 仍失败进死信 → 对账/人工补偿补跑
   —— running 锁释放后若 pending_reanalysis=true → 立即补一次分析

整条链路里,唯一的"慢"在 AI(5~30 秒),查库和推送都是毫秒~秒级。


四、技术选型(为什么用它们)

技术 用途 选型理由 量化依据
Spring Boot 3 平台主体 Java 栈成熟,独立后台首选 4 个模块(common/entity/biz/server),单体 1~2 实例即可
MySQL 8 业务库 + 只读采集库 采集库本就是 MySQL,统一运维 业务库 9 张表,日增流水线约 2.8 万行,单表千万级前无需分库
MyBatis-Plus 数据访问 多数据源 + 增量 SQL 完全可控 单患者增量查询走联合索引,命中行数 = 一个窗口内体征条数(几十行),单次 <10ms
RabbitMQ 3.12+ 内部任务队列 任务队列语义 + 原生重试死信(见下方测算) 峰值入队 ~1 条/秒,远低于单节点万级 TPS 能力
Redis 7 缓存 / 防抖 / 锁 防重复入队、缓存活跃患者 键约 200 个,QPS < 50
XXL-Job 定时调度 可视化调度 + 失败告警 查库 Job 周期 180s、对账 Job 900~1800s,各产品线独立
WebClient 调 AI 响应式 HTTP,超时/并发/熔断可控 每产品线并发上限 5、读超时 30s(可配)
WebSocket 实时推送 服务端主动推,毫秒级 内网推送 <1s,对比 5s 轮询最坏延迟降低约 80%
Sa-Token / JWT 鉴权 独立鉴权,可对接 SSO 一期 WS 用 License→wsToken;二期 SSO 按用户推送

为什么是 RabbitMQ(带容量测算)

一条消息对应一个患者、一个时间窗、一次 AI 研判,属于任务队列,不是日志流。量级如下:

活跃患者          ICU 100 + ANES 100        = 200 人
查库周期          poll_interval = 180s       → 每天 480 轮
每轮有新数据占比  经验取 30%                 → 每轮入队 200 × 30% = 60 条
─────────────────────────────────────────────────────────
平均入队速率      60 条 / 180s              ≈ 0.33 条/秒
日峰(×3 突发)   ≈ 1 条/秒
日任务总量        60 × 480                  ≈ 2.88 万条/天
真正瓶颈          AI 单次 5~30s,并发 5      → 单线约 10~60 条/分钟

瓶颈在 AI 吞吐(每分钟几十条),不在 MQ。日入队约 1 条/秒,主流 MQ 都够用;选型主要看任务语义、重试死信、分线路由和运维成本。

维度 我们的需求 RabbitMQ ✅ Kafka RocketMQ Redis Streams
消息语义 一条任务一个 Worker 竞争消费,语义直接 分区日志流,偏流处理 队列+Topic,可用 消费者组,可用
吞吐需求 ~1 条/秒 万级 TPS 百万级 十万级 万级
重试/死信 强需求(AI 偶发失败) TTL + DLX 原生支持 需自研补偿 延迟消息+DLQ 无原生 DLQ,要自研
分线路由 ICU/ANES 隔离 direct exchange,一行配置 靠分区规划 Tag 过滤 单 Stream,隔离弱
可靠投递 Outbox+Confirm+ACK Publisher Confirm 成熟 acks 成熟 事务消息 无 Confirm,弱
运维成本 内网、小团队 单节点可跑,管理台友好 依赖 ZK/KRaft,偏重 NameServer,中等 复用现有 Redis,最轻

为什么不是 Kafka:我们用不上 Kafka 的百万 TPS 和回放能力,却要维护 ZK/KRaft;任务队列所需的「一条一消费 + 死信重试」在 Kafka 上要自己实现。
为什么不是 Redis Streams:缺原生 DLQ 和 Publisher Confirm,可靠投递要补较多逻辑。
选 RabbitMQ:任务语义清晰,重试/死信/分线路由现成,Spring AMQP 集成成熟,单节点能撑一期。

为什么不用线程池代替 MQ:日任务约 2.8 万条,线程池也能跑。选 MQ 主要是因为失败重试、死信、多 Worker 扩缩和任务持久化,而不是 TPS:

能力 线程池 RabbitMQ
失败重试 / 死信 需自研 原生 TTL + DLX
多 Worker 水平扩展 难协调 竞争消费,加实例即可
查库与 AI 解耦 同进程耦合 进程/实例可独立扩缩
任务状态持久化 内存丢失 持久化队列 + Outbox
积压可观测 管理台看队列深度

Redis 用途(缓存 / 防抖 / 锁)

Redis 在平台做四件事,(任务队列是 RabbitMQ):

用途 怎么做 解决什么
活跃患者缓存 alert:active:{产品线},TTL 1~2 分钟 少查采集库
患者防抖 SET alert:debounce:{患者键} 1 NX EX 30,已存在就跳过本轮入队 同患者 30s 内不重复入队
入队分布式锁 SET alert:lock:{幂等键} {实例号} NX EX 60 多实例不重复建 Pipeline
查库段锁 SET alert:fetch:{产品线}:{patientKey} {实例号} NX EX 10 保护「读水位 → 判是否入队 → 写 outbox / 改 pending」
AI 运行态锁 SET alert:running:{productLine}:{patientKey} 1 NX EX 300 同一患者同一时刻只有一个 AI 任务在跑
解锁用 Lua 比对 value 后 DEL;DB 上 idempotency_key 唯一索引再挡一层重复。

五、核心机制详解

5.0 防 AI 重复分析:运行态锁 + 延迟补跑

AI 分析进行中(如 10:00 开始,耗时 30s),若 10:06 查库又来新体征,会再入队,导致同一患者被两个 Worker 同时跑。处理办法:

10:00  Worker 开始分析 P001 → SET alert:running:ICU:P001 = 1 (TTL 300s)
10:05  采集库又来新体征
10:06  查库 Job 扫描到 P001 有新数据
       → 检查 alert:running:ICU:P001 存在 → 跳过入队,不写 outbox
       → 标记 alert_patient_watermark.pending_reanalysis = true(有新数据待补)
10:00:30  AI1 完成 → EVAL Lua(DEL running + 读 pending)→ 若 pending 则发延迟补跑(TTL 45s,is_reanalysis=true)
10:01:15  补跑 Worker 消费 → 新建 pipeline + outbox → 清除 pending
机制 作用
alert:running:{patientKey} Redis 锁 同一患者同一时刻最多 1 个 AI 任务
pending_reanalysis 标记 AI 跑着时新来的数据不丢,完成后经延迟消息补跑
idempotency_key 唯一索引 重复 outbox / 重复消息只处理一次
debounce 30s 减少无效入队

多实例并发(查库 Job + Worker 同时跑)

部署多实例时,查库 Job 和 Worker 可能同时改 alert_patient_watermark 和 Redis,需要额外约束:

竞态 风险 处理
多个查库实例同时发现同一患者有新数据 可能各写一条 outbox(幂等键不同则 pipeline 重复) 每患者抢 alert:fetch:{产品线}:{patientKey}(EX 10s),持锁才执行「读水位 → 判断 → 写 outbox」;idempotency_key 唯一索引兜底
running 期间写 pending_reanalysis 与 Worker 完成时的清除并发,可能漏标或重复补 标 pending 与查库判断放在同一 fetch 锁内;或 watermark 行 SELECT … FOR UPDATE
AI 完成:删 running + 查 pending + 补跑 非原子会漏补或重复补 Redis EVAL Lua;pending=true 时发延迟 MQ(TTL 45s,is_reanalysis=true),Worker 异步消费
补跑消息与正常查库 Job 重叠 重复 pipeline、无效日志 is_reanalysis 标记 + 45s 内查库跳过 + debounce(见「补跑与查库去重」)
-- fetch 阶段(持 alert:fetch 锁)
IF running 存在 THEN
  UPDATE watermark SET pending_reanalysis=1
  跳过 outbox
ELSE
  IF 该患者 45s 内已有 is_reanalysis pipeline 在处理/刚完成 THEN 跳过
  写 pipeline + outbox
END

-- AI 完成(Redis EVAL Lua,见下)

Lua 完成脚本与 fallback

Lua 通过 EVAL 执行,脚本内不抛 Java 异常;返回固定字符串供 Worker 判断:

返回值 含义
success running 已删,无 pending
pending_cleared running 已删,pending 已清,已触发延迟补跑
no_pending running 已删,pending 本来就是 0
-- KEYS[1]=running  KEYS[2]=pending  ARGV[1]=pipelineId
local v = redis.call('GET', KEYS[1])
if v then redis.call('DEL', KEYS[1]) end
local p = redis.call('GET', KEYS[2])
if p == '1' then
  redis.call('DEL', KEYS[2])
  return 'pending_cleared'   -- Worker 侧发 MQ 延迟消息
end
return v and 'success' or 'no_pending'

Worker 侧对 Lua 调用设超时(如 500ms)。失败或超时时 fallback:按 pipelineId 校验 value 后 GET + DEL running;再读 DB pending_reanalysis,为 true 则同样发延迟补跑消息。running 锁 TTL 300s 可自愈,但 fallback 避免等满 TTL 才补跑。

监控:alert_running_lock_countalert:running:* 键数量)、Lua 失败/超时次数。

补跑与查库去重

延迟补跑走 alert.reanalysis.delay(默认 TTL 45s,可配;比查库周期 180s 短、比 5s 更能错开查库 Job)。消息体带 is_reanalysis=truepipeline 表同步写 is_reanalysis=1

环节 去重
查库 Job(持 fetch 锁) 若该患者 45s 内存在 is_reanalysis=1 且 pipeline 状态 ∈ {CREATED,…,AI_RUNNING,DONE},跳过本次入队
补跑 Worker 消费前再查 running 锁;idempotency_keyreanalysis 前缀
debounce 补跑 pipeline 与正常 pipeline 共用 30s debounce(可配)

5.0.1 水位(Watermark)什么时候更新?

查到了 10:01、10:02、10:03 的数据后 AI 失败,水位该不该动?更新了可能丢数,不更新下轮又重复查。规则如下:

水位只在 AI 成功且 alert_record 落库之后,与 record 同一事务内更新。

查库阶段(事务1):
  读采集库 record_time > watermark
  写 pipeline + log + outbox
  ❌ 不更新 watermark

AI 成功阶段(事务2):
  BEGIN
    写 alert_record + alert_notice
    更新 pipeline → DONE
    UPDATE alert_patient_watermark SET last_data_time = window_end  ← 只在这里更新
  COMMIT

AI 失败:
  watermark 不变 → 下轮对账/重试仍覆盖同一段数据 → 不丢
时机 是否更新 watermark 原因
查库发现新数据 还没分析完,不能认为"已处理"
入队 / 投递 MQ 任务可能失败
AI 成功 + record 落库 这段数据确实分析完了
AI 失败 / 超时 下轮重试或对账会再覆盖

5.0.2 数据窗口怎么算?

不是固定「最近 30 分钟」,而是动态区间:

window_start = last_data_time(水位,上次成功处理到的时刻)
window_end   = 当前查库时刻(或该患者采集库 MAX(record_time))
  • 配置项 window_minutes上限:若 window_end - window_start > 30min,只取最近 30min(防 AI 输入过大)。
  • 正常情况窗口是 [last_watermark, now]不重叠、不重复——上次成功到哪,这次就从哪开始。

示例:10:00 成功分析到 10:00 → 10:06 查库 window = [10:00, 10:06],不会重复分析 10:00 之前的数据。

5.0.3 事务边界(两阶段本地事务)

┌─ 事务1(查库 Job 线程)─────────────────────────────┐
│  BEGIN                                               │
│    INSERT alert_pipeline (CREATED)                   │
│    INSERT alert_pipeline_log (DATA_FETCH)            │
│    INSERT alert_outbox (PENDING)                     │
│  COMMIT                                              │
└──────────────────────────────────────────────────────┘
         ↓ Outbox Relay(事务外,~1s 轮询)
         ↓ RabbitMQ Confirm
┌─ Worker 消费(无事务,调 AI 5~30s)──────────────────┐
│  HTTP 调 AI(带 requestId = pipelineId,AI 端幂等)  │
└──────────────────────────────────────────────────────┘
         ↓ AI 返回成功
┌─ 事务2(Worker 线程)─────────────────────────────────┐
│  BEGIN                                               │
│    INSERT alert_record + alert_notice                │
│    UPDATE alert_pipeline → DONE                      │
│    UPDATE alert_patient_watermark.last_data_time     │
│  COMMIT                                              │
│  basicAck(MQ)                                        │
└──────────────────────────────────────────────────────┘
         ↓ 异步
         WebSocket 广播推送 / REST 兜底

事务失败与回滚

  • 事务1、事务2均使用 @Transactional(rollbackFor = Exception.class),任一步异常整笔回滚。
  • 事务2失败时:alert_record / 水位均不提交alert_pipelineFAILEDalert_pipeline_log 记异常;MQ 消息 nack 进重试。
  • idempotency_key 或 record 唯一索引冲突:视为重复消费,记日志后直接 ACK,不重复落库。
  • 事务2成功后才 basicAck;AI 调通但落库失败时不确认 MQ,保证可重试。

5.0.4 AI 超时与幂等

AI 侧其实已成功,但网络超时未收到响应时,平台若直接重试会导致重复分析。处理如下:

措施 说明
请求带 requestId 值 = pipelineId,AI 侧按 requestId 幂等
结果查询接口 AI 提供 GET /result/{requestId}(联调约定);Worker 超时后先查结果,已有则直接落库,没有再重试
平台 Consumer 幂等 idempotency_key 唯一索引,重复消息 ACK 跳过
Resilience4j 每产品线独立熔断:连续失败熔断 5min,标 AI_UNAVAILABLE

5.0.5 AI 限流与降级

并发上限由 MQ 排队 + Worker prefetch + Resilience4j Bulkhead(每线默认 5)控制。任务突增时先进队列,不会同时打满 AI。

层级 机制 默认
MQ 任务先进队列排队 削峰,不丢
Worker prefetch 限制未 ACK 数 5~10
AI 调用 Resilience4j Bulkhead(信号量) 每线并发 5
AI 调用 RateLimiter 可配 QPS 上限
AI 挂了 CircuitBreaker 熔断 5min 任务积压 MQ,查库继续、入队继续
AI 恢复 熔断关闭 Worker 继续消费积压队列
AI 不可用时的行为:
  查库 Job     → 正常跑(不受影响)
  Outbox/MQ    → 正常入队(任务积压队列)
  Worker       → 熔断打开,暂停调 AI,消息留在队列
  AI 恢复      → 熔断关闭,Worker 逐步消费积压
  对账 Job     → 继续跑,发现漏判补跑

5.0.6 Outbox 轮询与投递确认

一期用 Polling Publisher(约 1s 轮询 alert_outbox WHERE status=PENDING):

方案 一期 说明
Polling 采用 日 PENDING 量低,1s 轮询可接受
CDC(Canal / Debezium) 二期 任务量或延迟要求上来后再考虑

状态变更顺序(避免「标记已发但 MQ 未持久化」):

Relay 取 PENDING 行
  → 发 RabbitMQ(persistent + mandatory)
  → 阻塞等待 Publisher Confirm(broker ack)
  → Confirm 成功后才 UPDATE outbox SET status=PUBLISHED
  → Confirm 失败或超时:保持 PENDING,下轮重发

补偿 Job(XXL-Job,每 1~5 分钟):扫描 status=PENDING AND create_time < now()-30s 的记录,强制重发并打告警。单条 outbox 累计重试 不超过 10 次retry_count 字段),超限标 FAILED 并告警,避免对 MQ 死循环打。

禁止在收到 Confirm 之前把 outbox 标为 PUBLISHED。

5.0.7 WebSocket 推送策略(一期 License / 二期 SSO)

现有客户端没有用户登录,用 AiLicense(license + mac) 做设备级鉴权,与麻醉/重症 AI 客户端一致。一期按终端 / 房间 / 病区推送,不按个人——和 License 绑 room 的模型一致。

阶段 推送方式 鉴权 说明
一期 产品线 + 房间/病区 广播 License + mac → wsToken 同一 room/ward 下所有在线终端收到相同预警
二期 用户 精准推送 SSO Token Token 解析 userId,recipient_id 路由到个人

License → wsToken → WebSocket 建连(一期)

客户端启动
  ① POST /anesthesia/aiLicense/verify(或 ICU 对应接口)
     参数:license + clientMac          ← 现有客户端已有
  ② POST /alert/ws/token
     参数:license + mac + productLine  ← 预警平台新增,复用同一套 License 校验逻辑
     返回:wsToken(JWT,有效期 24h,含 license、mac、productLine、roomId)
  ③ WebSocket 建连
     wss://alert-host/alert/ws?token={wsToken}
     ❌ URL 中禁止裸传 license(安全)
  ④ 服务端
     校验 wsToken → 解析 roomId
     Session 注册到订阅组:{productLine}:room:{roomId} 或 {productLine}:ward:{wardCode}
  ⑤ 有预警时
     读 alert_record / alert_notice 中的 push_route_key
     向对应订阅组广播(带 seq)

REST 兜底接口同样携带 Authorization: Bearer {wsToken}(或 Header 传 license+mac),与 WS 共用鉴权。

roomId / deptCode / ward_code 映射(路由键对齐)

License 表(ai_license)有 roomIdcategory;采集库患者有 room_iddept_codeward_code推送路由以采集库患者位置为准,License 的 roomId 须与订阅维度对齐:

产品线 License 字段 采集库字段(患者位置) 推送路由键 push_route_key WS 订阅组(终端注册)
ANES(麻醉) roomId(手术室编号) room_id ANES:room:{room_id} ANES:room:{license.roomId}
ICU(重症) roomId(护士站/病房编号,绑定时约定) ward_codedept_code ICU:ward:{ward_code} ICU:ward:{license.roomId}

规则:

  • 落库时(写 alert_record / alert_notice):从采集库快照患者 room_id / ward_code / dept_code,计算并写入 push_route_key
  • 建连时:终端 License 绑定的 roomId 决定它订阅哪个组;须与上表约定一致(绑 OR03 的终端只收 OR03 的预警)。
  • dept_code 作为 REST 查询的辅助维度保留;WS 广播以 push_route_key 为准。

License roomId 与患者 room 对齐

License 的 roomId 必须和采集库患者 room_id / ward_code 同一套编码,否则终端订阅了却收不到,或收到别的 room 的预警。

环节 做法
License 绑定 绑定时校验 roomId 在科室/手术室配置表中存在(复用现有 AiLicense 绑定流程)
联调 与数据采集确认 ICU ward_code、ANES room_id 与 License roomId 对照表
运行期 预警落库时 push_route_key 来自采集库快照;WS 订阅组来自 License;两者编码不一致则推送不到

推送路由与注意事项

说明 方案
禁止全产品线广播 若向整个 ANES 广播,所有手术室都会收到无关预警 必须按 push_route_key 精准路由,只推给 {productLine}:room/ward:{xxx}
同房间多终端 多台设备绑定同一 roomId 全部收到相同预警——手术室大屏/护士站场景符合预期
已读 / 确认 一期无 userId POST .../read 请求体带 clientId(license 或 mac),落库 alert_notice.read_client_id不能追溯「哪个医生」,只能追溯「哪台终端」
处置确认 ack 同上 alert_record.acked_client_id + acked_time;二期 SSO 后改为 acked_by(userId)
WS 安全 license 是长期凭证 仅 HTTP 换短期 wsToken;WS / REST 只带 token,不传 license 明文
token 过期 wsToken 24h 过期 客户端定时刷新或 WS 断连时用 license+mac 重新换 token
患者转科/转床 产生预警后患者已离开 一期路由不变;旧 room 仍可见历史未读

患者转科/转床:push_route_key 业务规则

一期约定:历史预警以产生时刻的位置为准,push_route_key 落库后不再修改。

现象 是否接受 说明
患者转出 OR03 后,OR03 终端仍能看到转出前产生的未读预警 接受 快照 + 审计需要;医护可在旧终端已读/确认
转入 OR05 后,OR05 终端看不到转科前的历史预警 接受 新预警用新 room 的 push_route_key
历史预警跟随患者最新位置推送 一期不做 二期用 original_push_route_key + 位置快照做迁移

落库时除 push_route_key 外,额外写入(一期就存,二期改路由时用):

字段 说明
push_route_key 产生时刻的 WS/REST 路由,一期不变
original_push_route_key push_route_key 相同,永久保留
location_snapshot JSON:{room_id, ward_code, bed_no, dept_code} 产生时快照

新预警始终按当时采集库快照写路由;REST 未读按 push_route_key 过滤。

一期 alert_notice 落库字段:product_lineroom_idward_codedept_codepush_route_keyis_readread_client_id;REST 未读按 push_route_keyproduct_line + room_id/ward_code 过滤。alert_record 同时快照 patient_namebed_no、体征 JSON 等展示字段——推送 payload 全部来自本地表,不回查采集库(终端读不了采集库;采集库抖动不影响已生成预警的推送;患者转科后历史预警以产生时快照为准)。

5.0.8 线程池与连接池规划

组件 线程池 / 连接 默认 说明
XXL-Job 查库 Scheduler 线程 4 ICU/ANES 各 Job + 对账
AlertWorker @RabbitListener concurrency 5~10/产品线 与 AI Bulkhead 一致
Outbox Relay 单线程 Scheduled 1 1s 轮询
WebSocket 推送 独立线程池 core=4, max=8 不阻塞 Worker
HTTP 调 AI WebClient 连接池 max=20/产品线 复用连接
采集库只读 HikariCP max=10 独立数据源
业务库 HikariCP max=20 读写

5.0.9 监控体系

技术 用途
Micrometer 指标埋点(Spring Boot Actuator)
Prometheus 指标采集存储
Grafana 可视化大盘
指标 告警阈值(示例)
MQ 队列深度 > 100 告警
MQ DLQ 堆积 > 0 告警
AI 调用 P95 耗时 > 25s 告警
AI 超时率 > 5% 告警
AI 熔断状态 OPEN 告警
Outbox PENDING 积压 > 50 且持续 5min;单条 PENDING > 30s;单条 retry_count ≥ 10
Redis running 锁 alert:running:* 键数量、Lua 失败/超时次数、锁获取失败率
补跑占比 is_reanalysis=1 的 pipeline 占当日总量
活跃患者缓存 刷新 Job 耗时、失败次数、查库是否命中缓存
Watermark 延迟 采集库 MAX(record_time) - watermark > 15min
WebSocket 在线连接数 监控趋势
端到端延迟 P95 > 10min 告警

5.0.10 患者数据与采集库只读:推送为何不查采集库

研判阶段读采集库,推送阶段只读本地业务库——不做患者主数据全量同步。

为什么推送不能依赖采集库?

原因 说明
客户端读不了采集库 前端/终端只能连预警平台;姓名、床号、风险摘要等展示字段必须在推送 payload 里,不能指望终端再去查采集库
推送要稳定 采集库抖动、慢查、短暂不可用,不应影响已生成预警的 WebSocket 推送和 REST 兜底
审计要固定 预警产生时患者在哪张床、哪个 room/ward,应快照固化;不能事后回采集库再查(患者可能已转科/出院)

两阶段数据用法

┌─ 阶段 A:研判(Worker)────────────────────────────────────┐
│  读采集库(只读)→ 取体征 + 患者路由信息 → HTTP 调 AI        │
│  ❌ 此阶段不写采集库;watermark 在本地维护                   │
└────────────────────────────────────────────────────────────┘
                          ↓ AI 成功,事务2提交
┌─ 阶段 B:落库 + 推送(本地 alert_platform)─────────────────┐
│  alert_record 写入展示快照 + push_route_key                 │
│  alert_notice 写入 → WebSocket 广播 / REST 兜底             │
│  ✅ 此阶段不再查采集库;payload 全部来自本地表               │
└────────────────────────────────────────────────────────────┘

Worker 写 alert_record 时,把当时从采集库查到的展示字段一次性快照进去;之后 WS/REST 只读本地库。WS 按 push_route_key(如 ANES:room:OR03)向对应订阅组广播;客户端用 license+mac 换 wsToken 后建连,终端 License 绑定的 roomId 决定订阅哪个组。

本地存什么?(不是全量同步)

存储位置 存什么 用途 必须
alert_record patient_keypatient_namebed_noroom_idward_codedept_codepush_route_keyrisk_level、AI 摘要、vitals_snapshot(JSON) 推送 payload + 列表展示 + 审计
alert_notice product_lineroom_id/ward_codepush_route_keyseq(自增 id)、is_readread_client_id WS 路由 + REST 兜底
alert_patient_watermark 每患者 last_data_timepending_reanalysis 增量查采集库、延迟补跑
Redis 活跃患者缓存 patient_key → room/ward/bed,TTL 1~2min 减少每轮查采集库;不是持久化同步 建议
患者主数据全表 采集库仍是 source of truth
患者主数据不做全量同步到预警平台;只在产生预警时把当时需要的字段快照进 alert_record

采集库只读对「查数研判 / 推送」的影响

场景 影响 现有应对
只能 SELECT 不能写 watermark 到采集库 watermark 放本地 alert_patient_watermark
3 分钟轮询 新体征有延迟 可配 poll_interval;对账 Job 补漏
采集库短暂不可用 本轮查不到新数据,无法产生新预警 对账 + 人工补偿;已落库的预警照常 WS/REST 推送
AI 不可用 无法研判新数据 任务积压 MQ;已有 alert_notice 照常推
患者转科 实时位置变了 历史预警以产生时快照为准;新预警用新位置;不跟随最新 room 改 push_route_key

什么时候才需要考虑「同步」?

一期通常不需要。仅以下情况才值得加本地表或 CDC:

条件 方案 阶段
采集库 QPS 限得很死,活跃患者列表都查不动 轻量 alert_patient_cache 表,Job 定时刷新 按需
复杂患者检索/统计,且不能每次打采集库 CDC 同步患者维度表(Canal/Debezium) 二期
采集库与预警平台网络隔离,Worker 根本连不上 消息/文件同步(非当前场景)

当前约束是可只读查询采集库,走「定时增量查 + 快照落库 + 本地推送」即可。


六、四个硬问题怎么解决

6.1 轮询的性能和延迟

活跃患者怎么查

「只查在科/术中患者」如果每轮对采集库做 WHERE status='在科' 全表扫描,容易成为热点。做法:

做法
活跃列表缓存 Redis alert:active:{productLine},存 patient_key 列表及 room/ward/bed,TTL 60~90s
刷新 Job 独立 XXL-Job 每 60~90s 增量刷 Redis(见下)
查体征 SQL patient_key IN (活跃列表) AND record_time > 水位,禁止无 patient_key 的大范围扫
查库 Job 读 Redis 活跃列表 + 按患者抢 fetch 锁后查增量体征

刷新 Job 不宜每轮全表扫。 SQL 示例:

SELECT patient_key, room_id, ward_code, bed_no, dept_code
FROM   ... 
WHERE  status IN ('在科','术中')
  AND  last_update_time > :lastRefreshTime   -- 增量;首轮或定时全量

有入科/出科时间字段时,用「当前在科」视图或 admit_time / discharge_time 过滤。刷新失败:告警 + 继续用上一版 Redis 缓存(TTL 未到前查库 Job 照常跑);连续失败 N 次升级告警。

性能手段

轮询查库的性能和延迟,主要靠增量水位、活跃列表缓存、分批查询和独立连接池:

手段 做法 量化效果
增量水位 每患者记上次处理时间点,只查 record_time > 水位,走 (patient_id, record_time) 联合索引 命中行数=一个窗口的体征条数(几十行),单次 <10ms
只查活跃患者 列表来自 Redis alert:active:*,SQL 带 patient_key IN (...) 候选约 200 人,避免扫全库
缓存活跃列表 独立 Job 每 60~90s 刷新 Redis 查库 Job 不每轮打采集库要名单
分批 + 独立连接池 每批 ~50 人一条 SQL,采集库独立连接池(max 10) 每轮约 200/50 = 4 次批量查询;不与业务库抢连接
查库与 AI 解耦 查库线程只入队,不等 AI 单轮查库 数百 ms~数秒完成,不被 5~30s 的 AI 拖垮

单轮查库耗时:4 批 ×(约 10~50ms)≈ 1 秒内,小于 180s 周期。

端到端延迟(约 1.5~5 分钟):

环节 耗时 是否瓶颈
等下一轮查库 0~180s(平均 90s) 周期决定,可调小
查库 + 落库 <1s
MQ 排队 + 消费 秒级(取决于 AI 并发)
AI 研判 5~30s
WebSocket 推送 <1s(内网)

瓶颈在查库周期和 AI 耗时。要缩短等待时间可调 poll_interval_seconds(需评估采集库压力);AI 慢则靠队列排队,查库线程不被拖住。

6.2 数据可靠性

常见故障(MQ 抖动、WS 断线、AI 超时、进程重启)下,研判任务和预警记录应能恢复、不静默丢失;通知最终到达终端,允许秒到分钟级延迟。

以下依赖基础设施或运维,不在本期应用层单独解决:

情况 说明
业务库整库不可用 依赖 DB 备份与恢复
采集库长期不可用 无法产生新预警;已落库记录仍可推送
终端长期离线且不调 REST 未读留在 alert_notice,上线后拉取
对账周期内的漏判 15~30 分钟内由对账 Job 或人工补偿补跑

各段分开容错,MQ 允许重复投递(幂等去重),按最终一致设计。

(1)查库 → 入队:Outbox(本地事务 + 异步投递)

最容易丢的地方:业务库事务提交了,但发往 MQ 的消息没发出去(或反过来)。我们不用 Seata/XA 这种重量级跨库事务,用 Outbox(发件箱)模式,本质是把"要发的消息"和"业务数据"放进同一个本地事务:

① 写库阶段(一个本地事务,要么全成功要么全回滚)
   BEGIN
     写 alert_pipeline      (流水线)
     写 alert_pipeline_log  (日志)
     写 alert_outbox        (待发消息, 状态=PENDING)
   COMMIT

② 发送阶段(独立线程,约 1s 轮询)
   取 PENDING → 发 RabbitMQ → **等 Publisher Confirm**
   Confirm 成功 → 标 PUBLISHED
   Confirm 失败/超时 → 保持 PENDING,下轮重发
   补偿 Job:PENDING 超过 30s 重发;retry_count ≥ 10 标 FAILED

③ 消费阶段(幂等)
   Worker 用 idempotency_key 去重 → 调 AI、落库 → 成功才确认(ACK)
   失败 basicNack → TTL 重试队列(退避)→ 超 3~5 次进死信

这样即使 MQ 临时挂了,待发任务仍留在 alert_outbox,恢复后补发。Outbox 发完标记 PUBLISHED,可定时清理,不会无限膨胀

不用 XA/Seata:业务库内本地事务 + Outbox 最终一致,够用且更简单。

幂等:MQ 可能重复投递,idempotency_key 唯一索引保证同一研判作业只处理一次。

(2)RabbitMQ:生产 / 存储 / 消费三段

生产者 ──▶ Broker ──▶ 消费者
  ①         ②          ③
风险 措施
① 生产者→Broker 发了没收到 / 路由不到 Publisher Confirm(broker 确认才算成功)+ Outbox 重发
② Broker 自身 重启就没了 交换机/队列 durable + 消息持久化(落盘);上量后用 Quorum 多副本队列
③ Broker→消费者 处理没完就丢 手动 ACK(业务成功才确认)+ 失败进重试队列 + 超次进死信队列

生产侧 Outbox + Confirm,Broker 持久化,消费侧手动 ACK、重试和死信,一段失败由下一段承接。

(3)推送到终端:先落库,再推送,断了也能捞

WebSocket 管实时,可靠性靠先写 alert_notice、再 WS 推送、seq 补发和 REST 未读拉取。连接会断,设计上按断线常态处理。

层级 机制 说明
L1 落库 推送前先写 alert_notice 推失败也不影响记录已在库
L2 实时 WS 按 push_route_key 广播 内网通常 <1s
L3 补发 重连带 lastSeq,补 id > seq 断线期间的增量补
L4 兜底 GET /alert/notices/unread 不依赖 WS 状态,终端主动拉
L5 对账 定时比对采集库 vs 水位 漏判自动补跑;仍失败走人工补偿

心跳 30s;重连退避 1s→…→30s;服务端 60s 无心跳回收连接。

REST 未读(最后一层)

接口 作用
GET /alert/notices/unread?productLine=ANES&roomId=OR03 拉本 room 全部未读
GET /alert/notices?sinceId={seq} 增量拉(Header 带 wsToken)
POST /alert/notices/{id}/read 标记已读(Body 带 clientId

已读 / 确认:前端回调落库;一期 clientId=license/mac(终端级),二期 SSO 后改 userId。HIGH 级可要求必须 ack(二期)。

(4)对账与人工补偿

XXL-Job 每 15~30 分钟:采集库某患者最新数据时间 vs 本地水位,有漏判则自动补跑。仍搞不定的进死信,由人工补偿处理台介入。

6.3 出问题能查:全程可追溯

每条预警就是一条**流水线(Pipeline)**记录,配一张阶段日志表,把"查数→调 AI→推送"每一步都记下来。

  • alert_pipeline:每条预警一行,记当前状态、风险级别、重试次数、错误。
  • alert_pipeline_log:每个阶段一行,记阶段、耗时、请求/响应摘要、错误原因。

状态是细粒度的(不是只有"成功/失败"):

已创建 → 已查库 → 已入队 → 消费中 → 已提交AI → AI完成
      → 预警已生成 → 推送中 → 已推送 → 完成
任意一步失败 → 等待重试 →(超次)→ 死信,等对账或人工补跑

排查示例——P001 未收到预警:查 alert_pipeline 状态为 AI_TIMEOUT → 查 alert_pipeline_log 见 AI 阶段 30s 读超时 → 任务在重试队列,等待或人工补跑。

6.4 实时看 + 能统计

实时监控大盘(管理端,Micrometer + Prometheus + Grafana):

面板 看什么 告警阈值(示例)
流水线 各阶段排队 / 进行中 / 今日完成
队列 RabbitMQ 深度、DLQ、reanalysis 队列 DLQ >0
Redis running 锁数量、Lua 失败率、fetch 锁失败率 异常抬升
补跑 is_reanalysis pipeline 占比 长期 >20% 需排查
活跃缓存 刷新耗时、失败次数 连续失败告警
AI 调用次数、成功率、P95 耗时、超时率 P95 >25s;超时率 >5%;熔断 OPEN
推送 推送成功率、重试堆积
Outbox PENDING 积压 >50 且持续 5min
水位 采集库 MAX(record_time) − watermark 延迟 >15min
WebSocket 在线连接数 监控趋势
延迟 端到端 P50 / P95 P95 >10min

统计报表:按时间、风险级别、产品线、病区等维度统计预警量、AI 成功率等。实时查询走预警表聚合,历史报表用每日汇总表,避免大范围实时聚合压库。

6.5 人工补偿处理台

自动重试、对账仍无法处理的异常任务(死信、长期 AI 超时、对账漏判),由运维/管理员在人工补偿处理台介入:

死信 / 对账告警
      ↓
管理端「补偿处理台」列出异常 Pipeline
      ↓
操作员查看 pipeline_log → 确认原因
      ↓
┌─ 补跑 AI ─────────┐  ┌─ 跳过并标记 ────┐  ┌─ 强制关闭 ─────┐
│ 重新入队(新       │  │ 人工确认无需     │  │ 记录原因,      │
│ idempotency_key)  │  │ 预警,更新水位   │  │ 不再自动重试    │
└───────────────────┘  └─────────────────┘  └────────────────┘
      ↓
操作记入 alert_pipeline_log(操作人、时间、原因)
能力 说明
异常列表 按状态(DLQ、AI_TIMEOUT、对账漏判)、patient_key时间范围、productLine 过滤
单条补跑 指定患者 + 时间窗,生成新 Pipeline 重新入队
批量补跑 对账 Job 漏判一键补跑(二期)
操作审计 人工操作写 pipeline_log
人工补偿用于自动重试和对账仍无法恢复的任务;一期先做异常列表和单条补跑,批量与跳过放二期。

七、消息怎么收发(同步/异步、RabbitMQ、多语言订阅)

7.1 哪些是同步、哪些是异步

写本地库走同步,调 AI 和推送走异步:

环节 同步/异步 说明
查采集库 + 写流水线/日志/Outbox 同步 调度线程内一个本地事务完成
投递 RabbitMQ 异步 查库线程只入队,不等 AI
消费任务 + 调 AI 异步 AI 5~30 秒,不能阻塞查库
写预警/通知 同步 Worker 事务内落库
WebSocket 推送 异步 推送失败有重试和 REST 补拉
前端拉未读 同步 HTTP 请求

7.2 RabbitMQ 通讯模型

RabbitMQ 只在平台内部传研判任务:查库写入任务消息,Worker 消费后调 AI,不对前端开放

作用 说明
解耦 查库(快,秒级)和调 AI(慢,5~30s)分开,查库线程入队即返回,不被 AI 拖垮
削峰 每轮查库可能一下产生几十条任务,先进队列排队,Worker 按并发 5 平稳消费,不冲垮 AI
分线隔离 ICU / ANES 各自队列,重症 AI 慢不影响麻醉,互不抢占
可靠不丢 持久化 + Publisher Confirm + 手动 ACK + 重试/死信,配合 Outbox(写 pipeline + outbox 同一本地事务,Relay 轮询发 MQ,失败重发)保证任务不丢失
注意:RabbitMQ 传内部研判任务,医护端走 WebSocket + REST。

拓扑:

                 ┌──────────────────────────────┐
  Outbox 投递 ─▶ │ Exchange: alert.task (direct) │
                 └───────┬───────────────┬───────┘
              routing=ICU│               │routing=ANES
                         ▼               ▼
              alert.task.icu       alert.task.anes      ← 重症/麻醉分队列,互不影响
                         │               │
                   Worker(ICU)      Worker(ANES)        ← 各自按并发消费,调各自 AI
                         │
              失败 ─▶ alert.retry.*(TTL 延迟重试)─▶ 回主队列
              超次 ─▶ alert.dlq.*(死信,对账/人工补跑)

  AI 完成且 pending=true ─▶ alert.reanalysis.delay(TTL 45s,is_reanalysis=true)─▶ alert.task.*
机制 配置 作用
消息体 JSON:患者、时间窗、产品线、流水线 id、幂等键 一条 = 一次研判作业
可靠投递 Publisher Confirm broker 确认收到才算发成功
持久化 交换机/队列 durable + 消息落盘 重启不丢
消费确认 手动 ACK 业务成功才确认,崩溃自动重投
限流 prefetch(与 AI 并发一致) 防止积压
重试/死信 TTL 重试队列 + DLQ 失败退避、永久失败留存
扩展 加 routing key + 队列 + Worker 新增急诊等产品线不动老队列

7.3 前端 / 不同语言怎么订阅预警

医护端不直连 RabbitMQ,统一走 WebSocket 和 REST。

一期(License 鉴权,按 room/ward 广播):

① 设备鉴权(现有)
   POST /anesthesia/aiLicense/verify?license=xxx&clientMac=yy:yy:...
   (ICU 客户端走 /icu/aiLicense/verify,路径与现有项目一致)

② 换取 WS Token(预警平台新增)
   POST /alert/ws/token
   Body: { "license": "xxx", "mac": "yy:yy:...", "productLine": "ANES" }
   → 校验逻辑复用 AiLicenseService(license 有效 + mac 匹配 + 已绑定)
   → 返回 { "wsToken": "eyJ...", "roomId": "OR03", "expiresIn": 86400 }

③ WebSocket 建连
   wss://alert-host/alert/ws?token={wsToken}
   服务端:校验 token → Session 注册到 ANES:room:OR03(或 ICU:ward:W01)

④ 接收推送
   服务端按 alert_notice.push_route_key 向对应组广播(JSON + seq)
   客户端只处理 productLine 匹配的 message

⑤ 断线重连
   带 lastSeq → 服务端补发 id > lastSeq;token 过期则重新执行 ②

⑥ REST 兜底(Header: Authorization: Bearer {wsToken})
   GET  /alert/notices/unread?productLine=ANES&roomId=OR03
   POST /alert/notices/{id}/read   Body: { "clientId": "license或mac" }
   POST /alert/records/{id}/ack    Body: { "clientId": "license或mac" }

推送路由示例:

事件 push_route_key 谁收到
患者 P001 在 OR03 产生 HIGH 预警 ANES:room:OR03 仅订阅 ANES:room:OR03 的终端
患者 P002 在 W01 病区产生 MEDIUM 预警 ICU:ward:W01 仅订阅 ICU:ward:W01 的终端
❌ 错误做法 ANES 全产品线广播 所有手术室都收到,必须禁止

二期(接入 SSO,按用户精准推送):

1. 登录拿 Token(Sa-Token/JWT)
2. 建 WebSocket:wss://主机/alert/ws?token=...&productLine=ICU
3. 服务端按 用户 + 产品线 推送;REST 按 recipient_id 过滤

任何能发 HTTP / 连 WebSocket 的语言都能接,不用引 MQ 客户端:

客户端 怎么接
Vue / React 浏览器原生 WebSocket
安卓 / iOS OkHttp WebSocket / URLSession
Python websockets + requests
C# / Java ClientWebSocket / 标准 WS 客户端

消息带 productLine,客户端只处理自己产品线的,重症/麻醉互不串。

二期若有外部系统(如急诊)要订阅"预警已产生"事件,再单独开放 HTTP Webhook 或事件 Exchange,不影响医护端这套。

八、不确定的事,全做成配置

目前几个还没定的点,不写死在代码里,全部做成配置表 alert_config,业务部门讨论定了之后改配置即可,不用改代码、不用重新发版。

配置项 默认 对应你不确定的事
window_minutes 30 每个患者取多长时间范围的数据
poll_interval_seconds 180 多久查一次库
ai_max_concurrency 5 AI 并发能力(AI 强就调大)
ai_timeout_seconds 30 AI 单次超时
push_targets 推给哪些角色/病区/前端
risk_threshold_push 中风险 达到什么级别才推

每条配置按产品线(ICU/ANES)独立。AI 处理能力不确定也不怕:任务先进队列排队,AI 多慢都只是排队,不会丢、不会压垮 AI;能力摸清后调并发即可。


九、核心数据表与索引

业务库 alert_platform,核心就这几张。采集库(只读)存体征与患者主数据;业务库存预警、通知、水位与快照——研判读前者,推送只读后者,不做患者主数据全量同步。

作用
alert_config 各产品线配置(窗口、周期、AI、推送规则)
alert_patient_watermark 每个患者的处理水位(增量查询、防漏判)
alert_pipeline 流水线主记录(含 is_reanalysis、状态、幂等键)
alert_pipeline_log 流水线阶段日志(排查用)
alert_record 预警记录 + 患者/体征快照(推送 payload 来源,不回头查采集库)
alert_notice 终端通知(push_route_key 路由、未读、seq)
alert_outbox 发件箱(retry_count,超 10 次标 FAILED)
alert_push_retry 推送失败重试
alert_stats_daily 每日统计汇总

主要字段(快照与推送)

alert_record — Worker 在 AI 成功时一次性写入,后续 WS/REST 只读此表组装 payload:

字段 来源 用途
patient_key 采集库 患者唯一标识
patient_name 采集库快照 列表/弹窗展示
bed_no 采集库快照 床号展示
room_id / ward_code / dept_code 采集库快照 位置展示 + 计算 push_route_key
push_route_key 计算 WS/REST 路由(产生时刻)
original_push_route_key 同 push_route_key 永久保留,二期改跟随路由时用
location_snapshot 采集库快照 JSON room/ward/bed/dept 产生时位置
risk_level AI 返回 风险级别
ai_summary AI 返回 研判摘要
vitals_snapshot 采集库窗口内体征 JSON 详情页/审计
pipeline_id 本地 关联流水线
acked_client_id / acked_time 终端回调 处置确认(一期终端级)

alert_notice — 推送前先落库,保证「不丢」:

字段 用途
id(自增) 推送序号 seq
record_id 关联 alert_record
product_line 产品线过滤
push_route_key WS 广播 + REST 未读
original_push_route_key 与 record 一致,审计用
room_id / ward_code 辅助查询
is_read / read_client_id / read_time 已读状态(一期 clientId=license/mac)
status delivered / pending 等推送状态

alert_patient_watermark — 仅用于增量查采集库,与推送无关:

字段 用途
product_line + patient_key 唯一键
last_data_time 增量查询起点;仅在 AI 成功 + alert_record 落库后同事务更新
pending_reanalysis running 期间有新体征标 true;Lua 完成后经延迟 MQ 补跑

Redis(非表,建议)alert:active:{productLine}patient_key→room/ward/bed,TTL 1~2min,减少采集库压力;不落库、不同步

索引设计

索引按实际查询路径设计,避免全表扫:

索引 用途
alert_pipeline idx_status (status) 监控大盘按状态统计
idx_create_time (create_time) 按时间范围查流水线
idx_patient_time (product_line, patient_key, create_time, is_reanalysis) 补跑去重、人工台按患者+时间查
uk_idempotency (idempotency_key) 幂等去重(唯一)
alert_pipeline_log idx_pipeline (pipeline_id) 排查某条流水线各阶段
alert_record idx_patient (product_line, patient_key, create_time) 患者预警历史
idx_risk_time (risk_level, create_time) 按风险级别统计
alert_notice idx_route_unread (push_route_key, is_read, id) REST 未读 + WS 路由(一期)
idx_recipient (recipient_id, is_read, id) 二期 SSO 按用户未读
alert_outbox idx_pending (status, create_time) Outbox 轮询 PENDING
alert_patient_watermark uk_patient (product_line, patient_key) 水位读写(唯一)
alert_push_retry idx_status_time (status, next_retry_time) 推送重试调度
建表 SQL 联调时再定,这里只列职责和字段意图。

十、部署与容量

部署:独立 jar / Docker 一个服务(REST + WebSocket + 内嵌调度),加 MySQL、Redis、RabbitMQ。采集库用只读账号、独立数据源。

alert-platform.jar  (8080: REST + WebSocket)
MySQL alert_platform  业务库
MySQL icu-read / anes-read  采集库(只读)
Redis / RabbitMQ / XXL-Job

容量推算(200 患者、180s 周期、每轮 30% 有新数据):

指标 推算 结果
每轮入队任务 200 × 30% 60 条
平均入队速率 60 / 180s ~0.33 条/秒
日任务量 60 × (86400/180) ~2.88 万条/天
日新增流水线行 ≈ 日任务量 ~2.88 万行/天
日志行(每任务 ~5 阶段) 2.88 万 × 5 ~14 万行/天
AI 处理能力 每线并发 5 × (60s/单次20s) ~15 条/分钟/线,> 入队速率

按上面推算:

  • AI 处理(~15 条/分钟/线)高于平均入队(双线约 20 条/分钟),队列可吸收突发;
  • 日增约 2.88 万行业务数据 + 14 万行日志,三年内单表可不分,日志按月归档;
  • 一期 1~2 个实例即可。

1000 患者时的扩展

1000 患者 × 30% × 480 轮/天 ≈ 14.4 万条/天(5 倍于 200 人)
平均入队 ~1.7 条/秒,日峰 ~5 条/秒 —— 仍远低于 RabbitMQ 能力
瓶颈仍在 AI:单线并发 5、单次 20s → ~15 条/分钟/线
扩展手段 做法 是否改业务代码
加 Worker 实例 多实例竞争消费同一队列 ❌ 不改
调 AI 并发 ai_max_concurrency 配置调大(受 AI 服务上限约束) ❌ 不改
加产品线队列 新 routing key + 队列 + Worker ❌ 不改
分库分表 日增 >50 万行时再考虑 远期

患者从 200 扩到 1000:加 Worker 实例、调 ai_max_concurrency 即可,RabbitMQ 竞争消费分摊负载,业务代码不用改。

AI 熔断期间 Worker 暂停调 AI,查库和入队照常,任务留在 MQ;AI 恢复后继续消费。期间无法产生新预警,已写入 alert_notice 的仍可推送。


十一、实施计划

阶段 目标 交付
一期 MVP 跑通主流程 双线查库 → 调 AI → 推送(WS 科室广播 + REST 兜底)→ 预警列表 + 基础日志
二期 可靠性 生产可用 Outbox、重试/熔断、对账、监控大盘、人工补偿处理台
三期 增强 体验完善 SSO 按用户推送、预警确认闭环、更多产品线接入

需要各方确认的:

要提供
数据采集 只读库账号、体征表结构、在科/术中判定 SQLroom_id/ward_code 与 License roomId 对照
AI 团队 各线 API 地址、鉴权、请求/响应格式、requestId 幂等GET /result/{requestId} 结果查询、处理能力上限
业务/产品 时间窗口、推送科室范围、风险级别定义、是否需医护确认
前端 复用 AiLicense 鉴权;对接 /alert/ws/token、WS 建连、已读/确认带 clientId