NodeJS 如何在express.js中使用server-sent-events

q5iwbnjs  于 2023-06-22  发布在  Node.js
关注(0)|答案(6)|浏览(185)

我用express.js设置了我的REST服务器。现在我想把sse添加到这个服务器上。在我实现this sse包后,我得到一个错误。我知道我得到这个错误,当会尝试使用res.send两次,但我没有。

ERROR: Error: Can't set headers after they are sent.                                            
    at ServerResponse.OutgoingMessage.setHeader (http.js:690:11)                            
    at ServerResponse.header (/home/root/node_modules/express/lib/response.js:718:10)         
    at ServerResponse.send (/home/root/node_modules/express/lib/response.js:163:12)            
    at app.get.str (/home/root/.node_app_slot/main.js:1330:25)                                 
    at Layer.handle [as handle_request] (/home/root/node_modules/express/lib/router/layer.js:95:5)    
    at next (/home/root/node_modules/express/lib/router/route.js:131:13)                       
    at sse (/home/root/node_modules/server-sent-events/index.js:35:2)                          
    at Layer.handle [as handle_request] (/home/root/node_modules/express/lib/router/layer.js:95:5)  
    at next (/home/root/node_modules/express/lib/router/route.js:131:13)                          
    at Route.dispatch (/home/root/node_modules/express/lib/router/route.js:112:3)

有没有可能我不能再在sse函数中使用express方法了?例如:

app.get('/events', sse, function(req, res) {
    res.send('...');
});

此外,我还找到了this解决方案和this。是否可以使用res.write函数或其他方式来创建sse,而不使用其他软件包?

p8ekf7hl

p8ekf7hl1#

我不同意使用Socket.IO来实现基本的服务器发送事件。浏览器API非常简单,Express中的实现只需要对正常响应路由进行几处更改:

app.get('/streaming', (req, res) => {

    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Access-Control-Allow-Origin', '*');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders(); // flush the headers to establish SSE with client

    let counter = 0;
    let interValID = setInterval(() => {
        counter++;
        if (counter >= 10) {
            clearInterval(interValID);
            res.end(); // terminates SSE session
            return;
        }
        res.write(`data: ${JSON.stringify({num: counter})}\n\n`); // res.write() instead of res.send()
    }, 1000);

    // If client closes connection, stop sending events
    res.on('close', () => {
        console.log('client dropped me');
        clearInterval(interValID);
        res.end();
    });
});
  • 根据规范设置适当的标题
  • 使用res.flushHeaders()建立SSE连接
  • 使用res.write()而不是res.send()发送数据
  • 要结束来自服务器的流,请使用res.end()

上面的代码片段使用setInterval()模拟向客户端发送数据10秒,然后结束连接。客户端将收到一个丢失连接的错误,并自动尝试重新建立连接。为了避免这种情况,您可以在出错时关闭客户端,或者让浏览器发送一个特定的事件消息,客户端理解这意味着优雅地关闭。如果客户端关闭连接,我们可以捕获'close'事件,以优雅地结束服务器上的连接并停止发送事件。
express:4.17.1节点:10.16.3

m528fe3b

m528fe3b2#

你完全可以在没有其他软件包的情况下实现这一点。
我写了一篇关于这个的博客文章,part 1列出了基础知识。
您不能关闭SSE,因为这会破坏功能。关键是它是一个开放的HTTP连接。这允许在任何时候将新事件推送到客户端。

kdfy810k

kdfy810k3#

这为John's excellent answer添加了一个完整的可运行示例,并进行了调整,添加了Connection: keep-alive头。还包括一个read the stream的客户端,并处理多个块同时到达的可能性,这似乎是fetch的一个特性。
JSON不是绝对必要的,但对于将数据有效负载与SSE元数据分离很有用。

server.js

const express = require("express");
const app = express();

app.use(express.static("public"));

app.get("/stream", (req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Content-Type": "text/event-stream",
  });

  let counter = 0;
  const interval = setInterval(() => {
    const chunk = JSON.stringify({chunk: counter++});
    res.write(`data: ${chunk}\n\n`);
  }, 100);

  res.on("close", () => {
    clearInterval(interval);
    res.end();
  });
});

const listener = app.listen(process.env.PORT || 3000, () =>
  console.log(`Your app is listening on port ${listener.address().port}`)
);

public/index.html

<!DOCTYPE html>
<html lang="en">
<head><title>SSE POC</title></head>
<body>
<script>
(async () => {
  const response = await fetch("/stream", {
    headers: {
      "Accept": "text/event-stream",
    },
  });

  if (!response.ok) {
    throw Error(response.statusText());
  }

  for (const reader = response.body.getReader(); ; ) {
    const {value, done} = await reader.read();

    if (done) {
      break;
    }

    const chunk = new TextDecoder().decode(value);
    const subChunks = chunk.split(/(?<=})\n\ndata: (?={)/);

    for (const subChunk of subChunks) {
      const payload = subChunk.replace(/^data: /, "");
      document.body.innerText = JSON.parse(payload).chunk;
    }
  }
})();
</script>
</body>
</html>

node server.js之后,将浏览器导航到localhost:3000。您也可以直接使用curl localhost:3000/stream测试流。
我不会重复John的回答,但是,简而言之,我们设置必要的头并刷新它们以开始连接,然后使用res.write发送一个数据块。调用res.end()终止服务器上的连接,或侦听res.on("close", ...)以查找关闭连接的客户端。
客户端使用fetchresponse.body.getReader(),可以使用const {value, done} = await reader.read()读取,使用TextDecoder().decode(value)读取decoded
参见https://masteringjs.io/tutorials/express/server-sent-events
Express 4.18.2,Node 18.16.0,Chrome Version 114.0.5735.110(Official Build)(64-bit)

