引言
最近实现了一套实时录音录入的系统,通过“三段式”的系统设计,最终成功交付,并抗住了线上高峰,记录一下设计思路。
三段式:定边界、划层级、找流向
需求
简述下需求:业务上需要实现一套实时语言转录的系统,通过大模型进行语音转文字,并且大模型那边对音频流的要求比较严格:
- 以音频流的形式发送给大模型端进行处理
- 音频流需要严格保证传递顺序
- 每一帧长度固定为 3.2 kb 或者 6.4 kb 左右
- 每一帧时间间隔不能太长,也不能太短
经过技术评审,后端希望能够只做简单转发,核心逻辑放在前端处理。
整体设计
定边界
第一步,定边界,哪些是不变量,哪些是固定规则,系统能力边、性能、稳定性等方面的界限,需要做到什么程度。
业务边界
- 运行环境是小程序。
- 录音时长当前最长 3 分钟,超时自动结束,未来最多放开到 10 分钟。
- 稳定性是第一位,需要保证零丢失、高可靠,良好的用户体验。
稳定性边界
对于这套系统,稳定性是关键,尤其是运行在小程序环境中,和 Web 不同,小程序在置于后台一段时间后,可能会自动休眠,从而导致长连接断开,因此需要考虑重连重传等场景。
- 连接稳定性
- 断线能重连
- 失败能重传
性能边界
- 高频音频流发送
- 需要考虑音频流堆积导致的内存问题
- 使用规模:千人级别
总结
这个需求可以简单看做是一个一对一通讯模型,需要保持长连接,并且大模型端需要返回转译的文本,也就是说需要实现全双工通讯,因此很明显,这需要使用 WebSocket 通讯。SSE 也是长连接,但是其是单向通讯,而 HTTP 扛不住高频数据流传递的性能问题。
另一方面,需要一个明确的机制来处理音频流,通过 WebSocket 使其流向后端,后端转发给大模型,大模型解析后将结果返回回后端,后端再返回给前端。
整体上还需要考虑稳定性和性能,稳定性上需要考虑断线、失败的场景,性能上需要考虑使用用户规模,内存管理以及音频流流速问题等。
划层级
基于前面的总结,大体上可以划分为三层:
- Voice 层:负责管理录音音频,统一对外输出接口,使其能稳定的流向 WS 层
- WS 层:负责实现 WebSocket 通讯,并保证连接稳定性
- 通讯层:统一前后端通讯协议(JSON/string/protoBuf/其他)和通讯方式
Voice 层和 WS 层存在直接的数据流流向,Voice 生产数据,而 WS 消费数据,这本质上是一个生产消费模型。
通讯层
通讯层决定前后端以什么方式实现通讯,它决定后面 Voice 层和 WS 层如何设计实现,因此放到最开始。
我最初设计了两套方案:
- 请求应答方案:发送一帧后等待回复,收到回复后再发送下一帧,如此重复。
- 滑动窗口方案:例如窗口大小是 N,那么一次性发送 N 帧,后端返回收到的累积帧 id,然后从该 id 起的后 N 个帧,如此重复。
两者的速率都可以被调节。
- 请求应答方案:如果要快一点就增大帧中音频块大小,并携带音频块数量信息,后端根据信息切割后发送给大模型。
- 滑动窗口方案:如果慢了后端直接在响应中更新窗口大小,让前端多发送几帧,类似 TCP 机制。
整体来说,滑动窗口方案更好,在网络波动的情况下,性能表现会更好一些,然而实际开发中我采用了第一种,也就是请求应答的方案。
主要原因是:
- 稳定性优先
- 滑动窗口方案实现复杂度更高一些
- 用户规模不多,录音时长只有 3 分钟,未来最多 10 分钟
- 现代设备通常网络情况较好,第一套方案在绝大多数情况下够用
这里就体现先定边界的重要性了,系统实现的思路和方案有很多,但是不应该盲目采用那些看起来大而全的方案,而是应该根据实际业务需求和系统边界来判断哪套方案最符合系统需要。
用一张图方便理解请求应答方案:
Voice 层
RecordVoice 封装
Voice 层首先需要实现小程序环境下对录音的一个封装,还需要考虑到小程序环境下特定所需的授权机制。
封装主要是为了:
- 统一事件处理
- 简化小程序授权操作
- 简化音频流采集配置
BufferQueue 缓冲队列
Voice 作为生产者,它生产的数据流向 WS 这个消费者,但是这会产生一个问题:如果生产速度大于消费速度,那么就会导致数据无法被及时消费,因此需要提供一个缓冲区来暂存这些数据,队列的“先进先出”的特性非常符合这个要求,我们称这个结构为缓冲队列。
这里需要重点关注缓冲队列的大小,如果生产速度持续大于消费速度,缓冲队列可能会持续无限制的增大,造成内存溢出,因此需要设置合理的背压机制。
为此,我设计了两套防线:
- 如果缓冲队列长度超过阈值,则增大 WS 层每次发送帧中音频块的大小,并携带块数量等信息,后端根据信息进行分块,再传递给大模型端;当缓冲队列小于阈值后,恢复。
- 如果还是无法缓解,达到队列最大长度限制,则直接丢弃最旧的音频数据,保留最新的。
WS 层
WS
小程序环境下 WS 的封装,核心需要关注:
- WS 基本实现:包括连接、事件等封装
- 心跳机制:保证连接存活,此外还可以用于检测连接是否断开
- 断线重连:注意重连设置阈值,避免无限重连
- 失败重传:在请求超时的情况下,重传数据帧,但是需要注意携带 id 来去重
找流向
数据流
前面已经分析过,WS 和 Voice 本质上是一个生产消费模式,Voice 产生音频流,提供给 WS 进行消费,因此系统中存在的数据流有:
- Voice 音频流 → WS
- WS → 服务器
同时,大模型端的文本也会返回给前端:
- 大模型端 → 服务器
- 服务器 → WS
这里重点关注 Voice 流行 WS 的音频流,我们可以实现一个管道机制,来让 Voice 中缓冲队列的音频数据自然地流向 WS,并自动实现请求-应答机制。
这个管道机制的核心要求是:当发送完一帧时,就阻塞,等待响应,收到响应后再发送下一帧。很容易想到可以通过状态 + 事件订阅的方式来实现。
Promise 天然符合这个要求,本身 then 提供事件订阅机制,promise 自身还提供 pending、fulfilled 和 rejected 三种状态,分别可以表示:
- pending: 发送后阻塞阶段
- fulfilled:收到响应,即将发送下一阵
- rejected:失败,需要重发
给一个代码示例:
生产者的管道函数,通过自动机不断传递消息帧:
/**
* 流式管道(生产者)
* 这是一个自动机,它会持续从缓冲区中取出数据,直到缓冲区为空且停止记录。
* @param callback 消费函数
* @returns
*/
public async pipe(
callback: (item: FrameVoiceBuff, size: number) => Promise<any>
): Promise<void> {
if (this.piping) {
console.warn("pipe is piping, please wait");
return Promise.resolve();
}
this.piping = true;
let pendingCount = 0
let item: FrameVoiceBuff | null = null;
do {
item = await this.shiftBuff();
if (item) {
pendingCount = 0
// 业务层处理异常
await callback(item, this.buffer.length);
if(item.isLastFrame){
break
}
} else if(pendingCount > 5) {
break
} else {
pendingCount++
await new Promise<void>((resolve)=>{
setTimeout(()=>resolve(), 100)
})
}
} while (true);
this.piping = false;
}
private shiftBuff(): Promise<FrameVoiceBuff | null> {
return new Promise((resolve) => {
if (this.state === "stopped" && this.buffer.length === 0) {
return resolve(null);
} else if (this.buffer.length > 0) {
return resolve(this.buffer.shift()!);
}
const timer = setTimeout(() => {
this.offFrame(this.waitFrameSymbol);
resolve(null);
}, this.Max_TIME_OUT);
this.onFrame(this.waitFrameSymbol, () => {
clearTimeout(timer);
this.offFrame(this.waitFrameSymbol);
// 确保pushBufferSymbol事件在waitFrameSymbol事件之前执行
setTimeout(() => resolve(this.buffer.shift()!), 0);
});
});
}消费者的管道函数, 通过 promise 实现:
/**
* 串行流(请求-响应模式)
* 即当发送一个请求后,等待服务器返回响应,只有当服务器返回成功响应后,才会发送下一个请求。
* @returns 包含pipe和clear方法的对象
*/
sendSerialStream() {
let { resolve: pResolve, reject: pReject, promise } = withResolvers<void>();
const handleSuccess = (msg: WSMessage) => {
console.log("[ws pipe]: 接收消息帧成功", msg);
pResolve();
};
this.onMessage(WsClient.WsEventCode.OK, handleSuccess);
const handleError = (msg: WSMessage) => {
console.error("[ws pipe]: 接收消息帧失败", msg);
pReject(msg);
};
this.onMessage(WsClient.WsEventCode.FAIL, handleError);
return {
clear: () => {
this.offMessage(WsClient.WsEventCode.OK, handleSuccess);
this.offMessage(WsClient.WsEventCode.FAIL, handleError);
},
pipe: <T>(e: T) => {
return new Promise<void>((resolve, reject) => {
let timer = setTimeout(
() =>
reject({
err: new Error("发送超时"),
code: WsClient.WsEventCode.TIMEOUT,
reason: "发送超时",
item: e,
}),
this.maxTimeout
);
return this.send<T>(e)
.then(() => promise)
.then(() => {
clearTimeout(timer);
const p = withResolvers<void>();
pResolve = p.resolve;
pReject = p.reject;
promise = p.promise;
resolve();
return { code: WsClient.WsEventCode.OK };
})
.catch((err) =>
reject({
err: err,
code: WsClient.WsEventCode.SEND_ERROR,
reason:
err?.reason?.errMsg ||
err?.reason?.message ||
err?.reason ||
err,
item: e,
})
);
});
},
};
}使用方式非常简单,类似 Nodejs 中的流式 API:
const { pipe, clear } = wsClient.sendSerialStream();
let piping: () => Promise<void> = () =>
frameVoice.pipe((e: FrameVoiceBuff, size: number) => {
console.log("frameVoice.pipe", e, size);
if (e.isLastFrame || (frameVoice.state === 'stopped' && size === 0)) {
// 最后一帧需要单独处理
const data = getLastFrame(e)
return pipe<ArrayBuffer>(data.buffer);
}
return pipe(e.frameBuffer);
}).then(() => {
// 结束
// ...
clear()
}, ()=>{
// 异常处理
})画一张图方便理解:
stateDiagram-v2 direction LR [*] --> Idle: pipe() 启动 Idle --> TakeFrame: 队列有数据 Idle --> WaitFrame: 队列为空 && 仍在录音 Idle --> Done: stopped && 队列为空 WaitFrame --> TakeFrame: onFrame() 唤醒 / 新帧入队 WaitFrame --> Done: 超时 / stopped && 队列仍为空 TakeFrame --> Sending: shiftBuff() 出队一帧 Sending --> WaitingAck: WS.send(frame) Sending --> Retry: send error / 立即失败 Sending --> Failed: 超过最大重试 / fatal WaitingAck --> Acked: 收到 OK(ACK) WaitingAck --> Retry: TIMEOUT / FAIL Acked --> Done: isLastFrame Acked --> Idle: 继续下一帧 Retry --> Sending: 重传(同 frameId, connectId) Retry --> Failed: 超过最大重试 / 连接不可用 Failed --> [*] Done --> [*]
控制流
在缓冲队列中,背压机制会控制 WS 层重发送的音频流的大小,因此存在以下控制流:
- Voice → WS (比如 Voice 通知 WS 快一点)
如果是滑动窗口方案,那么就会存在:
- 服务器 → WS
- WS → 服务器
请求应答这套机制则完全是通过缓冲队列大小来动态单向控制 WS 层发送速度。
失败路径
前面已经提到过:如果断线就得重连,如果失败就得重传。
除了 WS 本身的连接状态外,还额外实现了心跳机制来确保小程序休眠状态下 WS 连接的存活,并以此判断连接是否存活。
心跳机制:简单来说是前端定时不间断地发送一个空包给服务器,服务器收到后返回一个确认空包给前端,这样两者就都能确定连接的状态,如果超时未收到确认则重新发送空包,如果多次重试未收到回复,那么说明连接断开。
实现重连的基础是实现会话 ID,这要求在连接建立时就确定会话信息,实现起来也很简单:
{
"type": "connect_start",
"connect_id": "前端实现一个uuid"
}在连接建立时,前端发送一个 UUID 作为会话 id,之后每次发送消息帧都携带这个会话 id 和帧 Id。
当连接断开时,前端重新连接并用这个会话 Id 来恢复之前的会话,并重新发送之前的消息帧。
总结
这套实时语音转录系统,本质上并不复杂,但真正的难点在于:如何在强约束条件下保证稳定性和可靠性。
通过“三段式”的设计方式——定边界、划层级、找流向——我们将问题逐渐收口:
- 通过定边界,明确运行环境、性能极限和稳定性要求,让系统复杂度可控;
- 通过划层级,拆分 Voice、WS 与通讯协议层,践行单一职责原则,降低耦合;
- 通过找流向,梳理数据流、控制流与失败路径,确保系统逻辑闭环。
在实现层面,这个系统本质上是一个:
- 有序流式传输模型
- 带背压机制的生产-消费系统
- 支持会话恢复的可靠传输机制
此外,这套系统的成功运行和交付也证明了三段式方法论的可行性:
通过定边界确定系统业务、性能等边界,收敛复杂度,然后用划层级降低系统耦合度,最后通过找流向梳理系统中的数据流、控制流、状态流和失败路径,来实现系统逻辑闭环。