使用pg-query-stream/node-postgres,我尝试实现一个实用程序方法,该方法可用于按顺序处理每条记录时,处理程序可以返回false
,以中断迭代并立即返回。如果此实用程序方法迭代所有记录,并且处理程序为每个记录解析了true
,并且如果处理程序解析了false
,它就会解析false
。但是当负载很大的时候,我的postgres池最终会进入一个完全没有响应的状态--我猜坏的连接会返回到池中,而下一个对它们发出的命令只是无限期地挂起??或者什么的...当然我不能可靠地再现它。pg-query-stream上的文档相当稀疏,所以我不完全确信我正确地使用了它。当我提前结束流时,只调用stream.destroy()
并将客户端返回到池是否正确?我是否应该在成功完成和/或出错时调用stream.destroy()
?是否预期“end”事件不荣誉流的“paused”状态?“end”是用于此目的的正确事件还是“close”更正确?“end”和“close”之间有什么区别?非常感谢您对正确用法的任何见解,谢谢!
public async forEachRecord(
queryText: string,
values: any[],
recordHandler: (record: TRecord) => boolean | Promise<boolean>,
): Promise<boolean> {
const query = new QueryStream(queryText, values);
const client = await this.pool.connect();
let error: any;
return new Promise<boolean>((resolve, reject) => {
let hasEnded = false;
query.on('data', async (data) => {
query.pause();
try {
if (await recordHandler(data)) {
query.resume();
return;
}
resolve(false);
} catch (err) {
reject(err);
} finally {
if (hasEnded) {
resolve(true);
}
}
});
query.on('end', async () => {
hasEnded = true;
// 'end' can come while the last recordHandler() is still executing asynchronously, while the stream
// is paused...so we need to do this check so the last recordHandler()'s return value is honored
if (!query.isPaused()) {
resolve(true);
}
});
query.on('error', (err) => {
error = err;
reject(err);
});
client.query(query);
}).finally(() => {
query.destroy();
client.release(error);
});
}
1条答案
按热度按时间bd1hkmkf1#
我遇到了同样的问题,因此我实现了pg-iterator:
查看库中的更多complete examples。