Caused by kafka.common.OffsetOutOfRangeException

wyyhbhjk posted on 2021-06-07 in Kafka

I am using Kafka and Spark Streaming to stream data updates into an HBase table,
but I keep getting OffsetOutOfRangeException. Here is my code:

new KafkaStreamBuilder()
                .setStreamingContext(streamingContext)
                .setTopics(topics)
                .setDataSourceId(dataSourceId)
                .setOffsetManager(offsetManager)
                .setConsumerParameters(
                    ImmutableMap
                        .<String, String>builder()
                        .putAll(kafkaConsumerParams)
                        .put("group.id", groupId)
                        .put("metadata.broker.list", kafkaBroker))
                        .build()
                )
                .build()
                .foreachRDD(
                    rdd -> {
                        rdd.foreachPartition(
                            iter -> {
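                                // createHbaseTable(...) and putRow(...) are our own helpers:
                                // one HBase Table handle per partition, one put per JSON record.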
                                final Table hTable = createHbaseTable(settings);
                                try {
                                    while (iter.hasNext()) {
                                        String json = new String(iter.next());
                                        try {
                                            putRow(
                                                hTable,
                                                json,
                                                settings,
                                                barrier);
                                        } catch (Exception e) {
                                            throw new RuntimeException("hbase write failure", e);
                                        }
                                    }
                                } catch (OffsetOutOfRangeException e) {
                                    throw new RuntimeException(
                                        "encountered OffsetOutOfRangeException", e);
                                } finally {
                                    // Release the per-partition table handle.
                                    hTable.close();
                                }
                            });
                    });

I have the streaming job scheduled to run every 5 minutes. Each time my consumer finishes a batch, it writes the latest markers and a checkpoint to S3. The next time, before the streaming job runs, it reads the previous checkpoint and markers back from S3 and resumes from there.
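Concretely, the save/restore cycle looks roughly like this (a minimal sketch against the spark-streaming-kafka 0.8 direct-stream API that the stack trace points at; the OffsetStore interface and its methods are hypothetical stand-ins for my actual offsetManager and S3 layout, not the real code):

import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.serializer.DefaultDecoder;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class CheckpointCycleSketch {

    /** Hypothetical S3-backed offset store, standing in for my offsetManager. */
    interface OffsetStore {
        Map<TopicAndPartition, Long> readLatest(); // read the previous run's checkpoint
        void save(OffsetRange[] ranges);           // persist this batch's end offsets
    }

    static void run(JavaStreamingContext ssc,
                    Map<String, String> kafkaParams,
                    OffsetStore store) {
        // 1. Before the job starts, restore the offsets the previous run saved to S3.
        Map<TopicAndPartition, Long> fromOffsets = store.readLatest();

        JavaInputDStream<byte[]> stream = KafkaUtils.createDirectStream(
            ssc,
            byte[].class, byte[].class,
            DefaultDecoder.class, DefaultDecoder.class,
            byte[].class,
            kafkaParams,
            fromOffsets,
            messageAndMetadata -> messageAndMetadata.message()); // keep only the value bytes

        stream.foreachRDD(rdd -> {
            // 2. Capture this batch's offset ranges before doing any work on the RDD.
            OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            rdd.foreachPartition(iter -> { /* HBase writes as in the code above */ });

            // 3. Only after the batch succeeds, persist the end offsets back to S3.
            store.save(ranges);
        });
    }
}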
Here is the exception stack trace:

at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: kafka.common.OffsetOutOfRangeException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:188)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:212)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)

What I have done: I have checked, and both the markers and the checkpoints work as expected.
So I am somewhat lost here: how can this exception occur, and what are the possible/reasonable fixes?
Thanks!
