将WebSocket流与Express Middleware集成以实现OpenAI API响应

rlcwz9us  于 11个月前  发布在  其他
关注(0)|答案(1)|浏览(124)

我正在开发一个OpenAI. js应用程序,需要集成OpenAI的流API响应,以便通过WebSockets实时发送到前端。我已经设置了WebSocket服务器和客户端通信,但在HTTP请求期间通过WebSockets从OpenAI API响应流传输数据时,我面临着挑战。
情况是这样的:

  • 我在server.js中有一个中间件,它应该为每个请求附加一个用户套接字Map,但它似乎没有按预期工作。
  • 在我的控制器中,在API请求期间,我希望访问用户的WebSocket并将数据流传输到它,因为它来自OpenAI的API。

下面是我的代码的简化版本:

server.js;

const WebSocket = require("ws");
const app = require("./app");

....

const server = app.listen(port, () => {
  console.log(`App runnig on port ${port}...`);
});

const userSockets = new Map();

app.use((req, res, next) => { //Doesn't work
  req.userSockets = userSockets;
  next();
});

const wss = new WebSocket.Server({ server });

wss.on("connection", function connection(ws) {
  console.log("WebSocket connection established");

  const userId = "11111111111";
  userSockets.set(userId, ws);

  ws.on("message", function incoming(message) {
    console.log("Received via WebSocket:", message.toString());
    if (message.toString() === "ping") {
      ws.send("pong");
    }
  });

  ws.on("close", function () {
    console.log("WebSocket connection closed");
  });
});

....

字符串

app.js;

...

const appRouter = require("./routes/appRoutes");

app.use("/app", appRouter);

....

module.exports = app;

appRoutes.js;

router.get("/testStream", appController.testStream, ...);

appController.js;

exports.testStream = catchAsync(async (req, res, next) => {

  const ws = req.userSockets.get("11111111111");          

  if (!ws) {
    return next(new AppError("WebSocket connection not found", 404));
  }

  const stream = await openai.chat.completions.create({
    model: model,
    messages: messages,
    stream: true,
  });

  const collected_stream = [];

  for await (const part of stream) {
    if (
      part.choices &&
      part.choices[0] &&
      part.choices[0].delta &&
      "content" in part.choices[0].delta
    ) {
      ws.send(
        JSON.stringify({
          type: "content",
          data: part.choices[0].delta.content,
        })
      );
      collected_stream.push(part.choices[0].delta.content);
      console.log(part.choices[0].delta.content);
    }
  }

  console.log(collected_stream.join(""));

  req.data = {
      historyId: "",
      content: collected_stream.join(""),
    };

  next();
});


我期望中间件将userSockets附加到req,但是req.userSockets在我的路由控制器中似乎没有定义。
1.如何正确地将userSocketsMap传递给中间件,以便它在路由控制器的请求对象中可用?
1.一旦我在控制器中访问了正确的WebSocket,是否有通过它发送OpenAI API流数据的最佳实践?
我已经阅读了相关的Express和WebSocket文档,但没有找到适合这种特殊情况的解决方案。任何帮助或指导都将非常感谢。提前感谢您的帮助。

jq6vz3qz

jq6vz3qz1#

首先,我创建了一个全局Map结构,将WebSocket连接存储在名为connection.js的文件中,并将此文件包含在server.js中

connection.js

const clients = new Map();
module.exports = clients;

字符串
接下来,我在我的server.js文件中使用了这个结构,将来自cookie的用户ID与WebSocket进行匹配:

....
const clients = require("./utils/connection");
const cookie = require("cookie");
const { verifyJwt } = require("./utils/verifyJwt");
....

wss.on("connection", function connection(ws, req) {

  const cookies = req.headers.cookie ? cookie.parse(req.headers.cookie) : {};
  const id = verifyJwt(cookies["access"]);
  if (!id) {
    ws.close();
    return;
  }

  clients.set(id.toString(), ws);

  console.log("WebSocket connection established for user", id);
  console.log("Active client number:", clients.size);

  ws.on("close", function () {
    clients.delete(id);
    console.log("Active client number:", clients.size);
  });
});


最后,在我的appController.js文件中,我利用这些连接通过WebSockets将数据从OpenAI API流到客户端:
appController.js:

...
const clients = require("../utils/connection");
const WebSocket = require("ws"); 
...

exports.testStream = catchAsync(async (req, res, next) => {

  const clientWs = clients.get(req.user._id.toString());

  ...

  // Send data from the stream over WebSocket
  for await (const part of stream) {
    if (part.choices && part.choices[0] && part.choices[0].delta && "content" in part.choices[0].delta) {
      if (clientWs && clientWs.readyState === WebSocket.OPEN) {
        clientWs.send(part.choices[0].delta.content);
      }
      collected_stream.push(part.choices[0].delta.content);
    }
  }

  ...

  next();
});


这种方法使我能够成功地通过WebSockets流式传输OpenAI API响应。我希望这种解决方案可以帮助其他遇到类似问题的人。

相关问题