我对node.js很陌生,不完全理解流的概念。为什么提供者服务的写缓冲区取决于消费者服务的运行速度?在基于HTTP的通信中,提供者不应该不知道消费者服务的读缓冲区的状态吗?
提供程序服务的代码:
import express from 'express';
import mockData from './mockData.js';
import { Readable } from 'stream';
const app = express();
function createDataSource() {
return Readable.from(mockData);
}
app.get('/stream', (req, res) => {
res.setHeader('Content-Type', 'text/plain');
const readableStream = createDataSource();
readableStream.on('data', (chunk) => {
const shouldContinue = res.write(chunk);
if (!shouldContinue) {
readableStream.pause();
res.once('drain', () => {
readableStream.resume();
});
}
});
readableStream.on('end', () => {
res.end();
});
});
app.listen(3000, () => {
console.log('sender is listening on port 3000');
});
字符串
消费者服务的代码:
import express from 'express';
import axios from 'axios';
import util from 'util';
import { Transform } from 'stream';
const setTimeoutPromise = util.promisify(setTimeout);
const app = express();
app.get('/receive-stream', async (req, res) => {
const response = await axios.get('http://localhost:3000/stream', {
responseType: 'stream',
});
const stream = response.data;
const transform = new Transform({
async transform(chunk, enc, callback) {
await setTimeoutPromise(2);
callback(null, chunk.toString().toUpperCase());
},
});
stream.pipe(transform).pipe(res);
});
app.listen(4000, () => {
console.log('consumer is running on port 4000');
});
型
(Call http://localhost:4000/receive-stream来测试整个工作流,mockData.js应该足够大以引起背压)
我如何想象数据传输:提供程序-->写缓冲区(提供程序端)-->读取缓冲区(消费者端)-->消费者我不明白的是为什么写缓冲区的耗尽(当它满了)在供应商的一方取决于消费者的速度?据我所知,提供者应该不知道消费者是否能够接受更多的数据,因为这是一个单向的通信,但是,当在使用者端增加超时时,提供者写缓冲区的耗尽需要更长的时间。(例如,从文件中阅读,然后写入文件),但是在这种情况下,数据在服务之间流动,所以他们应该不知道对方的缓冲区。有人能解释一下我误解了什么吗?
1条答案
按热度按时间093gszye1#
据我理解,供应商应该不知道消费者是否能够接受更多数据,因为这是单向的沟通
从你的Angular 来看,它是单向的,但它是HTTP连接下面的TCP连接,TCP有内置的流量控制。
当流的消费者施加反压力时,网络缓冲区很快就会填满。当接收到的数据包被确认时,也会发送一个 * 窗口大小 *。这个窗口大小表示还可以发送多少数据。(换句话说,接收客户端上的可用缓冲区。)
最终,窗口大小变为零。发送数据的主机知道它不能再发送任何东西。但是,您的应用程序仍然继续写入。这将填充写入端的缓冲区。
当您使用内置流时,流会跟踪它们何时可以写入,何时不能写入。
所以是的,基本上,客户端施加反压力,这个反压力最终会一直备份到服务器。