Kafka消费者阅读的字节数远远超出预期

dgenwo3n  于 2023-01-08  发布在  Apache
关注(0)|答案(1)|浏览(127)
    • TLDR:**实际上,我们产生的数据量约为800 kB/s,而消费者读取的数据量为130 MB/s,这是我们产生的数据量的150倍多...
    • 更详细一点。**我们有一些Golang生产商在Redpanda中对一些主题的写入速度约为800 kB/s(Kafka)集群。然后我们有几个KafkaJS消费者,他们中的一些人从单个主题阅读,而其他人从多个主题阅读。所有消费者都处理他们期望阅读的消息,但是其中一个消费者读取的数据比预期的要多。它读取的速度大约是130 MB/s。

该图显示了我们对群集的读写量。
我试着删除了一段时间的消费者,但再次添加它只是让它爬回到100 + MB/s。当删除和添加消费者迅速它跳回到100 + MB/s。当添加多个消费者它只是加起来。200 + MB/s,300 + MB/s等。
我问了周围,但没有任何运气找到任何人与同样的问题,也不能找到任何错误的客户端代码。

this.redpandaClient = new Kafka({
      brokers: [redpandaBrokerUrl],
      clientId: "time-series-storage-writer",
      logLevel: logLevel.ERROR,
    });

 this.redpandaConsumer = this.redpandaClient.consumer({
      groupId: `time-series-storage-writer`,
      maxBytes: 1048576,
      maxWaitTimeInMs: 1000,
      minBytes: 131072,
    });
await redpandaClient.redpandaConsumer.run({
    eachBatch: measurementProcessingUseCases.eachBatchHandler,
    eachBatchAutoResolve: true,
  });
eachBatchHandler = async ({
    batch,
    heartbeat,
    isRunning,
    isStale,
  }: EachBatchPayload): Promise<void> => {
    const measurements: RepositoryMeasurement[] = [];
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) {
        break;
      }

      const measurement = this.preprocessMessage(message);
      measurements.push(measurement);
    }

    if (measurements.length > 0) {
      const chunks = this.divideMeasurementsIntoChunks(measurements);
      for (const chunk of chunks) {
        try {
          await this.config.timeSeriesRepository.storeMeasurements(chunk);
        } catch (error) {
          throw new Error(`An error occurred while storing: ${error}`);
        }
        await heartbeat();
      }
    }
  };
5uzkadbs

5uzkadbs1#

由于您正在处理批处理,因此需要将批处理中的消息标记为已处理。否则,它将始终从最后一个未解析的偏移量读取。

resolveOffset(message.offset)
await heartbeat()

看起来它正在尝试从头读取,因为没有标记的消息。
示例见https://kafka.js.org/docs/consuming
欢迎您加入红Pandas社区slack频道,从核心工程师那里获得更多帮助:)

相关问题