暂停cassandra流以进行异步操作

d7v8vwbk  于 2021-06-14  发布在  Cassandra
关注(0)|答案(2)|浏览(474)

在处理下一行之前,我想暂停cassandra流进行一些异步操作。
每一行都在可读的事件侦听器中接收。我尝试过使用stream.pause,但实际上它不会暂停流。我也在“数据”事件侦听器中尝试过同样的方法,但也不起作用。非常感谢你的见解和解决方案。这是我的密码。在readable中使用async和在await中使用waiting实际上并不阻止在异步函数完成之前出现下一行。

function start() {
let stream = client.stream('SELECT * FROM table');
stream
    .on('end', function () {
        console.log(`Ended at ${Date.now()}`);
    })
    .on('error', function (err) {
        console.error(err);
    })
    .on('readable', function () {
        let row = this.read();
        asyncFunctionNeedTowaitForthisBeforeNextRow()
    })
}

//下面的方法不起作用

function start() {
let stream = client.stream('SELECT * FROM table');
stream
    .on('end', function () {
        console.log(`Ended at ${Date.now()}`);
    })
    .on('error', function (err) {
        console.error(err);
    })
    .on('readable', async function () {
        let row = this.read();
        stream.pause();
        await asyncFunctionNeedTowaitForthisBeforeNextRow();
        stream.resume();
    })
 }
sirbozc5

sirbozc51#

为什么 stream.pause() 不起作用是因为 readable 事件激发多次,因此再次调用相同的异步函数。同样的道理 data 事件。
我建议使用一个定制的可写流来正确处理所有这些异步内容。
可写流将类似于:

const {Writable} = require('stream');

const myWritable = new Writable({
  async write(chunk, encoding, callback) {
    let row = chunk.toString();
    await asyncFunctionNeedTowaitForthisBeforeNextRow();
    callback(); // Write completed successfully
  }
})

然后调整代码以使用此可写:

function start() {
  let stream = client.stream('SELECT * FROM table');
  stream.pipe(myWritable);
  stream
    .on('end', function () {
      console.log(`Ended at ${Date.now()}`);
    })
    .on('error', function (err) {
      console.error(err);
    })
}
gkn4icbw

gkn4icbw2#

请注意,即使您正在声明事件的处理程序 'readable' 作为一个异步函数,调用方不等待返回的承诺完成,因为 Stream 期望事件处理程序正常执行。
解决方案可以是:

stream
  .on('end', () => {})
  .on('error', () => {})
  .on('data', row => {
    stream.pause();
    doSomethingAsync(row).then(() => stream.resume());
  });

请注意,理想情况下,在执行异步操作时应该利用并行性,因此最好每次读取几行,然后暂停。

相关问题