AI 流式消息持久化机制与 ChunkBuffer 竞态问题修复总结
AI 流式消息持久化机制与 ChunkBuffer 竞态问题修复
记录智能麻醉系统中 AI 流式响应的完整消费链路、批量缓冲持久化设计,以及一次偶发性 response_payload 数据丢失问题的根因分析与修复。一、背景
系统后端为 Java Spring Boot 微服务,AI 能力通过调用 Dify 平台的 chat-messages 流式接口(SSE)实现。用户在前端发起请求后,Dify 以 Server-Sent Events 的形式逐块返回 AI 生成内容,后端需要:
- 实时推送给前端 —— 用户能看到逐字输出效果
- 完整持久化到数据库 ——
ai_message表的content(答案正文)和response_payload(原始 SSE 全量数据)两个字段都需要落库
二、整体架构:三条数据轨道
一次流式调用中,有三个 StringBuilder 分别追踪不同维度的数据:
| 缓冲区 | 数据来源 | 最终去向 | 说明 |
|---|---|---|---|
rawPayloadSb |
Dify 吐出的原始 SSE chunk(不做任何处理直接追加) | ai_message.response_payload |
保留完整 SSE 事件流,用于回溯/调试 |
rawSb |
processChunk() 的返回值 |
辅助数据源 | 清洗后的 chunk(去掉 data: 前缀等) |
finalSb |
AiChunkUtils.cleanChunk() 从 rawSb 中提取的 answer 纯文本 |
ai_message.content |
最终展示给用户的答案正文 |
Dify SSE chunk
│
├──→ rawPayloadSb.append(chunk) ← response_payload 全量保留
│
└──→ processChunk(chunk)
│
├──→ rawSb.append(processedChunk) ← 处理后的完整 JSON
│
└──→ cleanChunk(processedChunk)
│
└──→ finalSb.append(answer) ← 纯文本答案 → content
2.1 processChunk —— 预处理
// AiPayloadHelperOptimized.java
private String processChunk(String chunk) {
if (chunk == null) return null;
if (chunk.isEmpty()) return null;
// 基础清理(去掉 "data:" 前缀,解析 JSON)
String cleaned = AiChunkUtils.cleanChunk(chunk);
if (cleaned == null) {
cleaned = AiChunkUtils.cleanChunkFallback(chunk);
}
return cleaned;
}
2.2 cleanChunk —— 提取 answer 文本
// AiChunkUtils.java
public static String cleanChunk(String chunk) {
String s = chunk;
if (s.startsWith("data:")) s = s.substring(5).trim();
// 尝试解析为 JSON,提取 answer / output / completion / choices 字段
Map<String, Object> m = MAPPER.readValue(s, ...);
if (m.containsKey("answer")) return String.valueOf(m.get("answer"));
if (m.containsKey("output")) return String.valueOf(m.get("output"));
// ... 其他字段兜底
return s; // 没有匹配到文本字段,返回原 JSON 串
}
举例,Dify 吐出以下 SSE 事件序列:
| 序号 | 原始 chunk | processChunk 结果 | cleanChunk 结果 |
|---|---|---|---|
| 1 | {"event":"workflow_started","data":{...70KB患者信息...}} |
完整 JSON 串 | 完整 JSON 串 |
| 2 | {"event":"node_started",...} |
完整 JSON 串 | 完整 JSON 串 |
| 3 | {"event":"node_finished",...} |
完整 JSON 串 | 完整 JSON 串 |
| 4 | {"event":"message","answer":"苏"} |
"苏" |
"苏" |
| 5 | {"event":"message","answer":"醒"} |
"醒" |
"醒" |
| 6 | {"event":"message","answer":"注意事项"} |
"注意事项" |
"注意事项" |
三轨数据示例——第 4 个 chunk 到达后:
rawPayloadSb = "{"event":"workflow_started"...}" + "{"event":"node_started"...}" + "{"event":"node_finished"...}" + "{"event":"message","answer":"苏"}"
rawSb = "{"event":"workflow_started"...}" + "{"event":"node_started"...}" + "{"event":"node_finished"...}" + "苏"
finalSb = "" + "" + "" + "苏"
三、流式处理主循环
核心入口 streamToAiOptimized 位于 AiPayloadHelperOptimized.java,结构如下:
public AutoCloseable streamToAiOptimized(...) {
// 1. 创建会话记录,拿到 aiMessageId
StreamStartInfo startInfo = aiMessageServiceEnhanced.startStreamingSession(...);
Long aiMessageId = startInfo.aiMessageId();
// 2. 三个 StringBuilder + 两个持久化计数器
final StringBuilder rawPayloadSb = new StringBuilder(); // response_payload
final StringBuilder rawSb = new StringBuilder(); // processed chunks
final StringBuilder finalSb = new StringBuilder(); // 纯文本答案
final AtomicInteger lastPersistedRawLen = new AtomicInteger(0);
final AtomicInteger lastPersistedFinalLen = new AtomicInteger(0);
// 3. 订阅 Dify SSE 流
return difyClient.createChatMessageStream(
chunk -> { // onChunk:每来一块数据
rawPayloadSb.append(chunk);
String processed = processChunk(chunk);
if (processed != null && !processed.isEmpty()) {
rawSb.append(processed);
String answer = AiChunkUtils.cleanChunk(processed);
if (answer != null && !answer.isEmpty()) {
finalSb.append(answer);
}
handleOptimizedPersistence(...); // 持久化调度
onChunk.accept(answer); // 推送前端
}
},
() -> { // onComplete:流正常结束
handleStreamCompletion(...); // 收尾:flush剩余 + 完成会话
onComplete.run();
},
error -> { // onError:流异常中断
aiMessageService.failStreamingSession(...);
onError.accept(error);
}
);
}
生命周期图:
startStreamingSession() SSE streaming handleStreamCompletion()
│ │ │
▼ ▼ ▼
┌──────────┐ chunk1 ┌───────────────────┐ last chunk ┌──────────────────────┐
│ 创建 │ ────────→ │ handleOptimized │ ─────────────→ │ forceFlush │
│ ai_msg │ chunk2 │ Persistence 循环 │ │ waitForBufferFlush │
│ ai_session│ ────────→ │ - rawPayloadSb │ │ completeStreaming │
└──────────┘ ... │ - rawSb │ │ Session │
│ - finalSb │ └──────────────────────┘
│ - 增量持久化 │
└───────────────────┘
四、持久化层:ChunkBuffer 批量缓冲机制
4.1 为什么需要缓冲区
SSE 流可能每秒产生几十个 chunk。如果每个 chunk 都执行一次 UPDATE ... SET content = CONCAT(COALESCE(content,''), #{chunk}),数据库压力巨大且锁竞争严重。为此引入批量缓冲机制。
4.2 设计
chunk → addChunk → ConcurrentLinkedQueue 累积
↓
shouldFlush()?
├─ size ≥ 20 (BATCH_SIZE) → flush
├─ 距上次更新 > 500ms → flush
└─ 检测到尾事件 → flush
↓
flushBuffer → getAllChunks() → batchAppendWithRetry → SQL 一次写入
ChunkBuffer 数据结构:
private static class ChunkBuffer {
private final ConcurrentLinkedQueue<String> chunks = new ConcurrentLinkedQueue<>(); // final chunks
private final ConcurrentLinkedQueue<String> chunksPlain = new ConcurrentLinkedQueue<>(); // content_plain
private final ConcurrentLinkedQueue<String> chunksRaw = new ConcurrentLinkedQueue<>(); // response_payload
private final AtomicInteger size = new AtomicInteger(0);
private final AtomicBoolean processing = new AtomicBoolean(false); // 防并发 flush
private volatile long lastUpdateTime = System.currentTimeMillis();
}
入队与触发 flush:
public void appendStreamingChunkOptimized(Long aiMessageId, String finalChunk, String rawChunk) {
ChunkBuffer buffer = chunkBuffers.computeIfAbsent(aiMessageId, k -> new ChunkBuffer());
buffer.addChunk(finalChunk, chunkPlain, rawChunk); // 入队
// 条件1:达到批量阈值 或 超时
if (buffer.shouldFlush() && buffer.processing.compareAndSet(false, true)) {
scheduler.execute(() -> flushBuffer(aiMessageId, buffer));
return;
}
// 条件2:检测到尾事件(workflow_finished / message_end / node_finished)
if (tailEventDetected && buffer.processing.compareAndSet(false, true)) {
scheduler.execute(() -> flushBuffer(aiMessageId, buffer));
}
}
flush 执行:
private void flushBuffer(Long aiMessageId, ChunkBuffer buffer) {
try {
if (buffer.size.get() == 0) return;
// drain 队列:把所有累积的 chunk 取出拼接
String allChunks = buffer.getAllChunks(); // poll() 逐个取出
String allChunksPlain = buffer.getAllChunksPlain();
String allChunksRaw = buffer.getAllChunksRaw();
// 一次性写入 3 个字段
batchAppendWithRetry(aiMessageId, allChunks, allChunksPlain, allChunksRaw);
} catch (Exception e) {
log.error("Failed to flush buffer", e);
} finally {
buffer.clear(); // ← 重置缓冲区状态
}
}
五、增量持久化调度:handleOptimizedPersistence
每个 chunk 触发一次,计算自上次持久化以来的增量(delta),发往缓冲区:
private void handleOptimizedPersistence(Long aiMessageId,
StringBuilder finalSb, StringBuilder rawSb, StringBuilder rawPayloadSb,
AtomicInteger chunkCounter, AtomicInteger lastPersistedFinalLen,
AtomicInteger lastPersistedRawLen, long[] lastAppendAt) {
// 计算 final 增量
String finalDelta;
synchronized (finalSb) {
int already = lastPersistedFinalLen.get();
finalDelta = finalSb.length() > already ? finalSb.substring(already) : "";
}
// 计算 raw 增量(response_payload 的来源)
String rawDelta;
synchronized (rawPayloadSb) {
int alreadyRaw = lastPersistedRawLen.get();
rawDelta = rawPayloadSb.length() > alreadyRaw ? rawPayloadSb.substring(alreadyRaw) : "";
}
if (finalDelta非空 || rawDelta非空) {
// 把增量发送给缓冲区
aiMessageServiceEnhanced.appendStreamingChunkOptimized(aiMessageId, finalDelta, rawDelta);
// ⚡ 关键:立即更新计数器(不等 DB 真的写完!)
lastPersistedFinalLen.addAndGet(finalDelta.length());
lastPersistedRawLen.addAndGet(rawDelta.length());
}
}
计数器为什么立即递增?
因为缓冲区是异步的,如果等到 DB 返回再更新,下一个 chunk 到达时又会算出同样的 delta,导致重复写入。所以约定:
放入缓冲区 = 已持久化,计数器立即前进。
六、流结束收尾:handleStreamCompletion
当 Dify SSE 流正常结束时调用:
private void handleStreamCompletion(Long aiMessageId,
StringBuilder finalSb, StringBuilder rawSb, StringBuilder rawPayloadSb,
AtomicInteger lastPersistedFinalLen, AtomicInteger lastPersistedRawLen) {
// 1. 计算剩余数据(计数器之后的增量)
String finalRemaining;
synchronized (finalSb) {
finalRemaining = finalSb.substring(lastPersistedFinalLen.get());
}
String rawRemaining;
synchronized (rawPayloadSb) {
rawRemaining = rawPayloadSb.substring(lastPersistedRawLen.get());
}
// 2. 将剩余数据发送到缓冲区
aiMessageServiceEnhanced.appendStreamingChunkOptimized(aiMessageId, finalRemaining, rawRemaining);
lastPersistedFinalLen.addAndGet(finalRemaining.length());
lastPersistedRawLen.addAndGet(rawRemaining.length());
// 3. 强制 flush + 等待完成
aiMessageServiceEnhanced.forceFlush(aiMessageId);
aiMessageServiceEnhanced.waitForBufferFlush(aiMessageId);
// 4. 完成会话:content = 最终答案文本
String finalAnswer = finalSb.toString();
aiMessageServiceEnhanced.completeStreamingSession(aiMessageId, finalAnswer);
// 5. 补写可能缺失的尾事件
ensureTailEventsPersisted(aiMessageId, rawPayloadSb);
}
waitForBufferFlush 的实现——轮询等待,超时则强制 flush:
public void waitForBufferFlush(Long aiMessageId) {
ChunkBuffer buffer = chunkBuffers.get(aiMessageId);
int maxWaitTime = 2000; // 最多等 2 秒
int checkInterval = 50; // 每 50ms 检查一次
while (totalWaited < maxWaitTime) {
if (buffer.size.get() == 0 && !buffer.processing.get()) {
return; // flush 完成
}
Thread.sleep(checkInterval);
}
// 超时兜底:强制再 flush 一次
if (buffer.size.get() > 0) {
flushBuffer(aiMessageId, buffer);
}
}
七、问题:点击停止响应后,response_payload 偶发性丢失数据
7.1 现象
用户点击"停止响应"后,ai_message.response_payload 中偶尔缺少最开始的几个 SSE chunk。例如:
| 应该包含 | 实际缺失 |
|---|---|
{"event":"message","answer":"苏"} |
❌ 丢失 |
{"event":"message","answer":"醒"} |
❌ 丢失 |
{"event":"message","answer":"注意事项"} |
❌ 丢失 |
数据库对比(同一功能、同一患者):
| 消息 ID | response_payload 长度 | 数据完整性 |
|---|---|---|
| 776883610171085840 | 94,632 字节 | ✅ 完整,包含"苏醒注意事项清单"标题 |
| 776883610171085847 | 89,615 字节 | ❌ 缺失 ~5KB,丢失初始 answer 块 |
7.2 根因:ChunkBuffer.clear() 的竞态条件
问题出在 flushBuffer 方法中 drain 和 clear 之间的时间窗口。
正常流程 (无竞态)
flushBuffer 线程:
getAllChunks() → drain [A, B, C]
batchAppendWithRetry(A+B+C) ← DB 写入
buffer.clear() → 队列空,状态归零
竞态场景
时间 ───────────────────────────────────────────────────────────>
flushBuffer 线程:
getAllChunks() drain了大块数据[workflow_started(70KB) + node_started + node_finished]
│ 队列现在为空
│ └──── DB 写入中(70KB 耗时较长) ────┐
│ ↓
│ buffer.clear()
│ ConcurrentLinkedQueue.clear()
│ 清空了 [answer chunks]!❌
流处理线程:
新的 answer chunk 到达 → addChunk([answer:"苏"], [answer:"醒"], ...)
shouldFlush() → processing 已 = true → 跳过
tailEvent 检测 → processing 已 = true → 跳过
handleOptimizedPersistence:
lastPersistedRawLen += answerDelta.length() ← 计数器已递增!
竞态窗口:getAllChunks() drain 完后到 clear() 执行前,DB 写入是耗时操作(尤其第一块 workflow_started 约 70KB)。
在这个窗口内:
- 新的 answer chunk 到达 →
addChunk()入队成功 - 但
processing=true,不会触发新的 flush handleOptimizedPersistence立即更新了lastPersistedRawLen(计数器说"已持久化")- DB 写入完成后
clear()执行 →ConcurrentLinkedQueue.clear()把刚入队的 answer chunk 也清掉了
最终后果:
lastPersistedRawLen已包含 answer chunk 的长度 →handleStreamCompletion认为没有剩余数据- 缓冲区队列已被
clear()清空 → 数据实际未写入 DB - → answer chunk 永久丢失
为什么是第一个 answer 块最常丢失
因为第一个 handleOptimizedPersistence 的 delta 包含了巨大的 workflow_started 事件(~70KB 的患者信息 JSON),且其中含有 node_finished 字符串触发尾事件立即 flush。70KB 的 DB 写入耗时较长,正好给了后续 answer chunk 到达的窗口期。
八、修复
修改前
public void clear() {
chunks.clear(); // ← 清空所有,包括窗口期新入队的
chunksPlain.clear();
chunksRaw.clear();
size.set(0);
processing.set(false);
}
修改后
/**
* 结束本轮 flush,只重置 processing 标志并重新计算剩余 size。
*
* 不再 clear 队列:flushBuffer 已通过 getAllChunks* 将本轮数据 drain 走,
* 但 drain 与 clear 之间的窗口期内可能有新 chunk 入队。
* 若 clear 会把这些新 chunk 也清掉,导致数据丢失。
* 因此仅根据队列实际剩余量修正 size,保留新入队的数据供下一轮 flush。
*/
public void clear() {
int remaining = chunks.size();
size.set(remaining);
processing.set(false);
}
修复逻辑
- drain 阶段
getAllChunks()已取出本轮数据,队列中只剩窗口期新入队的数据 - 不调用
ConcurrentLinkedQueue.clear(),保留这些新数据 size设为队列实际剩余量,而非归零processing置为false,允许后续shouldFlush()或waitForBufferFlush兜底处理
修复后的时序
flushBuffer 线程:
getAllChunks() drain [workflow_started(70KB) + ...]
│ 队列现在为空
│ └──── DB 写入中 ────┐
│ ↓
│ buffer.clear()
│ remaining = chunks.size() = 1 (answer chunk)
│ size.set(1), processing = false
│ ✅ 保留了 answer chunk
流处理线程:
answer chunk 到达 → addChunk → size=1
后续:
handleStreamCompletion → forceFlush → processing.compareAndSet(false,true) = true
→ flushBuffer(answer chunk) → DB 写入
✅ answer chunk 正确落库
九、完整链路总结
┌──────────────┐ HTTP SSE ┌──────────────────────────────────────┐
│ Dify AI │ ────────────────→ │ streamToAiOptimized │
│ (云服务) │ chunk by chunk │ │
└──────────────┘ │ rawPayloadSb ←──────── chunk │
│ │ │
│ processChunk(chunk) │
│ │ │
│ rawSb ←── processedChunk │
│ │ │
│ cleanChunk(processedChunk) │
│ │ │
│ finalSb ←── answer │
│ │ │
│ ┌────┴────────────────────┐ │
│ │ handleOptimizedPersistence│ │
│ │ 增量 = sb[计数器:] │ │
│ │ appendStreamingChunk │ │
│ │ Optimized(delta) │ │
│ │ 计数器 += delta.length │ │
│ └──────────┬──────────────┘ │
└─────────────┼────────────────────────┘
│
┌────────────▼──────────────────────┐
│ ChunkBuffer │
│ ConcurrentLinkedQueue<chunks> │
│ - shouldFlush? (≥20/500ms/tail) │
│ - addChunk → 入队 │
│ - getAllChunks → drain → SQL │
│ - clear → 只重置processing+size │
└────────────┬──────────────────────┘
│
┌────────────▼──────────────────────┐
│ MySQL │
│ UPDATE ai_message │
│ SET content = CONCAT(...) │
│ response_payload = CONCAT(...)│
└───────────────────────────────────┘
三条轨道最终去向:
| 轨道 | 写入方式 | DB字段 | 最终处理 |
|---|---|---|---|
finalSb |
appendStreamingChunkOptimized + completeStreamingSession 覆盖 |
content |
流结束时用完整答案文本覆盖 |
rawPayloadSb |
appendStreamingChunkOptimized(增量 CONCAT) |
response_payload |
全量 SSE 原始数据,不断 CONCAT 追加 |
content_plain |
无(已移除默认赋值) | content_plain |
仅由 ContentPlainHelper.parseContentPlain 清洗后赋值 |
十、教训与经验
- 异步缓冲区的 drain-and-clear 模式天然存在竞态窗口。drain 和 clear 之间的任何操作(DB 写入、网络 IO)时间越长,窗口越大,丢数据风险越高。
- 计数器不应在异步操作完成前更新,否则会丢失"数据实际未落库但计数器已前进"的追踪能力。但这里受限于架构(需要在下一个 chunk 到达前阻止重复计算 delta),所以采用了折中方案。
ConcurrentLinkedQueue.clear()是破坏性操作——它会清空 drain 之后新入队的所有元素。在多线程生产者-消费者场景中,不应假设 drain 后到 clear 前没有新元素入队。- 修复后
clear()的语义变更为"重置处理状态"而非"清空缓冲区",这更符合实际需求:drain 已经取走了需要处理的数据,剩下的只是窗口期新来的,应该保留而非丢弃。
相关文件:
AiPayloadHelperOptimized.java— 流处理主循环 + 持久化调度AiMessageServiceImplEnhanced.java— ChunkBuffer 定义 + flush 逻辑AiMessageMapperOptimized.java—batchAppendContentSeparateSQLAiChunkUtils.java— chunk 清洗工具