优化 ModelCaller 的日志记录和流式处理逻辑,增强 SSE 解析能力

This commit is contained in:
2026-01-18 12:38:37 +08:00
committed by Silence_Lurker
parent 80348ded2c
commit 1914884a8b

View File

@@ -37,7 +37,8 @@ export default class ModelCaller {
} }
// 2. 逻辑中直接使用 options 属性 // 2. 逻辑中直接使用 options 属性
this._log('info', `API Request [${options.mode}] Stream: ${options.fakeStream}`, callerName); // 记录一下当前的流模式,方便调试
this._log('info', `API Request [${options.mode}] StreamMode: ${options.fakeStream}`, callerName);
try { try {
// 统一构建请求体 DTO // 统一构建请求体 DTO
@@ -50,7 +51,9 @@ export default class ModelCaller {
result = await this._callDirect(callerName, requestBody, options); result = await this._callDirect(callerName, requestBody, options);
} }
return this._normalize(result); // 如果是流式返回result 已经是拼接好的字符串,不需要 normalize 的部分逻辑
// 但为了统一,我们还是传进去检查一下
return this._normalize(result, options.fakeStream);
} catch (error) { } catch (error) {
this._log('error', `Request Failed: ${error.message}`, callerName); this._log('error', `Request Failed: ${error.message}`, callerName);
throw error; throw error;
@@ -59,7 +62,7 @@ export default class ModelCaller {
// 内部日志封装 // 内部日志封装
_log(level, msg, plugin) { _log(level, msg, plugin) {
if (this.logger && typeof this.logger.log === 'function') { if (this.logger?.log) {
this.logger.log(level, msg, 'ModelCaller', plugin); this.logger.log(level, msg, 'ModelCaller', plugin);
} }
} }
@@ -151,56 +154,84 @@ export default class ModelCaller {
// 网络层核心 // 网络层核心
// ======================================================================== // ========================================================================
async _fetchStandard(url, options) { async _fetchStandard(url, opts) {
const response = await fetch(url, options); const res = await fetch(url, opts);
if (!response.ok) { if (!res.ok) throw new Error(`HTTP ${res.status}`);
// const text = await response.text(); return await res.json();
throw new Error(`HTTP ${response.status}`);
}
return await response.json();
} }
// 伪流式聚合防 CloudFlare 超时 // 【核心升级】:支持 SSE 解析的伪流式聚合防 CloudFlare 超时
async _fetchFakeStream(url, options) { async _fetchFakeStream(url, opts) {
const response = await fetch(url, options); const res = await fetch(url, opts);
if (!res.ok) throw new Error(`Stream HTTP ${res.status}`);
if (!response.ok) { const reader = res.body.getReader();
throw new Error(`Stream HTTP ${response.status}`); const decoder = new TextDecoder();
} let fullContent = ""; // 用于存储最终拼接的纯文本
let buffer = ""; // 用于存储未处理完的数据片段
if (!response.body) {
return await response.json();
}
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
let aggregated = "";
try { try {
while (true) { while (true) {
const { done, value } = await reader.read(); const { done, value } = await reader.read();
if (done) break; if (done) break;
// 持续读取保持连接活跃 // 1. 解码当前数据包
aggregated += decoder.decode(value, { stream: true }); const chunk = decoder.decode(value, { stream: true });
} buffer += chunk;
aggregated += decoder.decode();
// 2. 处理 SSE 格式 (data: {...})
// 以双换行符分割每一条 SSE 消息
const lines = buffer.split('\n');
// 保留最后一个可能不完整的片段在 buffer 中
buffer = lines.pop();
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || trimmed === 'data: [DONE]') continue;
if (trimmed.startsWith('data: ')) {
try { try {
return JSON.parse(aggregated); const jsonStr = trimmed.substring(6); // 去掉 'data: '
const json = JSON.parse(jsonStr);
// 提取 delta content
const delta = json.choices?.[0]?.delta?.content;
if (delta) {
fullContent += delta;
}
} catch (e) { } catch (e) {
// 如果是 SSE 格式或其他非 JSON 格式,暂且返回文本 // 忽略解析错误的行,防止因为个别丢包导致整个请求失败
return aggregated; console.warn('[ModelCaller] SSE Parse Error:', e);
}
}
}
} }
} finally { } finally {
reader.releaseLock(); reader.releaseLock();
} }
// 如果 fullContent 是空的,说明可能服务端根本没返回 SSE 格式,而是直接返回了纯文本或 JSON
// 这种情况下尝试降级处理
if (!fullContent && buffer) {
try {
const json = JSON.parse(buffer);
return json; // 是标准 JSON
} catch {
return buffer; // 是纯文本
}
}
return fullContent;
} }
// ======================================================================== // ========================================================================
// 数据归一化 // 数据归一化
// ======================================================================== // ========================================================================
_normalize(data) { _normalize(data, isFromStream = false) {
// 如果是从流式聚合来的,它已经是一个纯字符串了,直接返回
if (isFromStream && typeof data === 'string') {
return data;
}
// 如果是 JSON 字符串则解析 // 如果是 JSON 字符串则解析
if (typeof data === 'string') { if (typeof data === 'string') {
try { data = JSON.parse(data); } catch (e) { return data; } try { data = JSON.parse(data); } catch (e) { return data; }