患者 AI 消息预警平台 — 技术方案
· v1.0 · 2026-06-30
把「采集库里的患者体征 → 丢给 AI 研判 → 把风险预警实时推给医护」这条链路,做成一个独立、可靠、可追溯的后台平台。支持多条产品线,一期接入**重症(ICU)和麻醉(ANES)**两条产品线。
文中关键参数(查库周期、AI 并发/超时、容量等)附默认值和测算过程。
一、为什么单独建一个项目
体征数据已经由采集系统写进了数据库。我们要做的是中间这段:定时取数 → 调 AI → 拿结果 → 推送前端,并保证全程不丢、可查、可统计。
平台不做两件事:不做体征采集入库(采集系统已有),不做 AI 模型(AI 团队提供 HTTP 接口)。
采集库由其他部门维护,我们只有只读权限,不能建表、写数据。研判时读采集库取体征;推送时只读本地业务库——Worker 写 alert_record 时把姓名、床号、room/ward 等字段快照进去,WebSocket/REST 不再查采集库。患者主数据不做全量同步。为什么单独建项目,而不是塞进现有系统:
| 理由 | 说明 |
|---|---|
| 职责干净 | 只负责「预警编排 + 推送」,边界清晰 |
| 故障隔离 | 定时查库、调慢 AI、推送都在独立进程,不拖累业务系统 |
| 独立伸缩 | 患者多、AI 慢时,单独给本平台加实例即可 |
| 松耦合 | 和采集库(只读)、AI(HTTP)、前端(WebSocket/REST)对接,各方独立升级、互不改代码 |
| 可复用 | 重症、麻醉共用一套,后续急诊等产品线按同模式接入 |
二、整体架构:平台在链路里的角色
整个平台夹在采集域和 AI/前端之间,只负责中间的编排和推送。
采集域(别人的) 本平台(我们做的) 外部能力(别人的)
┌──────────────┐ ①只读查库 ┌──────────────────────────┐
│ 采集系统 │ ───────────▶ │ ②查库 → ③编排 → ④调AI │ ④HTTP ┌──────────┐
│ 写入体征 │ │ → ⑤落库 → ⑥推送 │ ───────▶ │ AI 服务 │
└──────┬───────┘ │ │ ◀─────── │ (重症/麻醉)│
│ 写入 │ 全程写库:pipeline/日志/ │ 返回 └──────────┘
▼ │ 预警/通知 → 可追溯 │
┌──────────────┐ 平台主动查 │ │ ⑥WS+REST ┌──────────┐
│ MySQL 采集库 │ ◀────────── │ │ ───────▶ │ 业务前端 │
│ (ICU/ANES) │ └──────────────────────────┘ │ 弹窗/列表 │
└──────────────┘ └──────────┘
前端通过 WebSocket(实时)+ REST(兜底拉取)拿预警。
平台内部分五个环节,状态都落库,便于排查和补数据:
| 环节 | 干什么 | 关键技术 |
|---|---|---|
| ②查库取数 | 定时按水位增量查活跃患者的新体征 | XXL-Job + MyBatis-Plus + 只读多数据源 |
| ③任务编排 | 把研判任务可靠地放进队列 | Outbox + RabbitMQ |
| ④AI 研判 | 取一段时间体征调 AI,拿风险结果 | Worker 消费 + WebClient |
| ⑤预警落库 | 写预警记录、通知记录 | MySQL(本地事务) |
| ⑥消息推送 | 实时推医护,断线可补 | WebSocket + REST |
三、一条预警是怎么跑完的
1. 采集系统把体征写进采集库 (平台不参与)
2. XXL-Job 定时触发(默认每 3 分钟,可配)
3. 查库:窗口 = [last_watermark, now](上限 window_minutes),只查新增体征
—— 若该患者 AI 正在跑(running 锁)→ 跳过入队,标记 pending_reanalysis
—— 否则同一事务写 pipeline + 日志 + outbox 并提交(❌ 不更新水位)
4. 投递:Outbox 把任务发到 RabbitMQ(ICU / ANES 各自的队列)
5. 研判:Worker 取任务,汇总体征,HTTP 调 AI(带 requestId=pipelineId,5~30 秒)
6. 落库:AI 成功 → 同事务写 record + notice + 更新水位 + pipeline 完成
7. 推送:WebSocket 按 push_route_key(room/ward)广播;REST + wsToken 兜底
8. 兜底:AI 失败水位不变自动重试 → 仍失败进死信 → 对账/人工补偿补跑
—— running 锁释放后若 pending_reanalysis=true → 立即补一次分析
整条链路里,唯一的"慢"在 AI(5~30 秒),查库和推送都是毫秒~秒级。
四、技术选型(为什么用它们)
| 技术 | 用途 | 选型理由 | 量化依据 |
|---|---|---|---|
| Spring Boot 3 | 平台主体 | Java 栈成熟,独立后台首选 | 4 个模块(common/entity/biz/server),单体 1~2 实例即可 |
| MySQL 8 | 业务库 + 只读采集库 | 采集库本就是 MySQL,统一运维 | 业务库 9 张表,日增流水线约 2.8 万行,单表千万级前无需分库 |
| MyBatis-Plus | 数据访问 | 多数据源 + 增量 SQL 完全可控 | 单患者增量查询走联合索引,命中行数 = 一个窗口内体征条数(几十行),单次 <10ms |
| RabbitMQ 3.12+ | 内部任务队列 | 任务队列语义 + 原生重试死信(见下方测算) | 峰值入队 ~1 条/秒,远低于单节点万级 TPS 能力 |
| Redis 7 | 缓存 / 防抖 / 锁 | 防重复入队、缓存活跃患者 | 键约 200 个,QPS < 50 |
| XXL-Job | 定时调度 | 可视化调度 + 失败告警 | 查库 Job 周期 180s、对账 Job 900~1800s,各产品线独立 |
| WebClient | 调 AI | 响应式 HTTP,超时/并发/熔断可控 | 每产品线并发上限 5、读超时 30s(可配) |
| WebSocket | 实时推送 | 服务端主动推,毫秒级 | 内网推送 <1s,对比 5s 轮询最坏延迟降低约 80% |
| Sa-Token / JWT | 鉴权 | 独立鉴权,可对接 SSO | 一期 WS 用 License→wsToken;二期 SSO 按用户推送 |
为什么是 RabbitMQ(带容量测算)
一条消息对应一个患者、一个时间窗、一次 AI 研判,属于任务队列,不是日志流。量级如下:
活跃患者 ICU 100 + ANES 100 = 200 人
查库周期 poll_interval = 180s → 每天 480 轮
每轮有新数据占比 经验取 30% → 每轮入队 200 × 30% = 60 条
─────────────────────────────────────────────────────────
平均入队速率 60 条 / 180s ≈ 0.33 条/秒
日峰(×3 突发) ≈ 1 条/秒
日任务总量 60 × 480 ≈ 2.88 万条/天
真正瓶颈 AI 单次 5~30s,并发 5 → 单线约 10~60 条/分钟
瓶颈在 AI 吞吐(每分钟几十条),不在 MQ。日入队约 1 条/秒,主流 MQ 都够用;选型主要看任务语义、重试死信、分线路由和运维成本。
| 维度 | 我们的需求 | RabbitMQ ✅ | Kafka | RocketMQ | Redis Streams |
|---|---|---|---|---|---|
| 消息语义 | 一条任务一个 Worker | 竞争消费,语义直接 | 分区日志流,偏流处理 | 队列+Topic,可用 | 消费者组,可用 |
| 吞吐需求 | ~1 条/秒 | 万级 TPS | 百万级 | 十万级 | 万级 |
| 重试/死信 | 强需求(AI 偶发失败) | TTL + DLX 原生支持 | 需自研补偿 | 延迟消息+DLQ | 无原生 DLQ,要自研 |
| 分线路由 | ICU/ANES 隔离 | direct exchange,一行配置 | 靠分区规划 | Tag 过滤 | 单 Stream,隔离弱 |
| 可靠投递 | Outbox+Confirm+ACK | Publisher Confirm 成熟 | acks 成熟 | 事务消息 | 无 Confirm,弱 |
| 运维成本 | 内网、小团队 | 单节点可跑,管理台友好 | 依赖 ZK/KRaft,偏重 | NameServer,中等 | 复用现有 Redis,最轻 |
为什么不是 Kafka:我们用不上 Kafka 的百万 TPS 和回放能力,却要维护 ZK/KRaft;任务队列所需的「一条一消费 + 死信重试」在 Kafka 上要自己实现。
为什么不是 Redis Streams:缺原生 DLQ 和 Publisher Confirm,可靠投递要补较多逻辑。
选 RabbitMQ:任务语义清晰,重试/死信/分线路由现成,Spring AMQP 集成成熟,单节点能撑一期。
为什么不用线程池代替 MQ:日任务约 2.8 万条,线程池也能跑。选 MQ 主要是因为失败重试、死信、多 Worker 扩缩和任务持久化,而不是 TPS:
| 能力 | 线程池 | RabbitMQ |
|---|---|---|
| 失败重试 / 死信 | 需自研 | 原生 TTL + DLX |
| 多 Worker 水平扩展 | 难协调 | 竞争消费,加实例即可 |
| 查库与 AI 解耦 | 同进程耦合 | 进程/实例可独立扩缩 |
| 任务状态持久化 | 内存丢失 | 持久化队列 + Outbox |
| 积压可观测 | 无 | 管理台看队列深度 |
Redis 用途(缓存 / 防抖 / 锁)
Redis 在平台做四件事,(任务队列是 RabbitMQ):
| 用途 | 怎么做 | 解决什么 |
|---|---|---|
| 活跃患者缓存 | alert:active:{产品线},TTL 1~2 分钟 |
少查采集库 |
| 患者防抖 | SET alert:debounce:{患者键} 1 NX EX 30,已存在就跳过本轮入队 |
同患者 30s 内不重复入队 |
| 入队分布式锁 | SET alert:lock:{幂等键} {实例号} NX EX 60 |
多实例不重复建 Pipeline |
| 查库段锁 | SET alert:fetch:{产品线}:{patientKey} {实例号} NX EX 10 |
保护「读水位 → 判是否入队 → 写 outbox / 改 pending」 |
| AI 运行态锁 | SET alert:running:{productLine}:{patientKey} 1 NX EX 300 |
同一患者同一时刻只有一个 AI 任务在跑 |
解锁用 Lua 比对 value 后 DEL;DB 上 idempotency_key 唯一索引再挡一层重复。五、核心机制详解
5.0 防 AI 重复分析:运行态锁 + 延迟补跑
AI 分析进行中(如 10:00 开始,耗时 30s),若 10:06 查库又来新体征,会再入队,导致同一患者被两个 Worker 同时跑。处理办法:
10:00 Worker 开始分析 P001 → SET alert:running:ICU:P001 = 1 (TTL 300s)
10:05 采集库又来新体征
10:06 查库 Job 扫描到 P001 有新数据
→ 检查 alert:running:ICU:P001 存在 → 跳过入队,不写 outbox
→ 标记 alert_patient_watermark.pending_reanalysis = true(有新数据待补)
10:00:30 AI1 完成 → EVAL Lua(DEL running + 读 pending)→ 若 pending 则发延迟补跑(TTL 45s,is_reanalysis=true)
10:01:15 补跑 Worker 消费 → 新建 pipeline + outbox → 清除 pending
| 机制 | 作用 |
|---|---|
alert:running:{patientKey} Redis 锁 |
同一患者同一时刻最多 1 个 AI 任务 |
pending_reanalysis 标记 |
AI 跑着时新来的数据不丢,完成后经延迟消息补跑 |
idempotency_key 唯一索引 |
重复 outbox / 重复消息只处理一次 |
debounce 30s |
减少无效入队 |
多实例并发(查库 Job + Worker 同时跑)
部署多实例时,查库 Job 和 Worker 可能同时改 alert_patient_watermark 和 Redis,需要额外约束:
| 竞态 | 风险 | 处理 |
|---|---|---|
| 多个查库实例同时发现同一患者有新数据 | 可能各写一条 outbox(幂等键不同则 pipeline 重复) | 每患者抢 alert:fetch:{产品线}:{patientKey}(EX 10s),持锁才执行「读水位 → 判断 → 写 outbox」;idempotency_key 唯一索引兜底 |
running 期间写 pending_reanalysis |
与 Worker 完成时的清除并发,可能漏标或重复补 | 标 pending 与查库判断放在同一 fetch 锁内;或 watermark 行 SELECT … FOR UPDATE |
| AI 完成:删 running + 查 pending + 补跑 | 非原子会漏补或重复补 | Redis EVAL Lua;pending=true 时发延迟 MQ(TTL 45s,is_reanalysis=true),Worker 异步消费 |
| 补跑消息与正常查库 Job 重叠 | 重复 pipeline、无效日志 | is_reanalysis 标记 + 45s 内查库跳过 + debounce(见「补跑与查库去重」) |
-- fetch 阶段(持 alert:fetch 锁)
IF running 存在 THEN
UPDATE watermark SET pending_reanalysis=1
跳过 outbox
ELSE
IF 该患者 45s 内已有 is_reanalysis pipeline 在处理/刚完成 THEN 跳过
写 pipeline + outbox
END
-- AI 完成(Redis EVAL Lua,见下)
Lua 完成脚本与 fallback
Lua 通过 EVAL 执行,脚本内不抛 Java 异常;返回固定字符串供 Worker 判断:
| 返回值 | 含义 |
|---|---|
success |
running 已删,无 pending |
pending_cleared |
running 已删,pending 已清,已触发延迟补跑 |
no_pending |
running 已删,pending 本来就是 0 |
-- KEYS[1]=running KEYS[2]=pending ARGV[1]=pipelineId
local v = redis.call('GET', KEYS[1])
if v then redis.call('DEL', KEYS[1]) end
local p = redis.call('GET', KEYS[2])
if p == '1' then
redis.call('DEL', KEYS[2])
return 'pending_cleared' -- Worker 侧发 MQ 延迟消息
end
return v and 'success' or 'no_pending'
Worker 侧对 Lua 调用设超时(如 500ms)。失败或超时时 fallback:按 pipelineId 校验 value 后 GET + DEL running;再读 DB pending_reanalysis,为 true 则同样发延迟补跑消息。running 锁 TTL 300s 可自愈,但 fallback 避免等满 TTL 才补跑。
监控:alert_running_lock_count(alert:running:* 键数量)、Lua 失败/超时次数。
补跑与查库去重
延迟补跑走 alert.reanalysis.delay(默认 TTL 45s,可配;比查库周期 180s 短、比 5s 更能错开查库 Job)。消息体带 is_reanalysis=true,pipeline 表同步写 is_reanalysis=1。
| 环节 | 去重 |
|---|---|
| 查库 Job(持 fetch 锁) | 若该患者 45s 内存在 is_reanalysis=1 且 pipeline 状态 ∈ {CREATED,…,AI_RUNNING,DONE},跳过本次入队 |
| 补跑 Worker | 消费前再查 running 锁;idempotency_key 含 reanalysis 前缀 |
| debounce | 补跑 pipeline 与正常 pipeline 共用 30s debounce(可配) |
5.0.1 水位(Watermark)什么时候更新?
查到了 10:01、10:02、10:03 的数据后 AI 失败,水位该不该动?更新了可能丢数,不更新下轮又重复查。规则如下:
水位只在 AI 成功且 alert_record 落库之后,与 record 同一事务内更新。
查库阶段(事务1):
读采集库 record_time > watermark
写 pipeline + log + outbox
❌ 不更新 watermark
AI 成功阶段(事务2):
BEGIN
写 alert_record + alert_notice
更新 pipeline → DONE
UPDATE alert_patient_watermark SET last_data_time = window_end ← 只在这里更新
COMMIT
AI 失败:
watermark 不变 → 下轮对账/重试仍覆盖同一段数据 → 不丢
| 时机 | 是否更新 watermark | 原因 |
|---|---|---|
| 查库发现新数据 | ❌ | 还没分析完,不能认为"已处理" |
| 入队 / 投递 MQ | ❌ | 任务可能失败 |
| AI 成功 + record 落库 | ✅ | 这段数据确实分析完了 |
| AI 失败 / 超时 | ❌ | 下轮重试或对账会再覆盖 |
5.0.2 数据窗口怎么算?
不是固定「最近 30 分钟」,而是动态区间:
window_start = last_data_time(水位,上次成功处理到的时刻)
window_end = 当前查库时刻(或该患者采集库 MAX(record_time))
- 配置项
window_minutes是上限:若window_end - window_start > 30min,只取最近 30min(防 AI 输入过大)。 - 正常情况窗口是
[last_watermark, now],不重叠、不重复——上次成功到哪,这次就从哪开始。
示例:10:00 成功分析到 10:00 → 10:06 查库 window = [10:00, 10:06],不会重复分析 10:00 之前的数据。
5.0.3 事务边界(两阶段本地事务)
┌─ 事务1(查库 Job 线程)─────────────────────────────┐
│ BEGIN │
│ INSERT alert_pipeline (CREATED) │
│ INSERT alert_pipeline_log (DATA_FETCH) │
│ INSERT alert_outbox (PENDING) │
│ COMMIT │
└──────────────────────────────────────────────────────┘
↓ Outbox Relay(事务外,~1s 轮询)
↓ RabbitMQ Confirm
┌─ Worker 消费(无事务,调 AI 5~30s)──────────────────┐
│ HTTP 调 AI(带 requestId = pipelineId,AI 端幂等) │
└──────────────────────────────────────────────────────┘
↓ AI 返回成功
┌─ 事务2(Worker 线程)─────────────────────────────────┐
│ BEGIN │
│ INSERT alert_record + alert_notice │
│ UPDATE alert_pipeline → DONE │
│ UPDATE alert_patient_watermark.last_data_time │
│ COMMIT │
│ basicAck(MQ) │
└──────────────────────────────────────────────────────┘
↓ 异步
WebSocket 广播推送 / REST 兜底
事务失败与回滚
- 事务1、事务2均使用
@Transactional(rollbackFor = Exception.class),任一步异常整笔回滚。 - 事务2失败时:
alert_record/ 水位均不提交;alert_pipeline标FAILED,alert_pipeline_log记异常;MQ 消息 nack 进重试。 idempotency_key或 record 唯一索引冲突:视为重复消费,记日志后直接 ACK,不重复落库。- 事务2成功后才
basicAck;AI 调通但落库失败时不确认 MQ,保证可重试。
5.0.4 AI 超时与幂等
AI 侧其实已成功,但网络超时未收到响应时,平台若直接重试会导致重复分析。处理如下:
| 措施 | 说明 |
|---|---|
请求带 requestId |
值 = pipelineId,AI 侧按 requestId 幂等 |
| 结果查询接口 | AI 提供 GET /result/{requestId}(联调约定);Worker 超时后先查结果,已有则直接落库,没有再重试 |
| 平台 Consumer 幂等 | idempotency_key 唯一索引,重复消息 ACK 跳过 |
| Resilience4j | 每产品线独立熔断:连续失败熔断 5min,标 AI_UNAVAILABLE |
5.0.5 AI 限流与降级
并发上限由 MQ 排队 + Worker prefetch + Resilience4j Bulkhead(每线默认 5)控制。任务突增时先进队列,不会同时打满 AI。
| 层级 | 机制 | 默认 |
|---|---|---|
| MQ | 任务先进队列排队 | 削峰,不丢 |
| Worker | prefetch 限制未 ACK 数 |
5~10 |
| AI 调用 | Resilience4j Bulkhead(信号量) | 每线并发 5 |
| AI 调用 | RateLimiter | 可配 QPS 上限 |
| AI 挂了 | CircuitBreaker 熔断 5min | 任务积压 MQ,查库继续、入队继续 |
| AI 恢复 | 熔断关闭 | Worker 继续消费积压队列 |
AI 不可用时的行为:
查库 Job → 正常跑(不受影响)
Outbox/MQ → 正常入队(任务积压队列)
Worker → 熔断打开,暂停调 AI,消息留在队列
AI 恢复 → 熔断关闭,Worker 逐步消费积压
对账 Job → 继续跑,发现漏判补跑
5.0.6 Outbox 轮询与投递确认
一期用 Polling Publisher(约 1s 轮询 alert_outbox WHERE status=PENDING):
| 方案 | 一期 | 说明 |
|---|---|---|
| Polling | 采用 | 日 PENDING 量低,1s 轮询可接受 |
| CDC(Canal / Debezium) | 二期 | 任务量或延迟要求上来后再考虑 |
状态变更顺序(避免「标记已发但 MQ 未持久化」):
Relay 取 PENDING 行
→ 发 RabbitMQ(persistent + mandatory)
→ 阻塞等待 Publisher Confirm(broker ack)
→ Confirm 成功后才 UPDATE outbox SET status=PUBLISHED
→ Confirm 失败或超时:保持 PENDING,下轮重发
补偿 Job(XXL-Job,每 1~5 分钟):扫描 status=PENDING AND create_time < now()-30s 的记录,强制重发并打告警。单条 outbox 累计重试 不超过 10 次(retry_count 字段),超限标 FAILED 并告警,避免对 MQ 死循环打。
禁止在收到 Confirm 之前把 outbox 标为 PUBLISHED。
5.0.7 WebSocket 推送策略(一期 License / 二期 SSO)
现有客户端没有用户登录,用 AiLicense(license + mac) 做设备级鉴权,与麻醉/重症 AI 客户端一致。一期按终端 / 房间 / 病区推送,不按个人——和 License 绑 room 的模型一致。
| 阶段 | 推送方式 | 鉴权 | 说明 |
|---|---|---|---|
| 一期 | 按 产品线 + 房间/病区 广播 | License + mac → wsToken | 同一 room/ward 下所有在线终端收到相同预警 |
| 二期 | 按 用户 精准推送 | SSO Token | Token 解析 userId,recipient_id 路由到个人 |
License → wsToken → WebSocket 建连(一期)
客户端启动
① POST /anesthesia/aiLicense/verify(或 ICU 对应接口)
参数:license + clientMac ← 现有客户端已有
② POST /alert/ws/token
参数:license + mac + productLine ← 预警平台新增,复用同一套 License 校验逻辑
返回:wsToken(JWT,有效期 24h,含 license、mac、productLine、roomId)
③ WebSocket 建连
wss://alert-host/alert/ws?token={wsToken}
❌ URL 中禁止裸传 license(安全)
④ 服务端
校验 wsToken → 解析 roomId
Session 注册到订阅组:{productLine}:room:{roomId} 或 {productLine}:ward:{wardCode}
⑤ 有预警时
读 alert_record / alert_notice 中的 push_route_key
向对应订阅组广播(带 seq)
REST 兜底接口同样携带 Authorization: Bearer {wsToken}(或 Header 传 license+mac),与 WS 共用鉴权。
roomId / deptCode / ward_code 映射(路由键对齐)
License 表(ai_license)有 roomId、category;采集库患者有 room_id、dept_code、ward_code。推送路由以采集库患者位置为准,License 的 roomId 须与订阅维度对齐:
| 产品线 | License 字段 | 采集库字段(患者位置) | 推送路由键 push_route_key |
WS 订阅组(终端注册) |
|---|---|---|---|---|
| ANES(麻醉) | roomId(手术室编号) |
room_id |
ANES:room:{room_id} |
ANES:room:{license.roomId} |
| ICU(重症) | roomId(护士站/病房编号,绑定时约定) |
ward_code 或 dept_code |
ICU:ward:{ward_code} |
ICU:ward:{license.roomId} |
规则:
- 落库时(写
alert_record/alert_notice):从采集库快照患者room_id/ward_code/dept_code,计算并写入push_route_key。 - 建连时:终端 License 绑定的
roomId决定它订阅哪个组;须与上表约定一致(绑 OR03 的终端只收 OR03 的预警)。 dept_code作为 REST 查询的辅助维度保留;WS 广播以push_route_key为准。
License roomId 与患者 room 对齐
License 的 roomId 必须和采集库患者 room_id / ward_code 同一套编码,否则终端订阅了却收不到,或收到别的 room 的预警。
| 环节 | 做法 |
|---|---|
| License 绑定 | 绑定时校验 roomId 在科室/手术室配置表中存在(复用现有 AiLicense 绑定流程) |
| 联调 | 与数据采集确认 ICU ward_code、ANES room_id 与 License roomId 对照表 |
| 运行期 | 预警落库时 push_route_key 来自采集库快照;WS 订阅组来自 License;两者编码不一致则推送不到 |
推送路由与注意事项
| 点 | 说明 | 方案 |
|---|---|---|
| 禁止全产品线广播 | 若向整个 ANES 广播,所有手术室都会收到无关预警 |
必须按 push_route_key 精准路由,只推给 {productLine}:room/ward:{xxx} |
| 同房间多终端 | 多台设备绑定同一 roomId |
全部收到相同预警——手术室大屏/护士站场景符合预期 |
| 已读 / 确认 | 一期无 userId | POST .../read 请求体带 clientId(license 或 mac),落库 alert_notice.read_client_id;不能追溯「哪个医生」,只能追溯「哪台终端」 |
| 处置确认 ack | 同上 | alert_record.acked_client_id + acked_time;二期 SSO 后改为 acked_by(userId) |
| WS 安全 | license 是长期凭证 | 仅 HTTP 换短期 wsToken;WS / REST 只带 token,不传 license 明文 |
| token 过期 | wsToken 24h 过期 | 客户端定时刷新或 WS 断连时用 license+mac 重新换 token |
| 患者转科/转床 | 产生预警后患者已离开 | 一期路由不变;旧 room 仍可见历史未读 |
患者转科/转床:push_route_key 业务规则
一期约定:历史预警以产生时刻的位置为准,push_route_key 落库后不再修改。
| 现象 | 是否接受 | 说明 |
|---|---|---|
| 患者转出 OR03 后,OR03 终端仍能看到转出前产生的未读预警 | 接受 | 快照 + 审计需要;医护可在旧终端已读/确认 |
| 转入 OR05 后,OR05 终端看不到转科前的历史预警 | 接受 | 新预警用新 room 的 push_route_key |
| 历史预警跟随患者最新位置推送 | 一期不做 | 二期用 original_push_route_key + 位置快照做迁移 |
落库时除 push_route_key 外,额外写入(一期就存,二期改路由时用):
| 字段 | 说明 |
|---|---|
push_route_key |
产生时刻的 WS/REST 路由,一期不变 |
original_push_route_key |
与 push_route_key 相同,永久保留 |
location_snapshot |
JSON:{room_id, ward_code, bed_no, dept_code} 产生时快照 |
新预警始终按当时采集库快照写路由;REST 未读按 push_route_key 过滤。
一期 alert_notice 落库字段:product_line、room_id、ward_code、dept_code、push_route_key、is_read、read_client_id;REST 未读按 push_route_key 或 product_line + room_id/ward_code 过滤。alert_record 同时快照 patient_name、bed_no、体征 JSON 等展示字段——推送 payload 全部来自本地表,不回查采集库(终端读不了采集库;采集库抖动不影响已生成预警的推送;患者转科后历史预警以产生时快照为准)。
5.0.8 线程池与连接池规划
| 组件 | 线程池 / 连接 | 默认 | 说明 |
|---|---|---|---|
| XXL-Job 查库 | Scheduler 线程 | 4 | ICU/ANES 各 Job + 对账 |
| AlertWorker | @RabbitListener concurrency |
5~10/产品线 | 与 AI Bulkhead 一致 |
| Outbox Relay | 单线程 Scheduled | 1 | 1s 轮询 |
| WebSocket 推送 | 独立线程池 | core=4, max=8 | 不阻塞 Worker |
| HTTP 调 AI | WebClient 连接池 | max=20/产品线 | 复用连接 |
| 采集库只读 | HikariCP | max=10 | 独立数据源 |
| 业务库 | HikariCP | max=20 | 读写 |
5.0.9 监控体系
| 技术 | 用途 |
|---|---|
| Micrometer | 指标埋点(Spring Boot Actuator) |
| Prometheus | 指标采集存储 |
| Grafana | 可视化大盘 |
| 指标 | 告警阈值(示例) |
|---|---|
| MQ 队列深度 | > 100 告警 |
| MQ DLQ 堆积 | > 0 告警 |
| AI 调用 P95 耗时 | > 25s 告警 |
| AI 超时率 | > 5% 告警 |
| AI 熔断状态 | OPEN 告警 |
| Outbox PENDING 积压 | > 50 且持续 5min;单条 PENDING > 30s;单条 retry_count ≥ 10 |
| Redis running 锁 | alert:running:* 键数量、Lua 失败/超时次数、锁获取失败率 |
| 补跑占比 | is_reanalysis=1 的 pipeline 占当日总量 |
| 活跃患者缓存 | 刷新 Job 耗时、失败次数、查库是否命中缓存 |
| Watermark 延迟 | 采集库 MAX(record_time) - watermark > 15min |
| WebSocket 在线连接数 | 监控趋势 |
| 端到端延迟 P95 | > 10min 告警 |
5.0.10 患者数据与采集库只读:推送为何不查采集库
研判阶段读采集库,推送阶段只读本地业务库——不做患者主数据全量同步。
为什么推送不能依赖采集库?
| 原因 | 说明 |
|---|---|
| 客户端读不了采集库 | 前端/终端只能连预警平台;姓名、床号、风险摘要等展示字段必须在推送 payload 里,不能指望终端再去查采集库 |
| 推送要稳定 | 采集库抖动、慢查、短暂不可用,不应影响已生成预警的 WebSocket 推送和 REST 兜底 |
| 审计要固定 | 预警产生时患者在哪张床、哪个 room/ward,应快照固化;不能事后回采集库再查(患者可能已转科/出院) |
两阶段数据用法
┌─ 阶段 A:研判(Worker)────────────────────────────────────┐
│ 读采集库(只读)→ 取体征 + 患者路由信息 → HTTP 调 AI │
│ ❌ 此阶段不写采集库;watermark 在本地维护 │
└────────────────────────────────────────────────────────────┘
↓ AI 成功,事务2提交
┌─ 阶段 B:落库 + 推送(本地 alert_platform)─────────────────┐
│ alert_record 写入展示快照 + push_route_key │
│ alert_notice 写入 → WebSocket 广播 / REST 兜底 │
│ ✅ 此阶段不再查采集库;payload 全部来自本地表 │
└────────────────────────────────────────────────────────────┘
Worker 写 alert_record 时,把当时从采集库查到的展示字段一次性快照进去;之后 WS/REST 只读本地库。WS 按 push_route_key(如 ANES:room:OR03)向对应订阅组广播;客户端用 license+mac 换 wsToken 后建连,终端 License 绑定的 roomId 决定订阅哪个组。
本地存什么?(不是全量同步)
| 存储位置 | 存什么 | 用途 | 必须 |
|---|---|---|---|
alert_record |
patient_key、patient_name、bed_no、room_id、ward_code、dept_code、push_route_key、risk_level、AI 摘要、vitals_snapshot(JSON) |
推送 payload + 列表展示 + 审计 | 是 |
alert_notice |
product_line、room_id/ward_code、push_route_key、seq(自增 id)、is_read、read_client_id |
WS 路由 + REST 兜底 | 是 |
alert_patient_watermark |
每患者 last_data_time、pending_reanalysis |
增量查采集库、延迟补跑 | 是 |
| Redis 活跃患者缓存 | patient_key → room/ward/bed,TTL 1~2min |
减少每轮查采集库;不是持久化同步 | 建议 |
| — | 采集库仍是 source of truth | 否 |
患者主数据不做全量同步到预警平台;只在产生预警时把当时需要的字段快照进 alert_record。采集库只读对「查数研判 / 推送」的影响
| 场景 | 影响 | 现有应对 |
|---|---|---|
| 只能 SELECT | 不能写 watermark 到采集库 | watermark 放本地 alert_patient_watermark |
| 3 分钟轮询 | 新体征有延迟 | 可配 poll_interval;对账 Job 补漏 |
| 采集库短暂不可用 | 本轮查不到新数据,无法产生新预警 | 对账 + 人工补偿;已落库的预警照常 WS/REST 推送 |
| AI 不可用 | 无法研判新数据 | 任务积压 MQ;已有 alert_notice 照常推 |
| 患者转科 | 实时位置变了 | 历史预警以产生时快照为准;新预警用新位置;不跟随最新 room 改 push_route_key |
什么时候才需要考虑「同步」?
一期通常不需要。仅以下情况才值得加本地表或 CDC:
| 条件 | 方案 | 阶段 |
|---|---|---|
| 采集库 QPS 限得很死,活跃患者列表都查不动 | 轻量 alert_patient_cache 表,Job 定时刷新 |
按需 |
| 复杂患者检索/统计,且不能每次打采集库 | CDC 同步患者维度表(Canal/Debezium) | 二期 |
| 采集库与预警平台网络隔离,Worker 根本连不上 | 消息/文件同步(非当前场景) | — |
当前约束是可只读查询采集库,走「定时增量查 + 快照落库 + 本地推送」即可。
六、四个硬问题怎么解决
6.1 轮询的性能和延迟
活跃患者怎么查
「只查在科/术中患者」如果每轮对采集库做 WHERE status='在科' 全表扫描,容易成为热点。做法:
| 项 | 做法 |
|---|---|
| 活跃列表缓存 | Redis alert:active:{productLine},存 patient_key 列表及 room/ward/bed,TTL 60~90s |
| 刷新 Job | 独立 XXL-Job 每 60~90s 增量刷 Redis(见下) |
| 查体征 SQL | patient_key IN (活跃列表) AND record_time > 水位,禁止无 patient_key 的大范围扫 |
| 查库 Job | 读 Redis 活跃列表 + 按患者抢 fetch 锁后查增量体征 |
刷新 Job 不宜每轮全表扫。 SQL 示例:
SELECT patient_key, room_id, ward_code, bed_no, dept_code
FROM ...
WHERE status IN ('在科','术中')
AND last_update_time > :lastRefreshTime -- 增量;首轮或定时全量
有入科/出科时间字段时,用「当前在科」视图或 admit_time / discharge_time 过滤。刷新失败:告警 + 继续用上一版 Redis 缓存(TTL 未到前查库 Job 照常跑);连续失败 N 次升级告警。
性能手段
轮询查库的性能和延迟,主要靠增量水位、活跃列表缓存、分批查询和独立连接池:
| 手段 | 做法 | 量化效果 |
|---|---|---|
| 增量水位 | 每患者记上次处理时间点,只查 record_time > 水位,走 (patient_id, record_time) 联合索引 |
命中行数=一个窗口的体征条数(几十行),单次 <10ms |
| 只查活跃患者 | 列表来自 Redis alert:active:*,SQL 带 patient_key IN (...) |
候选约 200 人,避免扫全库 |
| 缓存活跃列表 | 独立 Job 每 60~90s 刷新 Redis | 查库 Job 不每轮打采集库要名单 |
| 分批 + 独立连接池 | 每批 ~50 人一条 SQL,采集库独立连接池(max 10) | 每轮约 200/50 = 4 次批量查询;不与业务库抢连接 |
| 查库与 AI 解耦 | 查库线程只入队,不等 AI | 单轮查库 数百 ms~数秒完成,不被 5~30s 的 AI 拖垮 |
单轮查库耗时:4 批 ×(约 10~50ms)≈ 1 秒内,小于 180s 周期。
端到端延迟(约 1.5~5 分钟):
| 环节 | 耗时 | 是否瓶颈 |
|---|---|---|
| 等下一轮查库 | 0~180s(平均 90s) | 周期决定,可调小 |
| 查库 + 落库 | <1s | 否 |
| MQ 排队 + 消费 | 秒级(取决于 AI 并发) | 否 |
| AI 研判 | 5~30s | 是 |
| WebSocket 推送 | <1s(内网) | 否 |
瓶颈在查库周期和 AI 耗时。要缩短等待时间可调 poll_interval_seconds(需评估采集库压力);AI 慢则靠队列排队,查库线程不被拖住。
6.2 数据可靠性
常见故障(MQ 抖动、WS 断线、AI 超时、进程重启)下,研判任务和预警记录应能恢复、不静默丢失;通知最终到达终端,允许秒到分钟级延迟。
以下依赖基础设施或运维,不在本期应用层单独解决:
| 情况 | 说明 |
|---|---|
| 业务库整库不可用 | 依赖 DB 备份与恢复 |
| 采集库长期不可用 | 无法产生新预警;已落库记录仍可推送 |
| 终端长期离线且不调 REST | 未读留在 alert_notice,上线后拉取 |
| 对账周期内的漏判 | 15~30 分钟内由对账 Job 或人工补偿补跑 |
各段分开容错,MQ 允许重复投递(幂等去重),按最终一致设计。
(1)查库 → 入队:Outbox(本地事务 + 异步投递)
最容易丢的地方:业务库事务提交了,但发往 MQ 的消息没发出去(或反过来)。我们不用 Seata/XA 这种重量级跨库事务,用 Outbox(发件箱)模式,本质是把"要发的消息"和"业务数据"放进同一个本地事务:
① 写库阶段(一个本地事务,要么全成功要么全回滚)
BEGIN
写 alert_pipeline (流水线)
写 alert_pipeline_log (日志)
写 alert_outbox (待发消息, 状态=PENDING)
COMMIT
② 发送阶段(独立线程,约 1s 轮询)
取 PENDING → 发 RabbitMQ → **等 Publisher Confirm**
Confirm 成功 → 标 PUBLISHED
Confirm 失败/超时 → 保持 PENDING,下轮重发
补偿 Job:PENDING 超过 30s 重发;retry_count ≥ 10 标 FAILED
③ 消费阶段(幂等)
Worker 用 idempotency_key 去重 → 调 AI、落库 → 成功才确认(ACK)
失败 basicNack → TTL 重试队列(退避)→ 超 3~5 次进死信
这样即使 MQ 临时挂了,待发任务仍留在 alert_outbox,恢复后补发。Outbox 发完标记 PUBLISHED,可定时清理,不会无限膨胀。
不用 XA/Seata:业务库内本地事务 + Outbox 最终一致,够用且更简单。
幂等:MQ 可能重复投递,idempotency_key 唯一索引保证同一研判作业只处理一次。
(2)RabbitMQ:生产 / 存储 / 消费三段
生产者 ──▶ Broker ──▶ 消费者
① ② ③
| 段 | 风险 | 措施 |
|---|---|---|
| ① 生产者→Broker | 发了没收到 / 路由不到 | Publisher Confirm(broker 确认才算成功)+ Outbox 重发 |
| ② Broker 自身 | 重启就没了 | 交换机/队列 durable + 消息持久化(落盘);上量后用 Quorum 多副本队列 |
| ③ Broker→消费者 | 处理没完就丢 | 手动 ACK(业务成功才确认)+ 失败进重试队列 + 超次进死信队列 |
生产侧 Outbox + Confirm,Broker 持久化,消费侧手动 ACK、重试和死信,一段失败由下一段承接。
(3)推送到终端:先落库,再推送,断了也能捞
WebSocket 管实时,可靠性靠先写 alert_notice、再 WS 推送、seq 补发和 REST 未读拉取。连接会断,设计上按断线常态处理。
| 层级 | 机制 | 说明 |
|---|---|---|
| L1 落库 | 推送前先写 alert_notice |
推失败也不影响记录已在库 |
| L2 实时 | WS 按 push_route_key 广播 |
内网通常 <1s |
| L3 补发 | 重连带 lastSeq,补 id > seq |
断线期间的增量补 |
| L4 兜底 | GET /alert/notices/unread |
不依赖 WS 状态,终端主动拉 |
| L5 对账 | 定时比对采集库 vs 水位 | 漏判自动补跑;仍失败走人工补偿 |
心跳 30s;重连退避 1s→…→30s;服务端 60s 无心跳回收连接。
REST 未读(最后一层):
| 接口 | 作用 |
|---|---|
GET /alert/notices/unread?productLine=ANES&roomId=OR03 |
拉本 room 全部未读 |
GET /alert/notices?sinceId={seq} |
增量拉(Header 带 wsToken) |
POST /alert/notices/{id}/read |
标记已读(Body 带 clientId) |
已读 / 确认:前端回调落库;一期 clientId=license/mac(终端级),二期 SSO 后改 userId。HIGH 级可要求必须 ack(二期)。
(4)对账与人工补偿
XXL-Job 每 15~30 分钟:采集库某患者最新数据时间 vs 本地水位,有漏判则自动补跑。仍搞不定的进死信,由人工补偿处理台介入。
6.3 出问题能查:全程可追溯
每条预警就是一条**流水线(Pipeline)**记录,配一张阶段日志表,把"查数→调 AI→推送"每一步都记下来。
alert_pipeline:每条预警一行,记当前状态、风险级别、重试次数、错误。alert_pipeline_log:每个阶段一行,记阶段、耗时、请求/响应摘要、错误原因。
状态是细粒度的(不是只有"成功/失败"):
已创建 → 已查库 → 已入队 → 消费中 → 已提交AI → AI完成
→ 预警已生成 → 推送中 → 已推送 → 完成
任意一步失败 → 等待重试 →(超次)→ 死信,等对账或人工补跑
排查示例——P001 未收到预警:查 alert_pipeline 状态为 AI_TIMEOUT → 查 alert_pipeline_log 见 AI 阶段 30s 读超时 → 任务在重试队列,等待或人工补跑。
6.4 实时看 + 能统计
实时监控大盘(管理端,Micrometer + Prometheus + Grafana):
| 面板 | 看什么 | 告警阈值(示例) |
|---|---|---|
| 流水线 | 各阶段排队 / 进行中 / 今日完成 | — |
| 队列 | RabbitMQ 深度、DLQ、reanalysis 队列 | DLQ >0 |
| Redis | running 锁数量、Lua 失败率、fetch 锁失败率 | 异常抬升 |
| 补跑 | is_reanalysis pipeline 占比 | 长期 >20% 需排查 |
| 活跃缓存 | 刷新耗时、失败次数 | 连续失败告警 |
| AI | 调用次数、成功率、P95 耗时、超时率 | P95 >25s;超时率 >5%;熔断 OPEN |
| 推送 | 推送成功率、重试堆积 | — |
| Outbox | PENDING 积压 | >50 且持续 5min |
| 水位 | 采集库 MAX(record_time) − watermark | 延迟 >15min |
| WebSocket | 在线连接数 | 监控趋势 |
| 延迟 | 端到端 P50 / P95 | P95 >10min |
统计报表:按时间、风险级别、产品线、病区等维度统计预警量、AI 成功率等。实时查询走预警表聚合,历史报表用每日汇总表,避免大范围实时聚合压库。
6.5 人工补偿处理台
自动重试、对账仍无法处理的异常任务(死信、长期 AI 超时、对账漏判),由运维/管理员在人工补偿处理台介入:
死信 / 对账告警
↓
管理端「补偿处理台」列出异常 Pipeline
↓
操作员查看 pipeline_log → 确认原因
↓
┌─ 补跑 AI ─────────┐ ┌─ 跳过并标记 ────┐ ┌─ 强制关闭 ─────┐
│ 重新入队(新 │ │ 人工确认无需 │ │ 记录原因, │
│ idempotency_key) │ │ 预警,更新水位 │ │ 不再自动重试 │
└───────────────────┘ └─────────────────┘ └────────────────┘
↓
操作记入 alert_pipeline_log(操作人、时间、原因)
| 能力 | 说明 |
|---|---|
| 异常列表 | 按状态(DLQ、AI_TIMEOUT、对账漏判)、patient_key、时间范围、productLine 过滤 |
| 单条补跑 | 指定患者 + 时间窗,生成新 Pipeline 重新入队 |
| 批量补跑 | 对账 Job 漏判一键补跑(二期) |
| 操作审计 | 人工操作写 pipeline_log |
人工补偿用于自动重试和对账仍无法恢复的任务;一期先做异常列表和单条补跑,批量与跳过放二期。
七、消息怎么收发(同步/异步、RabbitMQ、多语言订阅)
7.1 哪些是同步、哪些是异步
写本地库走同步,调 AI 和推送走异步:
| 环节 | 同步/异步 | 说明 |
|---|---|---|
| 查采集库 + 写流水线/日志/Outbox | 同步 | 调度线程内一个本地事务完成 |
| 投递 RabbitMQ | 异步 | 查库线程只入队,不等 AI |
| 消费任务 + 调 AI | 异步 | AI 5~30 秒,不能阻塞查库 |
| 写预警/通知 | 同步 | Worker 事务内落库 |
| WebSocket 推送 | 异步 | 推送失败有重试和 REST 补拉 |
| 前端拉未读 | 同步 | HTTP 请求 |
7.2 RabbitMQ 通讯模型
RabbitMQ 只在平台内部传研判任务:查库写入任务消息,Worker 消费后调 AI,不对前端开放。
| 作用 | 说明 |
|---|---|
| 解耦 | 查库(快,秒级)和调 AI(慢,5~30s)分开,查库线程入队即返回,不被 AI 拖垮 |
| 削峰 | 每轮查库可能一下产生几十条任务,先进队列排队,Worker 按并发 5 平稳消费,不冲垮 AI |
| 分线隔离 | ICU / ANES 各自队列,重症 AI 慢不影响麻醉,互不抢占 |
| 可靠不丢 | 持久化 + Publisher Confirm + 手动 ACK + 重试/死信,配合 Outbox(写 pipeline + outbox 同一本地事务,Relay 轮询发 MQ,失败重发)保证任务不丢失 |
注意:RabbitMQ 传内部研判任务,医护端走 WebSocket + REST。
拓扑:
┌──────────────────────────────┐
Outbox 投递 ─▶ │ Exchange: alert.task (direct) │
└───────┬───────────────┬───────┘
routing=ICU│ │routing=ANES
▼ ▼
alert.task.icu alert.task.anes ← 重症/麻醉分队列,互不影响
│ │
Worker(ICU) Worker(ANES) ← 各自按并发消费,调各自 AI
│
失败 ─▶ alert.retry.*(TTL 延迟重试)─▶ 回主队列
超次 ─▶ alert.dlq.*(死信,对账/人工补跑)
AI 完成且 pending=true ─▶ alert.reanalysis.delay(TTL 45s,is_reanalysis=true)─▶ alert.task.*
| 机制 | 配置 | 作用 |
|---|---|---|
| 消息体 | JSON:患者、时间窗、产品线、流水线 id、幂等键 | 一条 = 一次研判作业 |
| 可靠投递 | Publisher Confirm | broker 确认收到才算发成功 |
| 持久化 | 交换机/队列 durable + 消息落盘 | 重启不丢 |
| 消费确认 | 手动 ACK | 业务成功才确认,崩溃自动重投 |
| 限流 | prefetch(与 AI 并发一致) | 防止积压 |
| 重试/死信 | TTL 重试队列 + DLQ | 失败退避、永久失败留存 |
| 扩展 | 加 routing key + 队列 + Worker | 新增急诊等产品线不动老队列 |
7.3 前端 / 不同语言怎么订阅预警
医护端不直连 RabbitMQ,统一走 WebSocket 和 REST。
一期(License 鉴权,按 room/ward 广播):
① 设备鉴权(现有)
POST /anesthesia/aiLicense/verify?license=xxx&clientMac=yy:yy:...
(ICU 客户端走 /icu/aiLicense/verify,路径与现有项目一致)
② 换取 WS Token(预警平台新增)
POST /alert/ws/token
Body: { "license": "xxx", "mac": "yy:yy:...", "productLine": "ANES" }
→ 校验逻辑复用 AiLicenseService(license 有效 + mac 匹配 + 已绑定)
→ 返回 { "wsToken": "eyJ...", "roomId": "OR03", "expiresIn": 86400 }
③ WebSocket 建连
wss://alert-host/alert/ws?token={wsToken}
服务端:校验 token → Session 注册到 ANES:room:OR03(或 ICU:ward:W01)
④ 接收推送
服务端按 alert_notice.push_route_key 向对应组广播(JSON + seq)
客户端只处理 productLine 匹配的 message
⑤ 断线重连
带 lastSeq → 服务端补发 id > lastSeq;token 过期则重新执行 ②
⑥ REST 兜底(Header: Authorization: Bearer {wsToken})
GET /alert/notices/unread?productLine=ANES&roomId=OR03
POST /alert/notices/{id}/read Body: { "clientId": "license或mac" }
POST /alert/records/{id}/ack Body: { "clientId": "license或mac" }
推送路由示例:
| 事件 | push_route_key | 谁收到 |
|---|---|---|
| 患者 P001 在 OR03 产生 HIGH 预警 | ANES:room:OR03 |
仅订阅 ANES:room:OR03 的终端 |
| 患者 P002 在 W01 病区产生 MEDIUM 预警 | ICU:ward:W01 |
仅订阅 ICU:ward:W01 的终端 |
| ❌ 错误做法 | 向 ANES 全产品线广播 |
所有手术室都收到,必须禁止 |
二期(接入 SSO,按用户精准推送):
1. 登录拿 Token(Sa-Token/JWT)
2. 建 WebSocket:wss://主机/alert/ws?token=...&productLine=ICU
3. 服务端按 用户 + 产品线 推送;REST 按 recipient_id 过滤
任何能发 HTTP / 连 WebSocket 的语言都能接,不用引 MQ 客户端:
| 客户端 | 怎么接 |
|---|---|
| Vue / React | 浏览器原生 WebSocket |
| 安卓 / iOS | OkHttp WebSocket / URLSession |
| Python | websockets + requests |
| C# / Java | ClientWebSocket / 标准 WS 客户端 |
消息带 productLine,客户端只处理自己产品线的,重症/麻醉互不串。
二期若有外部系统(如急诊)要订阅"预警已产生"事件,再单独开放 HTTP Webhook 或事件 Exchange,不影响医护端这套。
八、不确定的事,全做成配置
目前几个还没定的点,不写死在代码里,全部做成配置表 alert_config,业务部门讨论定了之后改配置即可,不用改代码、不用重新发版。
| 配置项 | 默认 | 对应你不确定的事 |
|---|---|---|
window_minutes |
30 | 每个患者取多长时间范围的数据 |
poll_interval_seconds |
180 | 多久查一次库 |
ai_max_concurrency |
5 | AI 并发能力(AI 强就调大) |
ai_timeout_seconds |
30 | AI 单次超时 |
push_targets |
— | 推给哪些角色/病区/前端 |
risk_threshold_push |
中风险 | 达到什么级别才推 |
每条配置按产品线(ICU/ANES)独立。AI 处理能力不确定也不怕:任务先进队列排队,AI 多慢都只是排队,不会丢、不会压垮 AI;能力摸清后调并发即可。
九、核心数据表与索引
业务库 alert_platform,核心就这几张。采集库(只读)存体征与患者主数据;业务库存预警、通知、水位与快照——研判读前者,推送只读后者,不做患者主数据全量同步。
| 表 | 作用 |
|---|---|
alert_config |
各产品线配置(窗口、周期、AI、推送规则) |
alert_patient_watermark |
每个患者的处理水位(增量查询、防漏判) |
alert_pipeline |
流水线主记录(含 is_reanalysis、状态、幂等键) |
alert_pipeline_log |
流水线阶段日志(排查用) |
alert_record |
预警记录 + 患者/体征快照(推送 payload 来源,不回头查采集库) |
alert_notice |
终端通知(push_route_key 路由、未读、seq) |
alert_outbox |
发件箱(retry_count,超 10 次标 FAILED) |
alert_push_retry |
推送失败重试 |
alert_stats_daily |
每日统计汇总 |
主要字段(快照与推送)
alert_record — Worker 在 AI 成功时一次性写入,后续 WS/REST 只读此表组装 payload:
| 字段 | 来源 | 用途 |
|---|---|---|
patient_key |
采集库 | 患者唯一标识 |
patient_name |
采集库快照 | 列表/弹窗展示 |
bed_no |
采集库快照 | 床号展示 |
room_id / ward_code / dept_code |
采集库快照 | 位置展示 + 计算 push_route_key |
push_route_key |
计算 | WS/REST 路由(产生时刻) |
original_push_route_key |
同 push_route_key | 永久保留,二期改跟随路由时用 |
location_snapshot |
采集库快照 JSON | room/ward/bed/dept 产生时位置 |
risk_level |
AI 返回 | 风险级别 |
ai_summary |
AI 返回 | 研判摘要 |
vitals_snapshot |
采集库窗口内体征 JSON | 详情页/审计 |
pipeline_id |
本地 | 关联流水线 |
acked_client_id / acked_time |
终端回调 | 处置确认(一期终端级) |
alert_notice — 推送前先落库,保证「不丢」:
| 字段 | 用途 |
|---|---|
id(自增) |
推送序号 seq |
record_id |
关联 alert_record |
product_line |
产品线过滤 |
push_route_key |
WS 广播 + REST 未读 |
original_push_route_key |
与 record 一致,审计用 |
room_id / ward_code |
辅助查询 |
is_read / read_client_id / read_time |
已读状态(一期 clientId=license/mac) |
status |
delivered / pending 等推送状态 |
alert_patient_watermark — 仅用于增量查采集库,与推送无关:
| 字段 | 用途 |
|---|---|
product_line + patient_key |
唯一键 |
last_data_time |
增量查询起点;仅在 AI 成功 + alert_record 落库后同事务更新 |
pending_reanalysis |
running 期间有新体征标 true;Lua 完成后经延迟 MQ 补跑 |
Redis(非表,建议) — alert:active:{productLine}、patient_key→room/ward/bed,TTL 1~2min,减少采集库压力;不落库、不同步。
索引设计
索引按实际查询路径设计,避免全表扫:
| 表 | 索引 | 用途 |
|---|---|---|
alert_pipeline |
idx_status (status) |
监控大盘按状态统计 |
idx_create_time (create_time) |
按时间范围查流水线 | |
idx_patient_time (product_line, patient_key, create_time, is_reanalysis) |
补跑去重、人工台按患者+时间查 | |
uk_idempotency (idempotency_key) |
幂等去重(唯一) | |
alert_pipeline_log |
idx_pipeline (pipeline_id) |
排查某条流水线各阶段 |
alert_record |
idx_patient (product_line, patient_key, create_time) |
患者预警历史 |
idx_risk_time (risk_level, create_time) |
按风险级别统计 | |
alert_notice |
idx_route_unread (push_route_key, is_read, id) |
REST 未读 + WS 路由(一期) |
idx_recipient (recipient_id, is_read, id) |
二期 SSO 按用户未读 | |
alert_outbox |
idx_pending (status, create_time) |
Outbox 轮询 PENDING |
alert_patient_watermark |
uk_patient (product_line, patient_key) |
水位读写(唯一) |
alert_push_retry |
idx_status_time (status, next_retry_time) |
推送重试调度 |
建表 SQL 联调时再定,这里只列职责和字段意图。
十、部署与容量
部署:独立 jar / Docker 一个服务(REST + WebSocket + 内嵌调度),加 MySQL、Redis、RabbitMQ。采集库用只读账号、独立数据源。
alert-platform.jar (8080: REST + WebSocket)
MySQL alert_platform 业务库
MySQL icu-read / anes-read 采集库(只读)
Redis / RabbitMQ / XXL-Job
容量推算(200 患者、180s 周期、每轮 30% 有新数据):
| 指标 | 推算 | 结果 |
|---|---|---|
| 每轮入队任务 | 200 × 30% | 60 条 |
| 平均入队速率 | 60 / 180s | ~0.33 条/秒 |
| 日任务量 | 60 × (86400/180) | ~2.88 万条/天 |
| 日新增流水线行 | ≈ 日任务量 | ~2.88 万行/天 |
| 日志行(每任务 ~5 阶段) | 2.88 万 × 5 | ~14 万行/天 |
| AI 处理能力 | 每线并发 5 × (60s/单次20s) | ~15 条/分钟/线,> 入队速率 |
按上面推算:
- AI 处理(~15 条/分钟/线)高于平均入队(双线约 20 条/分钟),队列可吸收突发;
- 日增约 2.88 万行业务数据 + 14 万行日志,三年内单表可不分,日志按月归档;
- 一期 1~2 个实例即可。
1000 患者时的扩展:
1000 患者 × 30% × 480 轮/天 ≈ 14.4 万条/天(5 倍于 200 人)
平均入队 ~1.7 条/秒,日峰 ~5 条/秒 —— 仍远低于 RabbitMQ 能力
瓶颈仍在 AI:单线并发 5、单次 20s → ~15 条/分钟/线
| 扩展手段 | 做法 | 是否改业务代码 |
|---|---|---|
| 加 Worker 实例 | 多实例竞争消费同一队列 | ❌ 不改 |
| 调 AI 并发 | ai_max_concurrency 配置调大(受 AI 服务上限约束) |
❌ 不改 |
| 加产品线队列 | 新 routing key + 队列 + Worker | ❌ 不改 |
| 分库分表 | 日增 >50 万行时再考虑 | 远期 |
患者从 200 扩到 1000:加 Worker 实例、调 ai_max_concurrency 即可,RabbitMQ 竞争消费分摊负载,业务代码不用改。
AI 熔断期间 Worker 暂停调 AI,查库和入队照常,任务留在 MQ;AI 恢复后继续消费。期间无法产生新预警,已写入 alert_notice 的仍可推送。
十一、实施计划
| 阶段 | 目标 | 交付 |
|---|---|---|
| 一期 MVP | 跑通主流程 | 双线查库 → 调 AI → 推送(WS 科室广播 + REST 兜底)→ 预警列表 + 基础日志 |
| 二期 可靠性 | 生产可用 | Outbox、重试/熔断、对账、监控大盘、人工补偿处理台 |
| 三期 增强 | 体验完善 | SSO 按用户推送、预警确认闭环、更多产品线接入 |
需要各方确认的:
| 方 | 要提供 |
|---|---|
| 数据采集 | 只读库账号、体征表结构、在科/术中判定 SQL、room_id/ward_code 与 License roomId 对照 |
| AI 团队 | 各线 API 地址、鉴权、请求/响应格式、requestId 幂等、GET /result/{requestId} 结果查询、处理能力上限 |
| 业务/产品 | 时间窗口、推送科室范围、风险级别定义、是否需医护确认 |
| 前端 | 复用 AiLicense 鉴权;对接 /alert/ws/token、WS 建连、已读/确认带 clientId |