Different Spark contexts consuming the same Kafka topic with the same group.id

aurhwmvo · asked 2021-06-07 · Kafka

I am trying to read logs from a Kafka topic with several local Spark instances that share the same configuration (including the same group.id), but when one consumer joins the group, another one is kicked out and fails with this error:

2017-09-24 16:42:40,011 INFO  [JobGenerator] internals.ConsumerCoordinator (ConsumerCoordinator.java:onJoinComplete(262)) - Setting newly assigned partitions [raul-10-8, raul-10-9, raul-10-5, raul-10-6, raul-10-7] for group example
2017-09-24 16:42:40,017 ERROR [JobScheduler] scheduler.JobScheduler (Logging.scala:logError(91)) - Error generating jobs for time 1506271360000 ms
java.lang.IllegalStateException: No current assignment for partition raul-10-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:348)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1287)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:194)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition raul-10-1
    ... (same stack trace as above)

I suspect this happens because I am reading the topic from unrelated Spark instances that know nothing about each other. Is there any way to solve this without setting up a cluster?
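One workaround I am considering, though I am not sure it is correct (each instance would then receive every partition instead of sharing the load), is to give each local instance its own group.id so the instances no longer compete inside one consumer group. The instanceId below is just a placeholder for however each local run would be identified (program argument, environment variable, ...):

static void useDistinctGroupId(Map<String, Object> kafkaParams, String instanceId) {
    // Sketch only: append a per-instance suffix so every local run joins its own consumer group.
    kafkaParams.put("group.id", "example-" + instanceId);
}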
Edit: this is the code that creates the Kafka direct stream:

static JavaStreamingContext defaultStreamingContext(Duration batchDuration) {
    SparkConf sparkConf = new SparkConf().setAppName("example")
                                         .setMaster("local[*]");
    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, batchDuration);
    javaStreamingContext.sparkContext();
    return javaStreamingContext;
}

static JavaInputDStream<ConsumerRecord<String, String>> kafkaStream(JavaStreamingContext streamingContext, Collection<String> topics) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", getBootstrapServers());
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "example");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    return KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
}

private void initStream(JavaInputDStream<ConsumerRecord<String, String>> dStream) {
    dStream.foreachRDD(rdd -> {
        // Do work
    });
}
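
For reference, this is roughly how each instance wires these pieces together; the batch duration is just an example value:

public static void main(String[] args) throws Exception {
    // Build the local streaming context and the direct Kafka stream for the "raul-10" topic.
    JavaStreamingContext streamingContext = defaultStreamingContext(Durations.seconds(10));
    JavaInputDStream<ConsumerRecord<String, String>> stream =
            kafkaStream(streamingContext, Collections.singletonList("raul-10"));
    stream.foreachRDD(rdd -> {
        // Do work (same processing as initStream above)
    });
    streamingContext.start();            // start consuming
    streamingContext.awaitTermination(); // block until the streaming job is stopped
}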
Thanks.

No answers yet.
