我有一个spark kafka生产者(用java编写),每天批量运行。我想控制每天生成的数据的大小。
假设我想在一天内以多个批的形式生成100gb大小的数据(所有批的总和),然后我想停止这个过程。如何计算和保存每个批中生成的数据的大小(以字节为单位),以便在总数达到100GB时停止处理?
以下是我在producer中使用的API:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
制作人看起来像:
public void produceData(String key, String val) {
if (null == kafkaProducer) {
kafkaProducer = new KafkaProducer<String, String>(<Kafka Parameters>);
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
kafkaProducer.close();
}
});
}
if (key != null) {
kafkaProducer.send(new ProducerRecord(topic, key, val), new ProducerCallback(key, val) {
public void flush() {
if (null != kafkaProducer) {
kafkaProducer.flush();
}
}
});
}
}
暂无答案!
目前还没有任何答案,快来回答吧!