AI 流式消息持久化机制与 ChunkBuffer 竞态问题修复总结

AI 流式消息持久化机制与 ChunkBuffer 竞态问题修复

记录智能麻醉系统中 AI 流式响应的完整消费链路、批量缓冲持久化设计,以及一次偶发性 response_payload 数据丢失问题的根因分析与修复。

一、背景

系统后端为 Java Spring Boot 微服务,AI 能力通过调用 Dify 平台的 chat-messages 流式接口(SSE)实现。用户在前端发起请求后,Dify 以 Server-Sent Events 的形式逐块返回 AI 生成内容,后端需要:

  1. 实时推送给前端 —— 用户能看到逐字输出效果
  2. 完整持久化到数据库 —— ai_message 表的 content(答案正文)和 response_payload(原始 SSE 全量数据)两个字段都需要落库

二、整体架构:三条数据轨道

一次流式调用中,有三个 StringBuilder 分别追踪不同维度的数据:

缓冲区 数据来源 最终去向 说明
rawPayloadSb Dify 吐出的原始 SSE chunk(不做任何处理直接追加) ai_message.response_payload 保留完整 SSE 事件流,用于回溯/调试
rawSb processChunk() 的返回值 辅助数据源 清洗后的 chunk(去掉 data: 前缀等)
finalSb AiChunkUtils.cleanChunk()rawSb 中提取的 answer 纯文本 ai_message.content 最终展示给用户的答案正文
Dify SSE chunk
      │
      ├──→ rawPayloadSb.append(chunk)           ← response_payload 全量保留
      │
      └──→ processChunk(chunk)
              │
              ├──→ rawSb.append(processedChunk)  ← 处理后的完整 JSON
              │
              └──→ cleanChunk(processedChunk)
                      │
                      └──→ finalSb.append(answer) ← 纯文本答案 → content

2.1 processChunk —— 预处理

// AiPayloadHelperOptimized.java
private String processChunk(String chunk) {
    if (chunk == null) return null;
    if (chunk.isEmpty()) return null;

    // 基础清理(去掉 "data:" 前缀,解析 JSON)
    String cleaned = AiChunkUtils.cleanChunk(chunk);
    if (cleaned == null) {
        cleaned = AiChunkUtils.cleanChunkFallback(chunk);
    }
    return cleaned;
}

2.2 cleanChunk —— 提取 answer 文本

// AiChunkUtils.java
public static String cleanChunk(String chunk) {
    String s = chunk;
    if (s.startsWith("data:")) s = s.substring(5).trim();

    // 尝试解析为 JSON,提取 answer / output / completion / choices 字段
    Map<String, Object> m = MAPPER.readValue(s, ...);
    if (m.containsKey("answer"))    return String.valueOf(m.get("answer"));
    if (m.containsKey("output"))    return String.valueOf(m.get("output"));
    // ... 其他字段兜底
    
    return s;  // 没有匹配到文本字段,返回原 JSON 串
}

举例,Dify 吐出以下 SSE 事件序列:

序号 原始 chunk processChunk 结果 cleanChunk 结果
1 {"event":"workflow_started","data":{...70KB患者信息...}} 完整 JSON 串 完整 JSON 串
2 {"event":"node_started",...} 完整 JSON 串 完整 JSON 串
3 {"event":"node_finished",...} 完整 JSON 串 完整 JSON 串
4 {"event":"message","answer":"苏"} "苏" "苏"
5 {"event":"message","answer":"醒"} "醒" "醒"
6 {"event":"message","answer":"注意事项"} "注意事项" "注意事项"

三轨数据示例——第 4 个 chunk 到达后:

rawPayloadSb = "{"event":"workflow_started"...}" + "{"event":"node_started"...}" + "{"event":"node_finished"...}" + "{"event":"message","answer":"苏"}"
rawSb        = "{"event":"workflow_started"...}" + "{"event":"node_started"...}" + "{"event":"node_finished"...}" + "苏"
finalSb      = "" + "" + "" + "苏"

