- TLDR:**实际上,我们产生的数据量约为800 kB/s,而消费者读取的数据量为130 MB/s,这是我们产生的数据量的150倍多...
- 更详细一点。**我们有一些Golang生产商在Redpanda中对一些主题的写入速度约为800 kB/s(Kafka)集群。然后我们有几个KafkaJS消费者,他们中的一些人从单个主题阅读,而其他人从多个主题阅读。所有消费者都处理他们期望阅读的消息,但是其中一个消费者读取的数据比预期的要多。它读取的速度大约是130 MB/s。
该图显示了我们对群集的读写量。
我试着删除了一段时间的消费者,但再次添加它只是让它爬回到100 + MB/s。当删除和添加消费者迅速它跳回到100 + MB/s。当添加多个消费者它只是加起来。200 + MB/s,300 + MB/s等。
我问了周围,但没有任何运气找到任何人与同样的问题,也不能找到任何错误的客户端代码。
this.redpandaClient = new Kafka({
brokers: [redpandaBrokerUrl],
clientId: "time-series-storage-writer",
logLevel: logLevel.ERROR,
});
this.redpandaConsumer = this.redpandaClient.consumer({
groupId: `time-series-storage-writer`,
maxBytes: 1048576,
maxWaitTimeInMs: 1000,
minBytes: 131072,
});
await redpandaClient.redpandaConsumer.run({
eachBatch: measurementProcessingUseCases.eachBatchHandler,
eachBatchAutoResolve: true,
});
eachBatchHandler = async ({
batch,
heartbeat,
isRunning,
isStale,
}: EachBatchPayload): Promise<void> => {
const measurements: RepositoryMeasurement[] = [];
for (const message of batch.messages) {
if (!isRunning() || isStale()) {
break;
}
const measurement = this.preprocessMessage(message);
measurements.push(measurement);
}
if (measurements.length > 0) {
const chunks = this.divideMeasurementsIntoChunks(measurements);
for (const chunk of chunks) {
try {
await this.config.timeSeriesRepository.storeMeasurements(chunk);
} catch (error) {
throw new Error(`An error occurred while storing: ${error}`);
}
await heartbeat();
}
}
};
1条答案
按热度按时间5uzkadbs1#
由于您正在处理批处理,因此需要将批处理中的消息标记为已处理。否则,它将始终从最后一个未解析的偏移量读取。
看起来它正在尝试从头读取,因为没有标记的消息。
示例见https://kafka.js.org/docs/consuming
欢迎您加入红Pandas社区slack频道,从核心工程师那里获得更多帮助:)