From a5f3c92fa3ace403900218e04ac31b34410f6054 Mon Sep 17 00:00:00 2001 From: SilenceLurker Date: Sun, 18 Jan 2026 12:38:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=20ModelCaller=20=E7=9A=84?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95=E5=92=8C=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C=E5=A2=9E=E5=BC=BA?= =?UTF-8?q?=20SSE=20=E8=A7=A3=E6=9E=90=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- SL/bus/api/ModelCaller.js | 103 +++++++++++++++++++++++++------------- 1 file changed, 67 insertions(+), 36 deletions(-) diff --git a/SL/bus/api/ModelCaller.js b/SL/bus/api/ModelCaller.js index 7afc43a..5861a3f 100644 --- a/SL/bus/api/ModelCaller.js +++ b/SL/bus/api/ModelCaller.js @@ -37,7 +37,8 @@ export default class ModelCaller { } // 2. 逻辑中直接使用 options 属性 - this._log('info', `API Request [${options.mode}] Stream: ${options.fakeStream}`, callerName); + // 记录一下当前的流模式,方便调试 + this._log('info', `API Request [${options.mode}] StreamMode: ${options.fakeStream}`, callerName); try { // 统一构建请求体 DTO @@ -50,7 +51,9 @@ export default class ModelCaller { result = await this._callDirect(callerName, requestBody, options); } - return this._normalize(result); + // 如果是流式返回,result 已经是拼接好的字符串,不需要 normalize 的部分逻辑 + // 但为了统一,我们还是传进去检查一下 + return this._normalize(result, options.fakeStream); } catch (error) { this._log('error', `Request Failed: ${error.message}`, callerName); throw error; @@ -59,7 +62,7 @@ export default class ModelCaller { // 内部日志封装 _log(level, msg, plugin) { - if (this.logger && typeof this.logger.log === 'function') { + if (this.logger?.log) { this.logger.log(level, msg, 'ModelCaller', plugin); } } @@ -151,56 +154,84 @@ export default class ModelCaller { // 网络层核心 // ======================================================================== - async _fetchStandard(url, options) { - const response = await fetch(url, options); - if (!response.ok) { - // const text = await response.text(); - throw new Error(`HTTP ${response.status}`); - } - return await response.json(); + async _fetchStandard(url, opts) { + const res = await fetch(url, opts); + if (!res.ok) throw new Error(`HTTP ${res.status}`); + return await res.json(); } - // 伪流式聚合:防 CloudFlare 超时 - async _fetchFakeStream(url, options) { - const response = await fetch(url, options); - - if (!response.ok) { - throw new Error(`Stream HTTP ${response.status}`); - } - - if (!response.body) { - return await response.json(); - } - - const reader = response.body.getReader(); - const decoder = new TextDecoder("utf-8"); - let aggregated = ""; - + // 【核心升级】:支持 SSE 解析的伪流式聚合,防 CloudFlare 超时 + async _fetchFakeStream(url, opts) { + const res = await fetch(url, opts); + if (!res.ok) throw new Error(`Stream HTTP ${res.status}`); + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let fullContent = ""; // 用于存储最终拼接的纯文本 + let buffer = ""; // 用于存储未处理完的数据片段 try { while (true) { const { done, value } = await reader.read(); if (done) break; - // 持续读取保持连接活跃 - aggregated += decoder.decode(value, { stream: true }); - } - aggregated += decoder.decode(); + // 1. 解码当前数据包 + const chunk = decoder.decode(value, { stream: true }); + buffer += chunk; - try { - return JSON.parse(aggregated); - } catch (e) { - // 如果是 SSE 格式或其他非 JSON 格式,暂且返回文本 - return aggregated; + // 2. 处理 SSE 格式 (data: {...}) + // 以双换行符分割每一条 SSE 消息 + const lines = buffer.split('\n'); + + // 保留最后一个可能不完整的片段在 buffer 中 + buffer = lines.pop(); + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed || trimmed === 'data: [DONE]') continue; + + if (trimmed.startsWith('data: ')) { + try { + const jsonStr = trimmed.substring(6); // 去掉 'data: ' + const json = JSON.parse(jsonStr); + + // 提取 delta content + const delta = json.choices?.[0]?.delta?.content; + if (delta) { + fullContent += delta; + } + } catch (e) { + // 忽略解析错误的行,防止因为个别丢包导致整个请求失败 + console.warn('[ModelCaller] SSE Parse Error:', e); + } + } + } } } finally { reader.releaseLock(); } + + // 如果 fullContent 是空的,说明可能服务端根本没返回 SSE 格式,而是直接返回了纯文本或 JSON + // 这种情况下尝试降级处理 + if (!fullContent && buffer) { + try { + const json = JSON.parse(buffer); + return json; // 是标准 JSON + } catch { + return buffer; // 是纯文本 + } + } + + return fullContent; } // ======================================================================== // 数据归一化 // ======================================================================== - _normalize(data) { + _normalize(data, isFromStream = false) { + // 如果是从流式聚合来的,它已经是一个纯字符串了,直接返回 + if (isFromStream && typeof data === 'string') { + return data; + } + // 如果是 JSON 字符串则解析 if (typeof data === 'string') { try { data = JSON.parse(data); } catch (e) { return data; }