三、流式处理主循环

核心入口 streamToAiOptimized 位于 AiPayloadHelperOptimized.java,结构如下:

public AutoCloseable streamToAiOptimized(...) {
    // 1. 创建会话记录,拿到 aiMessageId
    StreamStartInfo startInfo = aiMessageServiceEnhanced.startStreamingSession(...);
    Long aiMessageId = startInfo.aiMessageId();

    // 2. 三个 StringBuilder + 两个持久化计数器
    final StringBuilder rawPayloadSb = new StringBuilder();   // response_payload
    final StringBuilder rawSb = new StringBuilder();          // processed chunks
    final StringBuilder finalSb = new StringBuilder();        // 纯文本答案
    final AtomicInteger lastPersistedRawLen = new AtomicInteger(0);
    final AtomicInteger lastPersistedFinalLen = new AtomicInteger(0);

    // 3. 订阅 Dify SSE 流
    return difyClient.createChatMessageStream(
        chunk -> {                          // onChunk:每来一块数据
            rawPayloadSb.append(chunk);
            String processed = processChunk(chunk);
            if (processed != null && !processed.isEmpty()) {
                rawSb.append(processed);
                String answer = AiChunkUtils.cleanChunk(processed);
                if (answer != null && !answer.isEmpty()) {
                    finalSb.append(answer);
                }
                handleOptimizedPersistence(...);  // 持久化调度
                onChunk.accept(answer);           // 推送前端
            }
        },
        () -> {                             // onComplete:流正常结束
            handleStreamCompletion(...);    // 收尾:flush剩余 + 完成会话
            onComplete.run();
        },
        error -> {                          // onError:流异常中断
            aiMessageService.failStreamingSession(...);
            onError.accept(error);
        }
    );
}

生命周期图

startStreamingSession()          SSE streaming                handleStreamCompletion()
      │                              │                               │
      ▼                              ▼                               ▼
┌──────────┐    chunk1  ┌───────────────────┐    last chunk   ┌──────────────────────┐
│ 创建     │ ────────→ │ handleOptimized   │ ─────────────→ │ forceFlush           │
│ ai_msg   │    chunk2  │ Persistence 循环   │                │ waitForBufferFlush   │
│ ai_session│ ────────→ │  - rawPayloadSb   │                │ completeStreaming    │
└──────────┘    ...     │  - rawSb          │                │ Session              │
                        │  - finalSb        │                └──────────────────────┘
                        │  - 增量持久化      │
                        └───────────────────┘

四、持久化层:ChunkBuffer 批量缓冲机制

4.1 为什么需要缓冲区

