No current assignment for partition <topic partition> on rejoin; adding new partitions at runtime does not work

68de4m5k asked on 2021-06-08 in Kafka

I have a streaming application that uses Kafka + Spark Streaming. I started with one partition, one consumer instance, and three brokers. I want to do the following two things, in order:
1) Add new partitions to the topic at runtime. For this I use:

bin/kafka-topics.sh --zookeeper 192.168.101.164:2181 --alter --topic topic10 --partitions 2
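
To confirm the change took effect, the partition count can be checked with the same tool:

bin/kafka-topics.sh --zookeeper 192.168.101.164:2181 --describe --topic topic10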

2) Add a new consumer instance to the same consumer group, which I do by running the same consumer code as a separate process.
However, after doing 1), I do not see the (only) consumer instance consuming from the newly added partition. It keeps consuming only from the partition that was automatically assigned to it at startup.
After doing 2): a) the new instance is automatically assigned the new partition topic10-1, which succeeds, but b) the first instance rejoins the group after the rebalance and then fails with the following error (despite a) having succeeded):

17/06/07 16:47:02 INFO ConsumerCoordinator: Revoking previously assigned partitions [topic10-0] for group SparkConsumerGrp
17/06/07 16:47:02 INFO AbstractCoordinator: (Re-)joining group SparkConsumerGrp
17/06/07 16:47:02 INFO AbstractCoordinator: Successfully joined group SparkConsumerGrp with generation 17
17/06/07 16:47:02 INFO ConsumerCoordinator: Setting newly assigned partitions [topic10-0] for group SparkConsumerGrp
17/06/07 16:47:02 ERROR JobScheduler: Error generating jobs for time 1496879222000 ms
java.lang.IllegalStateException: No current assignment for partition topic10-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2$$anonfun$apply$29.apply(DStream.scala:900)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2$$anonfun$apply$29.apply(DStream.scala:899)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2.apply(DStream.scala:899)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2.apply(DStream.scala:877)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
    at org.apache.spark.streaming.dstream.DStream.slice(DStream.scala:877)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$1.apply(DStream.scala:871)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$1.apply(DStream.scala:871)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
    at org.apache.spark.streaming.dstream.DStream.slice(DStream.scala:870)
    at org.apache.spark.streaming.dstream.WindowedDStream.compute(WindowedDStream.scala:65)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I noticed that the time at which the new instance successfully joins the group exactly matches the time of every failure in the first instance. I am using Kafka version 0.10.0, kafka-clients version 0.10.0.1, and spark-streaming-kafka-0-10_2.11 version 2.1.1.
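
For reference, the stack trace shows DirectKafkaInputDStream.latestOffsets calling KafkaConsumer.seekToEnd, and that call throws whenever it targets a partition the underlying consumer is not currently assigned. A minimal standalone sketch of that failure mode (my own illustration, not code from the application; the broker address is a placeholder):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Manually assign only partition 0, as if the rebalance left us with topic10-0 ...
    consumer.assign(Collections.singletonList(new TopicPartition("topic10", 0)));
    // ... then seek on partition 1, which this consumer does not hold; this throws
    // java.lang.IllegalStateException: No current assignment for partition topic10-1
    consumer.seekToEnd(Collections.singletonList(new TopicPartition("topic10", 1)));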
Finally, here is the consumer code:

    // Imports assumed by this snippet:
    //   org.apache.kafka.clients.consumer.ConsumerRecord
    //   org.apache.kafka.common.serialization.StringDeserializer
    //   org.apache.spark.SparkConf
    //   org.apache.spark.api.java.JavaSparkContext
    //   org.apache.spark.streaming.Durations
    //   org.apache.spark.streaming.api.java.*
    //   org.apache.spark.streaming.kafka010.*
    //   java.util.*

    SparkConf sparkConf = new SparkConf().setMaster("local[5]").setAppName("SparkConsumer1").set("spark.driver.host", "localhost");

    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    // Create a StreamingContext with a 15 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(15));
    // "topics" is a comma-separated list of topic names; "brokers" is the bootstrap server list
    List<String> topicSet = Arrays.asList(topics.split(","));

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", brokers);
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("group.id", "SparkConsumerGrp3");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("zookeeper.connect", "192.168.101.164:2181");
    kafkaParams.put("enable.auto.commit", "true");
    kafkaParams.put("auto.commit.interval.ms", "1000");
    kafkaParams.put("session.timeout.ms", "30000");

    final JavaInputDStream<ConsumerRecord<String, String>> messages =
              KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferBrokers(),
                ConsumerStrategies.<String, String>Subscribe(topicSet, kafkaParams)
              );
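
The stream is then processed and the context started in the usual way (the actual processing logic is omitted above; this generic remainder is just a sketch, not the application's real logic):

    messages.foreachRDD(rdd -> rdd.foreach(record -> System.out.println(record.value())));
    jssc.start();
    jssc.awaitTermination();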

I also found something similar here, but with no real answer. I would really appreciate any help. Thanks.
Update 1) I was able to get the consumer to pick up data from the newly added partition. The consumer only discovers new partitions when it refreshes its topic metadata, which is controlled by metadata.max.age.ms and defaults to 300000 ms (5 minutes); lowering this value makes it pick up new partitions sooner.
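
For example, the refresh interval can be lowered in the same kafkaParams map shown above; the 10000 ms value here is just for illustration:

    kafkaParams.put("metadata.max.age.ms", "10000"); // refresh metadata every 10 s so new partitions are discovered sooner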
