kafka生产者控制多批数据的总大小

iyzzxitl  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(300)

我有一个spark kafka生产者(用java编写),每天批量运行。我想控制每天生成的数据的大小。
假设我想在一天内以多个批的形式生成100gb大小的数据(所有批的总和),然后我想停止这个过程。如何计算和保存每个批中生成的数据的大小(以字节为单位),以便在总数达到100GB时停止处理?
以下是我在producer中使用的API:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;

制作人看起来像:

public void produceData(String key, String val) {

    if (null == kafkaProducer) {

        kafkaProducer = new KafkaProducer<String, String>(<Kafka Parameters>);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                kafkaProducer.close();
            }
        });

    }

    if (key != null) {

        kafkaProducer.send(new ProducerRecord(topic, key, val), new ProducerCallback(key, val) {
            public void flush() {
                if (null != kafkaProducer) {
                    kafkaProducer.flush();
                }
            }
        });

    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题