在处理下一行之前,我想暂停cassandra流进行一些异步操作。
每一行都在可读的事件侦听器中接收。我尝试过使用stream.pause,但实际上它不会暂停流。我也在“数据”事件侦听器中尝试过同样的方法,但也不起作用。非常感谢你的见解和解决方案。这是我的密码。在readable中使用async和在await中使用waiting实际上并不阻止在异步函数完成之前出现下一行。
function start() {
let stream = client.stream('SELECT * FROM table');
stream
.on('end', function () {
console.log(`Ended at ${Date.now()}`);
})
.on('error', function (err) {
console.error(err);
})
.on('readable', function () {
let row = this.read();
asyncFunctionNeedTowaitForthisBeforeNextRow()
})
}
//下面的方法不起作用
function start() {
let stream = client.stream('SELECT * FROM table');
stream
.on('end', function () {
console.log(`Ended at ${Date.now()}`);
})
.on('error', function (err) {
console.error(err);
})
.on('readable', async function () {
let row = this.read();
stream.pause();
await asyncFunctionNeedTowaitForthisBeforeNextRow();
stream.resume();
})
}
2条答案
按热度按时间siv3szwd1#
为什么
stream.pause()
不起作用是因为readable
事件激发多次,因此再次调用相同的异步函数。同样的道理data
事件。我建议使用一个定制的可写流来正确处理所有这些异步内容。
可写流将类似于:
然后调整代码以使用此可写:
wqnecbli2#
请注意,即使您正在声明事件的处理程序
'readable'
作为一个异步函数,调用方不等待返回的承诺完成,因为Stream
期望事件处理程序正常执行。解决方案可以是:
请注意,理想情况下,在执行异步操作时应该利用并行性,因此最好每次读取几行,然后暂停。