sauutmhj

sauutmhj4#

从您正在使用的库的文档中可以看出,当将res.sse用作函数的中间件时,应该使用res.sse。参见:https://www.npmjs.com/package/server-sent-events
但是,正如你提到的,他们的代码实际上所做的就是 Package res.write。参见:https://github.com/zacbarton/node-server-sent-events/blob/master/index.js#L11

uajslkp6

uajslkp65#

自我推销:我写了ExpreSSE包,它提供了在express中使用SSE的中间件,你可以在npm上找到它:@toverux/express.
一个简单的例子:

router.get('/events', sse(/* options */), (req, res) => {
    let messageId = parseInt(req.header('Last-Event-ID'), 10) || 0;

    someModule.on('someEvent', (event) => {
        //=> Data messages (no event name, but defaults to 'message' in the browser).
        res.sse.data(event);
        //=> Named event + data (data is mandatory)
        res.sse.event('someEvent', event);
        //=> Comment, not interpreted by EventSource on the browser - useful for debugging/self-documenting purposes.
        res.sse.comment('debug: someModule emitted someEvent!');
        //=> In data() and event() you can also pass an ID - useful for replay with Last-Event-ID header.
        res.sse.data(event, (messageId++).toString());
    });
});

还有另一个中间件可以将相同的事件推送到多个客户端。

camsedfj

camsedfj6#

新答案:

只需使用socket.io,它更容易,更好!https://www.npmjs.com/package/socket.io#in-conjunction-with-express
基本设置:

const express = require('express');
const PORT = process.env.PORT || 5000;
const app = express();
const server = require('http').createServer(app);
const io = require('socket.io')(server);
// listen to socket connections
io.on('connection', function(socket){
  // get that socket and listen to events
  socket.on('chat message', function(msg){
    // emit data from the server
    io.emit('chat message', msg);
  });
});
// Tip: add the `io` reference to the request object through a middleware like so:
app.use(function(request, response, next){
  request.io = io;
  next();
});
server.listen(PORT);
console.log(`Listening on port ${PORT}...`);

在任何路由处理程序中,都可以使用socket.io:

app.post('/post/:post_id/like/:user_id', function likePost(request, response) {
  //...
  request.io.emit('action', 'user liked your post');
})

客户端:

<script src="/socket.io/socket.io.js"></script>
<script src="https://code.jquery.com/jquery-1.11.1.js"></script>
<script>
  $(function () {
    var socket = io();
    $('form').submit(function(e){
      e.preventDefault(); // prevents page reloading
      socket.emit('chat message', $('#m').val());
      $('#m').val('');
      return false;
    });
    socket.on('chat message', function(msg){
      $('#messages').append($('<li>').text(msg));
    });
  });
</script>

完整示例:https://socket.io/get-started/chat/

原始答案:

某人(用户:https://stackoverflow.com/users/451634/benny-neugebauer|从这篇文章:addEventListener on custom object)字面上给了我一个提示,告诉我如何在没有express以外的任何其他包的情况下实现这个!我让它工作!
首先,导入Node的EventEmitter:

const EventEmitter = require('events');

然后创建一个示例:

const Stream = new EventEmitter();

然后为事件流创建GET路由:

app.get('/stream', function(request, response){
  response.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  Stream.on("push", function(event, data) {
    response.write("event: " + String(event) + "\n" + "data: " + JSON.stringify(data) + "\n\n");
  });
});

在这个GET路由中,您写回请求是200 OK,content-type是text/event-stream,没有缓存,并且保持活跃。
您还将调用EventEmitter示例的.on方法,该方法接受2个参数:要监听的事件字符串和处理该事件的函数(该函数可以接受给定的参数)
现在...要发送服务器事件,你所要做的就是调用EventEmitter示例的.emit方法:

Stream.emit("push", "test", { msg: "admit one" });

第一个参数是要触发的事件的字符串(确保它与GET路由中的事件相同)。.emit方法的每个后续参数都将传递给侦听器的回调!
就是这样!
由于示例是在路由定义之上的作用域中定义的,因此可以从任何其他路由调用.emit方法:

app.get('/', function(request, response){
  Stream.emit("push", "test", { msg: "admit one" });
  response.render("welcome.html", {});
});

多亏了JavaScript作用域的工作原理,你甚至可以将EventEmitter示例传递给其他函数,甚至可以从其他模块传递:

const someModule = require('./someModule');

app.get('/', function(request, response){
  someModule.someMethod(request, Stream)
  .then(obj => { return response.json({}) });
});

在一些模块中:

function someMethod(request, Stream) { 
  return new Promise((resolve, reject) => { 
    Stream.emit("push", "test", { data: 'some data' });
    return resolve();
  }) 
}

这么简单!不需要其他包裹!
下面是Node的EventEmitter类的链接:https://nodejs.org/api/events.html#events_class_eventemitter
我的例子:

const EventEmitter = require('events');
const express = require('express');
const app = express();

const Stream = new EventEmitter(); // my event emitter instance

app.get('/stream', function(request, response){
  response.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  Stream.on("push", function(event, data) {
    response.write("event: " + String(event) + "\n" + "data: " + JSON.stringify(data) + "\n\n");
  });
});

setInterval(function(){
  Stream.emit("push", "test", { msg: "admit one" });
}, 10000)

相关问题