NodeJS 如何使用pg-query-stream和async/await对记录进行流处理?

mv1qrgav  于 2022-11-29  发布在  Node.js
关注(0)|答案(1)|浏览(162)

使用pg-query-stream/node-postgres,我尝试实现一个实用程序方法,该方法可用于按顺序处理每条记录时,处理程序可以返回false,以中断迭代并立即返回。如果此实用程序方法迭代所有记录,并且处理程序为每个记录解析了true,并且如果处理程序解析了false,它就会解析false。但是当负载很大的时候,我的postgres池最终会进入一个完全没有响应的状态--我猜坏的连接会返回到池中,而下一个对它们发出的命令只是无限期地挂起??或者什么的...当然我不能可靠地再现它。pg-query-stream上的文档相当稀疏,所以我不完全确信我正确地使用了它。当我提前结束流时,只调用stream.destroy()并将客户端返回到池是否正确?我是否应该在成功完成和/或出错时调用stream.destroy()?是否预期“end”事件不荣誉流的“paused”状态?“end”是用于此目的的正确事件还是“close”更正确?“end”和“close”之间有什么区别?非常感谢您对正确用法的任何见解,谢谢!

public async forEachRecord(
    queryText: string,
    values: any[],
    recordHandler: (record: TRecord) => boolean | Promise<boolean>,
  ): Promise<boolean> {
  
    const query = new QueryStream(queryText, values);
    const client = await this.pool.connect();
    let error: any;
    return new Promise<boolean>((resolve, reject) => {
      let hasEnded = false;

      query.on('data', async (data) => {
        query.pause();
        try {
          if (await recordHandler(data)) {
            query.resume();
            return;
          }
          resolve(false);
        } catch (err) {
          reject(err);
        } finally {
          if (hasEnded) {
            resolve(true);
          }
        }
      });

      query.on('end', async () => {
        hasEnded = true;
        // 'end' can come while the last recordHandler() is still executing asynchronously, while the stream
        // is paused...so we need to do this check so the last recordHandler()'s return value is honored
        if (!query.isPaused()) {
          resolve(true);
        }
      });

      query.on('error', (err) => {
        error = err;
        reject(err);
      });

      client.query(query);

    }).finally(() => {
      query.destroy();
      client.release(error);
    });
  }
bd1hkmkf

bd1hkmkf1#

我遇到了同样的问题,因此我实现了pg-iterator

import {Pool} from 'pg';
import {QueryIterablePool} from 'pg-iterator';

const pool = new Pool(/* connection config */);

const q = new QueryIterablePool(pool); // creating our Pool container

const i = q.query('SELECT * FROM users WHERE id = $1', [123]);

for await(const u of i) {
    console.log(u); // output each row
}

查看库中的更多complete examples

相关问题