apachestorm使用kafka-spout给出错误:illegalstateexception

1aaf6o9v  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(333)
Version Info: 
   "org.apache.storm" % "storm-core" % "1.2.1" 
   "org.apache.storm" % "storm-kafka-client" % "1.2.1"

我有一个storm拓扑结构,如下所示:
螺栓A->螺栓B->螺栓C->螺栓D boltA 只是对请求进行一些格式化并发出另一个元组。 boltB 执行一些处理,并为接受的每个元组发出大约100个元组。 boltC 以及 boltD 处理这些元组。所有螺栓和工具 BaseBasicBolt .
我注意到的是 boltD 标记一些 tuple 通过抛出 FailedException ,在比拓扑超时时间短几分钟后,出现以下错误:

2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
2018-11-30T20:01:05.262+05:30 executor [ERROR]
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]

似乎发生的是当 boltB 发射100个元组 boltD 在这100个元组中有一个元组失败,我得到了这个错误。无法理解如何解决这个问题,理想情况下应该是这样 ack 当所有100个元组都是 acked ,但可能是原始元组 acked 在所有的100个元组 acked ,从而导致此错误。
编辑:
我可以用两个螺栓复制以下拓扑结构,在集群模式下运行大约5分钟后可以复制:
博尔塔

case class Abc(index: Int, rand: Boolean)

class BoltA  extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val inp = input.getBinaryByField("value").getObj[someObj]
    val randomGenerator = new Random()

    var i = 0
    val rand = randomGenerator.nextBoolean()
    1 to 100 foreach {
      collector.emit(new Values(Abc(i, rand).getJsonBytes))
      i += 1
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("boltAout"))
  }

}

螺栓B

class BoltB  extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val abc = input.getBinaryByField("boltAout").getObj[Abc]
    println(s"Received ${abc.index}th tuple in BoltB")
    if(abc.index >= 97 && abc.rand){
      println(s"throwing FailedException for ${abc.index}th tuple for")
      throw new FailedException()
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
  }
}

Kafka普特:

private def getKafkaSpoutConfig(source: Config) = KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setOffsetCommitPeriodMs(100)
    .setRetry(new KafkaSpoutRetryExponentialBackoff(
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      10,
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
    ))
    .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
    .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
    .build()

其他配置:
消息超时秒数:300

ohtdti5x

ohtdti5x1#

这个补丁是由@stig rohde提供的ø在这里。问题的确切原因如下:
在storm-2666和后续版本的修复中,我们添加了逻辑来处理以下情况:在已经确认以下偏移量之后,喷口接收到偏移量的确认。问题是,喷口可能会提交所有已确认的补偿,但不会向前调整消费者位置,或正确清除等待提交。如果acked偏移量远远落后于log end偏移量,那么喷口可能会轮询它已经提交的偏移量。
修正有点错误。当使用者位置落在提交的偏移量后面时,我们确保向前调整位置,并清除提交的偏移量后面的所有waitingtoemit消息。除非我们调整消费者的立场,否则我们不会取消等待限制,这是一个问题。
例如,假设偏移量1已失败,偏移量2-10已确认,maxpollrecords为10。假设Kafka有11条记录(1-11)。如果壶嘴寻找回偏移量1来重放它,它将从投票中的消费者那里得到偏移量1-10。消费者的地位现在是11。喷口发出偏移量1。说它马上就被确认了。在下一次投票中,喷口将提交偏移量1-10,并检查是否应该调整消费者位置和等待时间限制。因为位置(11)在提交的偏移量(10)之前,所以它不会清除waitingtoemit。因为waitingtoemit仍然包含上一次轮询的偏移量2-10,所以喷口将再次发出这些元组。
我们可以在这里看到解决办法。

相关问题