在限制Kafka批量时,如何让Spark流在每一批都承诺?

2sbarzqh  于 2022-09-21  发布在  Kafka
关注(0)|答案(1)|浏览(172)

为了在使用Spark Streaming时限制批处理大小,我引用了这个answer

Kafka大约有5000万条记录储存(即将被消费)。这个主题有3个分区。

zhihu_comment   0          10906153        28668062        17761909        -               -               -
zhihu_comment   1          10972464        30271728        19299264        -               -               -
zhihu_comment   2          10906395        28662007        17755612        -               -               -

我的消费者应用程序:

public final class SparkConsumer {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    String brokers = "device1:9092,device2:9092,device3:9092";
    String groupId = "spark";
    String topics = "zhihu_comment";

    // Create context with a certain seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
    sparkConf.set("spark.streaming.backpressure.enabled", "true");
    sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    kafkaParams.put("enable.auto.commit", true);
    kafkaParams.put("max.poll.records", "500");

    // Create direct kafka stream with brokers and topics
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(ConsumerRecord::value);
    lines.count().print();

    jssc.start();
    jssc.awaitTermination();
  }
}

我已经限制了Spark流的消耗大小,在我的例子中,我将maxRatePerPartition设置为10000,这意味着在我的例子中,它每批消耗300000条记录。

问题是,尽管Spark流能够处理具有特定限制的记录,但the current offset showing by kafka is not the offset that spark streaming is handling. As the kafka's current offset suddenly goes down to latest offset

zhihu_comment   0          28700537        28700676        139             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1
zhihu_comment   1          30305102        30305224        122             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1
zhihu_comment   2          28695033        28695146        113             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1

似乎Spark Streaming并不提交每批中的偏移量,它在开始消耗时提交最新的偏移量!

有没有办法让Spark Streaming与每一批都提交?

Spark流日志,证明其每批消耗的记录数:

20/05/04 22:28:13 INFO scheduler.DAGScheduler: Job 15 finished: print at SparkConsumer.java:65, took 0.012606 s
-------------------------------------------
Time: 1588602490000 ms
-------------------------------------------
300000

20/05/04 22:28:13 INFO scheduler.JobScheduler: Finished job streaming job 1588602490000 ms.0 from job set of time 1588602490000 ms
2wnc66cl

2wnc66cl1#

您需要禁用

kafkaParams.put("enable.auto.commit", false);

更确切地说,使用

messages.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // do here some transformations and action on the rdd, typically like:
  rdd.foreachPartition(it -> {
    it.foreach(row -> ...)
  })

  // commit messages
  ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});

如《星火+Kafka整合指南》所述。

您还可以使用commitSync进行同步提交。

相关问题