SSE 流可能每秒产生几十个 chunk。如果每个 chunk 都执行一次 UPDATE ... SET content = CONCAT(COALESCE(content,''), #{chunk}),数据库压力巨大且锁竞争严重。为此引入批量缓冲机制。

4.2 设计

chunk → addChunk → ConcurrentLinkedQueue 累积
                        ↓
              shouldFlush()?
              ├─ size ≥ 20 (BATCH_SIZE)   → flush
              ├─ 距上次更新 > 500ms        → flush
              └─ 检测到尾事件              → flush
                        ↓
              flushBuffer → getAllChunks() → batchAppendWithRetry → SQL 一次写入

ChunkBuffer 数据结构

private static class ChunkBuffer {
    private final ConcurrentLinkedQueue<String> chunks = new ConcurrentLinkedQueue<>();      // final chunks
    private final ConcurrentLinkedQueue<String> chunksPlain = new ConcurrentLinkedQueue<>(); // content_plain
    private final ConcurrentLinkedQueue<String> chunksRaw = new ConcurrentLinkedQueue<>();   // response_payload
    private final AtomicInteger size = new AtomicInteger(0);
    private final AtomicBoolean processing = new AtomicBoolean(false);  // 防并发 flush
    private volatile long lastUpdateTime = System.currentTimeMillis();
}

入队与触发 flush

public void appendStreamingChunkOptimized(Long aiMessageId, String finalChunk, String rawChunk) {
    ChunkBuffer buffer = chunkBuffers.computeIfAbsent(aiMessageId, k -> new ChunkBuffer());
    buffer.addChunk(finalChunk, chunkPlain, rawChunk);    // 入队

    // 条件1:达到批量阈值 或 超时
    if (buffer.shouldFlush() && buffer.processing.compareAndSet(false, true)) {
        scheduler.execute(() -> flushBuffer(aiMessageId, buffer));
        return;
    }

    // 条件2:检测到尾事件(workflow_finished / message_end / node_finished)
    if (tailEventDetected && buffer.processing.compareAndSet(false, true)) {
        scheduler.execute(() -> flushBuffer(aiMessageId, buffer));
    }
}

flush 执行

private void flushBuffer(Long aiMessageId, ChunkBuffer buffer) {
    try {
        if (buffer.size.get() == 0) return;

        // drain 队列:把所有累积的 chunk 取出拼接
        String allChunks = buffer.getAllChunks();       // poll() 逐个取出
        String allChunksPlain = buffer.getAllChunksPlain();
        String allChunksRaw = buffer.getAllChunksRaw();

        // 一次性写入 3 个字段
        batchAppendWithRetry(aiMessageId, allChunks, allChunksPlain, allChunksRaw);

    } catch (Exception e) {
        log.error("Failed to flush buffer", e);
    } finally {
        buffer.clear();   // ← 重置缓冲区状态
    }
}

五、增量持久化调度:handleOptimizedPersistence

每个 chunk 触发一次,计算自上次持久化以来的增量(delta),发往缓冲区:

private void handleOptimizedPersistence(Long aiMessageId,
        StringBuilder finalSb, StringBuilder rawSb, StringBuilder rawPayloadSb,
        AtomicInteger chunkCounter, AtomicInteger lastPersistedFinalLen,
        AtomicInteger lastPersistedRawLen, long[] lastAppendAt) {

    // 计算 final 增量
    String finalDelta;
    synchronized (finalSb) {
        int already = lastPersistedFinalLen.get();
        finalDelta = finalSb.length() > already ? finalSb.substring(already) : "";
    }

    // 计算 raw 增量(response_payload 的来源)
    String rawDelta;
    synchronized (rawPayloadSb) {
        int alreadyRaw = lastPersistedRawLen.get();
        rawDelta = rawPayloadSb.length() > alreadyRaw ? rawPayloadSb.substring(alreadyRaw) : "";
    }

    if (finalDelta非空 || rawDelta非空) {
        // 把增量发送给缓冲区
        aiMessageServiceEnhanced.appendStreamingChunkOptimized(aiMessageId, finalDelta, rawDelta);

        // ⚡ 关键:立即更新计数器(不等 DB 真的写完!)
        lastPersistedFinalLen.addAndGet(finalDelta.length());
        lastPersistedRawLen.addAndGet(rawDelta.length());
    }
}

计数器为什么立即递增?

因为缓冲区是异步的,如果等到 DB 返回再更新,下一个 chunk 到达时又会算出同样的 delta,导致重复写入。所以约定:

放入缓冲区 = 已持久化,计数器立即前进。

六、流结束收尾:handleStreamCompletion

当 Dify SSE 流正常结束时调用:

private void handleStreamCompletion(Long aiMessageId,
        StringBuilder finalSb, StringBuilder rawSb, StringBuilder rawPayloadSb,
        AtomicInteger lastPersistedFinalLen, AtomicInteger lastPersistedRawLen) {

    // 1. 计算剩余数据(计数器之后的增量)
    String finalRemaining;
    synchronized (finalSb) {
        finalRemaining = finalSb.substring(lastPersistedFinalLen.get());
    }
    String rawRemaining;
    synchronized (rawPayloadSb) {
        rawRemaining = rawPayloadSb.substring(lastPersistedRawLen.get());
    }

    // 2. 将剩余数据发送到缓冲区
    aiMessageServiceEnhanced.appendStreamingChunkOptimized(aiMessageId, finalRemaining, rawRemaining);
    lastPersistedFinalLen.addAndGet(finalRemaining.length());
    lastPersistedRawLen.addAndGet(rawRemaining.length());

    // 3. 强制 flush + 等待完成
    aiMessageServiceEnhanced.forceFlush(aiMessageId);
    aiMessageServiceEnhanced.waitForBufferFlush(aiMessageId);

    // 4. 完成会话:content = 最终答案文本
    String finalAnswer = finalSb.toString();
    aiMessageServiceEnhanced.completeStreamingSession(aiMessageId, finalAnswer);

    // 5. 补写可能缺失的尾事件
    ensureTailEventsPersisted(aiMessageId, rawPayloadSb);
}

waitForBufferFlush 的实现——轮询等待,超时则强制 flush:

public void waitForBufferFlush(Long aiMessageId) {
    ChunkBuffer buffer = chunkBuffers.get(aiMessageId);
    int maxWaitTime = 2000;   // 最多等 2 秒
    int checkInterval = 50;   // 每 50ms 检查一次

    while (totalWaited < maxWaitTime) {
        if (buffer.size.get() == 0 && !buffer.processing.get()) {
            return;  // flush 完成
        }
        Thread.sleep(checkInterval);
    }

    // 超时兜底:强制再 flush 一次
    if (buffer.size.get() > 0) {
        flushBuffer(aiMessageId, buffer);
    }
}

七、问题:点击停止响应后,response_payload 偶发性丢失数据

7.1 现象

用户点击"停止响应"后,ai_message.response_payload 中偶尔缺少最开始的几个 SSE chunk。例如:

应该包含 实际缺失
{"event":"message","answer":"苏"} ❌ 丢失
{"event":"message","answer":"醒"} ❌ 丢失
{"event":"message","answer":"注意事项"} ❌ 丢失

数据库对比(同一功能、同一患者):

消息 ID response_payload 长度 数据完整性
776883610171085840 94,632 字节 ✅ 完整,包含"苏醒注意事项清单"标题
776883610171085847 89,615 字节 ❌ 缺失 ~5KB,丢失初始 answer 块

7.2 根因:ChunkBuffer.clear() 的竞态条件

问题出在 flushBuffer 方法中 drainclear 之间的时间窗口。

正常流程 (无竞态)

flushBuffer 线程:
    getAllChunks() → drain [A, B, C]
    batchAppendWithRetry(A+B+C)  ← DB 写入
    buffer.clear()  → 队列空,状态归零

竞态场景

时间 ───────────────────────────────────────────────────────────>

flushBuffer 线程:
    getAllChunks()  drain了大块数据[workflow_started(70KB) + node_started + node_finished]
    │                       队列现在为空
    │                       └──── DB 写入中(70KB 耗时较长) ────┐
    │                                                          ↓
    │                                                   buffer.clear()
    │                                               ConcurrentLinkedQueue.clear()
    │                                               清空了 [answer chunks]!❌
    
流处理线程:
                    新的 answer chunk 到达 → addChunk([answer:"苏"], [answer:"醒"], ...)
                    shouldFlush()  → processing 已 = true → 跳过
                    tailEvent 检测 → processing 已 = true → 跳过

handleOptimizedPersistence:
    lastPersistedRawLen += answerDelta.length()   ← 计数器已递增!

竞态窗口getAllChunks() drain 完后到 clear() 执行前,DB 写入是耗时操作(尤其第一块 workflow_started 约 70KB)。

在这个窗口内

  1. 新的 answer chunk 到达 → addChunk() 入队成功
  2. processing=true,不会触发新的 flush
  3. handleOptimizedPersistence 立即更新了 lastPersistedRawLen(计数器说"已持久化")
  4. DB 写入完成后 clear() 执行 → ConcurrentLinkedQueue.clear() 把刚入队的 answer chunk 也清掉了

最终后果

  • lastPersistedRawLen 已包含 answer chunk 的长度 → handleStreamCompletion 认为没有剩余数据
  • 缓冲区队列已被 clear() 清空 → 数据实际未写入 DB
  • answer chunk 永久丢失

为什么是第一个 answer 块最常丢失

因为第一个 handleOptimizedPersistence 的 delta 包含了巨大的 workflow_started 事件(~70KB 的患者信息 JSON),且其中含有 node_finished 字符串触发尾事件立即 flush。70KB 的 DB 写入耗时较长,正好给了后续 answer chunk 到达的窗口期。


八、修复

修改前

public void clear() {
    chunks.clear();         // ← 清空所有,包括窗口期新入队的
    chunksPlain.clear();
    chunksRaw.clear();
    size.set(0);
    processing.set(false);
}

修改后

/**
 * 结束本轮 flush,只重置 processing 标志并重新计算剩余 size。
 * 
 * 不再 clear 队列:flushBuffer 已通过 getAllChunks* 将本轮数据 drain 走,
 * 但 drain 与 clear 之间的窗口期内可能有新 chunk 入队。
 * 若 clear 会把这些新 chunk 也清掉,导致数据丢失。
 * 因此仅根据队列实际剩余量修正 size,保留新入队的数据供下一轮 flush。
 */
public void clear() {
    int remaining = chunks.size();
    size.set(remaining);
    processing.set(false);
}

修复逻辑

  • drain 阶段 getAllChunks() 已取出本轮数据,队列中只剩窗口期新入队的数据
  • 不调用 ConcurrentLinkedQueue.clear(),保留这些新数据
  • size 设为队列实际剩余量,而非归零
  • processing 置为 false,允许后续 shouldFlush()waitForBufferFlush 兜底处理

修复后的时序

flushBuffer 线程:
    getAllChunks()  drain [workflow_started(70KB) + ...]
    │                       队列现在为空
    │                       └──── DB 写入中 ────┐
    │                                           ↓
    │                                    buffer.clear()
    │                              remaining = chunks.size() = 1 (answer chunk)
    │                              size.set(1), processing = false
    │                                    ✅ 保留了 answer chunk
    
流处理线程:
                     answer chunk 到达 → addChunk → size=1

后续:
    handleStreamCompletion → forceFlush → processing.compareAndSet(false,true) = true
    → flushBuffer(answer chunk) → DB 写入
    ✅ answer chunk 正确落库

九、完整链路总结

┌──────────────┐     HTTP SSE      ┌──────────────────────────────────────┐
│   Dify AI    │ ────────────────→ │        streamToAiOptimized           │
│  (云服务)     │   chunk by chunk  │                                      │
└──────────────┘                   │  rawPayloadSb ←──────── chunk        │
                                   │       │                              │
                                   │  processChunk(chunk)                 │
                                   │       │                              │
                                   │  rawSb ←── processedChunk            │
                                   │       │                              │
                                   │  cleanChunk(processedChunk)          │
                                   │       │                              │
                                   │  finalSb ←── answer                  │
                                   │       │                              │
                                   │  ┌────┴────────────────────┐         │
                                   │  │ handleOptimizedPersistence│         │
                                   │  │  增量 = sb[计数器:]       │         │
                                   │  │  appendStreamingChunk    │         │
                                   │  │  Optimized(delta)        │         │
                                   │  │  计数器 += delta.length  │         │
                                   │  └──────────┬──────────────┘         │
                                   └─────────────┼────────────────────────┘
                                                 │
                                    ┌────────────▼──────────────────────┐
                                    │       ChunkBuffer                 │
                                    │  ConcurrentLinkedQueue<chunks>    │
                                    │  - shouldFlush? (≥20/500ms/tail)  │
                                    │  - addChunk → 入队                │
                                    │  - getAllChunks → drain → SQL     │
                                    │  - clear → 只重置processing+size  │
                                    └────────────┬──────────────────────┘
                                                 │
                                    ┌────────────▼──────────────────────┐
                                    │           MySQL                   │
                                    │  UPDATE ai_message                │
                                    │  SET content = CONCAT(...)        │
                                    │      response_payload = CONCAT(...)│
                                    └───────────────────────────────────┘

三条轨道最终去向:

轨道 写入方式 DB字段 最终处理
finalSb appendStreamingChunkOptimized + completeStreamingSession 覆盖 content 流结束时用完整答案文本覆盖
rawPayloadSb appendStreamingChunkOptimized(增量 CONCAT) response_payload 全量 SSE 原始数据,不断 CONCAT 追加
content_plain 无(已移除默认赋值) content_plain 仅由 ContentPlainHelper.parseContentPlain 清洗后赋值

十、教训与经验

  1. 异步缓冲区的 drain-and-clear 模式天然存在竞态窗口。drain 和 clear 之间的任何操作(DB 写入、网络 IO)时间越长,窗口越大,丢数据风险越高。
  2. 计数器不应在异步操作完成前更新,否则会丢失"数据实际未落库但计数器已前进"的追踪能力。但这里受限于架构(需要在下一个 chunk 到达前阻止重复计算 delta),所以采用了折中方案。
  3. ConcurrentLinkedQueue.clear() 是破坏性操作——它会清空 drain 之后新入队的所有元素。在多线程生产者-消费者场景中,不应假设 drain 后到 clear 前没有新元素入队。
  4. 修复后 clear() 的语义变更为"重置处理状态"而非"清空缓冲区",这更符合实际需求:drain 已经取走了需要处理的数据,剩下的只是窗口期新来的,应该保留而非丢弃。

相关文件:

  • AiPayloadHelperOptimized.java — 流处理主循环 + 持久化调度
  • AiMessageServiceImplEnhanced.java — ChunkBuffer 定义 + flush 逻辑
  • AiMessageMapperOptimized.javabatchAppendContentSeparate SQL
  • AiChunkUtils.java — chunk 清洗工具

Read more

患者 AI 消息预警平台 — 技术方案

· v1.0 · 2026-06-30 把「采集库里的患者体征 → 丢给 AI 研判 → 把风险预警实时推给医护」这条链路,做成一个独立、可靠、可追溯的后台平台。支持多条产品线,一期接入**重症(ICU)和麻醉(ANES)**两条产品线。 文中关键参数(查库周期、AI 并发/超时、容量等)附默认值和测算过程。 一、为什么单独建一个项目 体征数据已经由采集系统写进了数据库。我们要做的是中间这段:定时取数 → 调 AI → 拿结果 → 推送前端,并保证全程不丢、可查、可统计。 平台不做两件事:不做体征采集入库(采集系统已有),不做 AI 模型(AI 团队提供 HTTP 接口)。 采集库由其他部门维护,

By 周天浩

患者 AI 消息预警平台 — 技术方案

· v1.0 · 2026-06-30 把「采集库里的患者体征 → 丢给 AI 研判 → 把风险预警实时推给医护」这条链路,做成一个独立、可靠、可追溯的后台平台。支持多条产品线,一期接入**重症(ICU)和麻醉(ANES)**两条产品线。 文中关键参数(查库周期、AI 并发/超时、容量等)附默认值和测算过程。 一、为什么单独建一个项目 体征数据已经由采集系统写进了数据库。我们要做的是中间这段:定时取数 → 调 AI → 拿结果 → 推送前端,并保证全程不丢、可查、可统计。 平台不做两件事:不做体征采集入库(采集系统已有),不做 AI 模型(AI 团队提供 HTTP 接口)。 采集库由其他部门维护,

By 周天浩