优化 ModelCaller 的日志记录和流式处理逻辑,增强 SSE 解析能力

This commit is contained in:
2026-01-18 12:38:37 +08:00
parent 3d16bb20bb
commit a5f3c92fa3

View File

@@ -37,7 +37,8 @@ export default class ModelCaller {
}
// 2. 逻辑中直接使用 options 属性
this._log('info', `API Request [${options.mode}] Stream: ${options.fakeStream}`, callerName);
// 记录一下当前的流模式,方便调试
this._log('info', `API Request [${options.mode}] StreamMode: ${options.fakeStream}`, callerName);
try {
// 统一构建请求体 DTO
@@ -50,7 +51,9 @@ export default class ModelCaller {
result = await this._callDirect(callerName, requestBody, options);
}
return this._normalize(result);
// 如果是流式返回result 已经是拼接好的字符串,不需要 normalize 的部分逻辑
// 但为了统一,我们还是传进去检查一下
return this._normalize(result, options.fakeStream);
} catch (error) {
this._log('error', `Request Failed: ${error.message}`, callerName);
throw error;
@@ -59,7 +62,7 @@ export default class ModelCaller {
// 内部日志封装
_log(level, msg, plugin) {
if (this.logger && typeof this.logger.log === 'function') {
if (this.logger?.log) {
this.logger.log(level, msg, 'ModelCaller', plugin);
}
}
@@ -151,56 +154,84 @@ export default class ModelCaller {
// 网络层核心
// ========================================================================
async _fetchStandard(url, options) {
const response = await fetch(url, options);
if (!response.ok) {
// const text = await response.text();
throw new Error(`HTTP ${response.status}`);
}
return await response.json();
async _fetchStandard(url, opts) {
const res = await fetch(url, opts);
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return await res.json();
}
// 伪流式聚合防 CloudFlare 超时
async _fetchFakeStream(url, options) {
const response = await fetch(url, options);
if (!response.ok) {
throw new Error(`Stream HTTP ${response.status}`);
}
if (!response.body) {
return await response.json();
}
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
let aggregated = "";
// 【核心升级】:支持 SSE 解析的伪流式聚合防 CloudFlare 超时
async _fetchFakeStream(url, opts) {
const res = await fetch(url, opts);
if (!res.ok) throw new Error(`Stream HTTP ${res.status}`);
const reader = res.body.getReader();
const decoder = new TextDecoder();
let fullContent = ""; // 用于存储最终拼接的纯文本
let buffer = ""; // 用于存储未处理完的数据片段
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 持续读取保持连接活跃
aggregated += decoder.decode(value, { stream: true });
}
aggregated += decoder.decode();
// 1. 解码当前数据包
const chunk = decoder.decode(value, { stream: true });
buffer += chunk;
try {
return JSON.parse(aggregated);
} catch (e) {
// 如果是 SSE 格式或其他非 JSON 格式,暂且返回文本
return aggregated;
// 2. 处理 SSE 格式 (data: {...})
// 以双换行符分割每一条 SSE 消息
const lines = buffer.split('\n');
// 保留最后一个可能不完整的片段在 buffer 中
buffer = lines.pop();
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || trimmed === 'data: [DONE]') continue;
if (trimmed.startsWith('data: ')) {
try {
const jsonStr = trimmed.substring(6); // 去掉 'data: '
const json = JSON.parse(jsonStr);
// 提取 delta content
const delta = json.choices?.[0]?.delta?.content;
if (delta) {
fullContent += delta;
}
} catch (e) {
// 忽略解析错误的行,防止因为个别丢包导致整个请求失败
console.warn('[ModelCaller] SSE Parse Error:', e);
}
}
}
}
} finally {
reader.releaseLock();
}
// 如果 fullContent 是空的,说明可能服务端根本没返回 SSE 格式,而是直接返回了纯文本或 JSON
// 这种情况下尝试降级处理
if (!fullContent && buffer) {
try {
const json = JSON.parse(buffer);
return json; // 是标准 JSON
} catch {
return buffer; // 是纯文本
}
}
return fullContent;
}
// ========================================================================
// 数据归一化
// ========================================================================
_normalize(data) {
_normalize(data, isFromStream = false) {
// 如果是从流式聚合来的,它已经是一个纯字符串了,直接返回
if (isFromStream && typeof data === 'string') {
return data;
}
// 如果是 JSON 字符串则解析
if (typeof data === 'string') {
try { data = JSON.parse(data); } catch (e) { return data; }