标签: stream

  • 使用 Vercel Edge Function 访问 OpenAI API 的注意事项

    使用 Vercel Edge Function 访问 OpenAI API 的注意事项

    从某天开始,OpenAI API 无法从国内直接访问。而且,也不是每个人都有自己的云服务器,能够搭建独立服务。那么,还有别的办法能比较容易的访问到 OpenAI 么?当然是有的,比如 Vercel Edge Function,或者 CloudFlare Edge Function。

    这里我准备结合前阵子的开发经验,分享一下使用 Vercel Edge Function 访问 OpenAI API 的注意事项,让新来的开发者能少走弯路。

    推荐阅读

    开始之前,我建议大家先花点时间了解一下 Edge Function,以及如何使用 Vercel Edge Function 开发 OpenAI 应用。因为我后面要分享的主要是踩过的坑,所以先系统性了解会好很多:

    Building a GPT-3 app with Next.js and Vercel Edge Functions

    官方教程,还有 Demo 网站GitHub 项目,非常友好。虽然是英文写的,不过并不难懂,实在不行就用 Edge 浏览器自带的翻译功能吧,建议大家好好学习英文。

    自有域名+CNAME 实现国内访问

    Vercel 给免费版用户也提供子域名+ SSL 证书,很多时候都够用,但可惜,vercel.app 大域名被墙了,连带所有子域名都无法访问。好在 Vercel CDN 在国内也还能用,所以我们只需要一个自己的域名即可。

    申请域名的选择有很多,国内几大云服务商都能注册,国外的域名供应商也可以放心使用。我比较常用的是 namecheap.com。便宜的比如 .xyz 域名首年只要几块钱,随便注册一个就能用。

    注册完域名之后,在 Vercel 后台找到自己的应用,在“Setting > Domains“里添加域名,然后 Vercel 会告诉你怎么配置 DNS。复制解析目标,在域名供应商 DNS 配置页面完成 CNAME 配置,稍等片刻,解析生效后,即可得到一个国内也能正常访问的域名。

    使用 Edge Function

    Vercel Edge Function 与我们日常开发的 node.js 服务器略有区别。它并非完整的 node.js,而是 Edge 基于 V8 专门打造的袖珍运行时,尽可能轻量化,裁剪掉很多系统 API。功能少,但是速度很快,几乎零启动时间。(我之前将它跟 Supabase 记混了,以为它也是基于 deno 的。)

    使用 Edge Function 的好处,简单来说:省运维;详细来说,大概有这么几点:

    1. 性能更好。比随便买个小水管强得多。
    2. 自带弹性伸缩。不管访问量怎么成长,都有 Vercel 集群帮我们自动伸缩。(当然可能需要付钱)
    3. 启动速度比 serverless 快很多,基本没有等待时间。
    4. 免费额度足够初期 MVP 验证。

    坏处当然也有。首先,Edge Function 里跑的是 TS,这就意味着很多兼容 JS 的开源仓库都不能用。其次,Edge Function 很多原生 API 都不支持,所以没有特意兼容的仓库也不能用。举个例子,要完成网络请求,大家最熟悉的 Axios 就不能用,只能用系统原生的 fetch

    解决超时问题

    由于算法原因,OpenAI API 返回数据的总时间可能比较长,而 Edge Function 的等待时间又限制得很严。所以如果等待 OpenAI 返回全部数据再渲染,可能因为等太久,在 Edge Function 这里会超时。

    解决方案就是使用流(stream)式传播。在这种情况下,OpenAI 会逐步返回结果(差不多一个单词一个单词这样蹦),只要在客户端进行组合,就能看到类似实时输入的效果。

    完整的范例代码上面的官方文章有,我就不复制粘贴了,大家注意就好。

    Edge Function 的流不是最初的流

    这里有个坑,虽然我们在 Edge Function 里获取了 OpenAI API 的流,然后转发出来,但实际上我们接收到的流并不是最初的流。最初的流里,每次发送的数据都是完整的 JSON 文件,可以直接解析;但是 Edge Function 里转发给我们的却是前后合并后随机切分的结果。

    于是我们必须重新整理响应体。大概方案如下:

    1. 在每次返回的响应体里找到两个 json 的连接处
    2. 截断,拿到前面的 json,解析,得到自己想要的数据
    3. 继续查找完整的 json,如果没有,则和下一次响应体连接起来处理

    核心代码如下:

    class fetchGpt {
      fetch() {
        // 前面的代码参考官方例子
        // 我从循环读取开始
        while (!done) {
          const { value, done: doneReading } = await reader.read();
          done = doneReading;
          if (!value) {
            break;
          }
    
          // readableStream 读出来的是 Uint8Array,不能直接合并
          if (chunkValue.length > offset) {
            lastValue = concatUint8Array(lastValue, value);
          } else {
            lastValue = value;
            offset = 0;
          }
          chunkValue = decoder.decode(lastValue);
          [finishReason, offset] = this.parseChunk(chunkValue, offset);
        }
      }
      parseChunk(chunk: string, offset = 0): [string, number] {
        let finishReason = '';
        let nextOffset = offset;
        while (nextOffset !== -1) {
          nextOffset = chunk.indexOf(splitter, nextOffset + 1);
          const text = nextOffset !== -1
            ? chunk.substring(offset, nextOffset)
            : chunk.substring(offset);
          try {
            const json = JSON.parse(text);
            const [ choice ] = json.choices;
            const { delta, finish_reason } = choice;
            const chunkContent = delta?.content || '';
            // 这里我把数据交给事件 和 pinia 处理
            this.emit(MessengerEvent.MESSAGE, chunkContent, json.id, json.created);
            this.store.appendTextToLastItem(chunkContent, {
              id: json.id,
              created: json.created,
              system: this.options.system || '',
            });
            finishReason = finish_reason;
            offset = nextOffset !== -1 ? nextOffset : chunk.length;
          } catch (e) {
            //- ignore
          }
        }
        return [finishReason, offset];
      }

    常见 API 使用错误

    大家都知道,OpenAI 按照请求响应的 token 数算钱。所以我就想精打细算,通过在请求参数里减少 max_tokens,尽量少返回些内容。

    事实证明这个做法不成立。首先,OpenAI 对请求的兼容性不高,max_tokens 如果是 NaN 或者带有小数,都会报错。其次,ChatGPT 很啰嗦,内容量少了,它不过瘾,就会反复要求继续(finish_reason: 'length'),尤其在极端条件下,如果我们自动 continue,可能会误入死循环。

    所以我建议,max_tokens 最少 128,尽量 256 以上。


    前几天,有朋友在我的博客下面留言,说 Whisper 他试过,没啥特别的。我的观点不是这样。所谓纸上得来终觉浅,绝知此事要躬行。很多东西,确实不复杂,照着官方教程弄,三下五除二,在本地跑起来,并不难。但是想弄得干净利索,在生产环境里跑顺,遇到什么问题都能快速解决,也不是随随便便就能搞定的。

    希望有类似需求,寻求类似解决方案的同学能少走弯路;也欢迎大家多多分享,有意见建议,欢迎留言讨论。

  • 用 express.js 实现流式输出 HTTP 响应

    用 express.js 实现流式输出 HTTP 响应

    0. 前言

    我们先来总结一下客户端与服务器端的数据交互方式:

    1. ajax,即 XMLHttpRequest,最常见的数据交互方式,适用于较轻量级的一次性请求,比如用户登录、CRUD 等。
    2. SSE,服务器端事件,适用于有大量服务器端推送、且不需要从客户端发送数据的需求,比如盯股票行情
    3. WebSocket,适用于上下行数据都很多、都很频繁的情况,比如在线聊天、网络游戏等

    单从技术角度来看,大概是这几个。根据需求变化,我也可以组合出更多方案。比如,类似 Showman 生成视频这种需求,耗时很长,对服务器资源占用很高,就比较适合:

    1. 发起请求,产生一条任务记录,进入队列
    2. 服务器上运行独立的进程,从队列中取任务出来执行,并记录日志
    3. 客户端不断检查任务进度

    1. 需求

    今天的需求,介于轻量一次性,与需要服务器长时间执行之间,即 FastTest 中的发布功能。这里,我们有以下需求:

    1. 管理员点击 Publish 按钮,开始发布静态网站
    2. 服务器需要先保存数据,然后调用 webpack 发布所有语言版本
    3. 只有少数管理员会使用此功能,服务器压力不大
    4. 管理员希望能了解发布进度,及每一步的状态

    所以,提交任务后等待服务器慢慢跑就不合适;等待请求完成一次性看到所有结果也不合适;最合适的,就是基于长链接,不断获取返回信息,并在前端实时看到进度。也即是:流式输出 HTTP 响应。

    2. 实现

    2.1 后端

    后端实现我选择 node.js + express.js,在后端领域,这两个我比较熟悉。express.js 是在 node.js 网络模块上进行封装得来,使用起来很简单,也支持原生 node.js 方法。

    首先我们创建一个服务器:

    const express = require('express');
    const app = express();
    const port = 3100;
    
    app.listen(port, () => {
      console.log('FastTest Admin API at: ', port);
    });

    然后,我们在需要流式返回响应的接口里设置相应头 Content-type: application/octet-stream。接下来,只要我们不断向输出流写入内容就可以了。哦,对了,结束的时候,我们还要关闭输出流。

    app.post('/data', async(req, res, next) => {
      res.setHeader('Content-type', 'application/octet-stream');
      
      // 第一次返回
      res.write('Local data saved. Start to build dist。files.\n');
    
      // 数次返回
      for (const item of items) {
        await doSomething(item);
        res.write(`${item} done successfully.\n`);
      }
    
      // 最后,全部完成
      res.write('All done.');
      // 关闭输出流
      res.end();
    });

    2.2 axios

    axios 是很流行的 ajax 库,它进行了 Promise 封装,用起来很方便。这里我们要用 onDownloadProgress(即 axios 对 XMLHttpRequest.progress 的封装)获取下载进度,它会在每次服务器返回响应字符串的时候更新,我们只需要截取上次响应之后,这次响应新增的内容,即可。

    function publish(data, onDownloadProgress) {
      return axios.post('/data', data, {
        onDownloadProgress,
      });
    }
    
    async function doPublish() {
      if (isLoading.value) {
        return;
      }
    
      isPublishing.value = true;
      message.value = status.value = null;
    
      try {
        const { cases, lang } = store.state;
        let offset = 0;
        await publish({ cases, lang }, ({ target: xhr }) => {
          // responseText 包含了从一开始到此刻的全部响应内容,所以我们需要从上次结束的位置截取,获得新增的内容
          const { responseText } = xhr;
          const chunk = responseText.substring(offset);
          // 记录这一次的结束位置
          offset = responseText.length;
          currentStatus.value = chunk;
        });
        status.value = true;
        currentStatus.value = '';
        message.value = 'Published successfully.';
      } catch (e) {
        message.value = 'Failed to publish. ' + e.message;
      }
      isPublishing.value = false;
    }

    2.3 Vue3

    相对来说,Vue3 的部分最容易。这里我用了 animate.css 的 flash 效果,让信息更显眼。除此之外,就是简单的赋值。

    <template lang="pug">
    .alert.alert-info.mb-0.me-2.py-1.px-3.animated.flash.infinite.slower(
      v-if="currentStatus",
    ) {{currentStatus}}
    </template>

    3. 效果演示

    流式输出效果演示

    4. 部署

    一般来说,我们很少会直接用 node.js 当服务器,多半会启动 node.js 服务,然后用 nginx 反向代理给用户访问。这里需要注意,nginx 默认会将响应内容存入缓冲区,然后批量返回给客户端。这会导致流式输出无效,变成常规的执行完毕后一次性输出。

    所以我们必须修改 nginx 的配置:

    # 仅展示有关配置
    location /data {
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header Host $http_host;
            proxy_pass http://localhost:3100;
            # 关闭代理缓冲区,所有响应实时输出
            proxy_buffering off;
    }

    5. 总结 & 扩展阅读

    前些天我帮朋友做了个小项目,叫 FastTest。项目很简单,不过我主动扩展了技术选型,使用了非常多的技术,都是浅显入门的程度,非常适合用来做基础入门学习。本文也是从这个项目中提炼出来的。建议大家有空的时候看看,拉到本地,跑起来试试看效果。

    文中的几段代码,分别位于:

    项目的详细介绍请见:超全面全栈入门项目:fastest

    扩展阅读