kafka流papi:processor close在启动时被调用过多

4uqofj5v  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(304)

如编写papi应用程序的合流文档所示,您应该关闭处理器中使用的存储区,并覆盖close方法。
在wordcountprocessor示例中,它显示了在call to close()方法中应该如何关闭存储。
我也做过类似的事情(我不是在init()方法中启动它们,而是在scala中使用lazy val),我发现我的processor close()方法在创建存储之后被多次调用。

class EventWindowProcessor(sessionStoreName: String, lastSessionByChannelStoreName: String, lastChannelStoreName: String)
extends AbstractProcesso

// example of a store
private lazy val lastChannelStore: KeyValueStore[MyKey, Channel] =
  context()
    .getStateStore(lastChannelStoreName)
    .asInstanceOf[KeyValueStore[MyKey, Channel]]

override def init(context: ProcessorContext) = {
  super.init(context)
}

override def close() = {
  logger.info("CLOSING PROCESSOR") 
}

override def process(key: String, value: String): Unit = {
    // ... my stuff here
}

因此,我得到以下输出,显示了processor.close()在拓扑运行开始时被多次调用,并且在应用程序的稍后点也被调用。

[2018-06-08 05:13:16,255] INFO Stream Application starting, name: stream-processor (my.package.StreamProcessorApplication$)
[2018-06-08 05:13:16,760] INFO Topology: Sub-topologies:
Sub-topology: 0
Source: event-source (topics: [events])
--> session-processor
Processor: session-processor (stores: [sessionStoreName, lastSessionByChannelStoreName, lastChannelStoreName])
--> error-event-sink, order-sink, pageviews-sink, session-sink
<-- event-source
Sink: error-event-sink (topic: error-events)
<-- session-processor
Sink: order-sink (topic: orders)
<-- session-processor
Sink: pageviews-sink (topic: pageviews)
<-- session-processor
Sink: session-sink (topic: sessions)
<-- session-processor
Global Stores:
none
(my.package.StreamProcessorApplication$)
[2018-06-08 05:14:01,425] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
[2018-06-08 05:14:01,539] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
[2018-06-08 05:14:01,640] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
      ... (102 lines like that)
[2018-06-08 05:29:05,548] INFO   .... my own application logging here

…因此,如果关闭close()方法中的存储,当process()中的代码尝试使用它们时,会出现一个异常,说明存储已关闭。
为什么在kafkastreams开始时调用processor.close()?为什么这种情况经常发生?
不明确关闭门店有什么风险?

a64a0gku

a64a0gku1#

文档中的示例不正确。您不应该关闭存储--存储由kafka streams管理,kafka streams将为您的用户关闭存储(我将做一个pr来修复代码示例。谢谢你指出。)
关于打电话给 Processor#close() :处理器可能会关闭并重新打开。这发生在重新平衡期间。因此,编写代码的方式必须确保它能够正确地处理对的多个调用 init() 以及 close() --我们最近为此更新了javadocs(改进的javadocs将是kafka2.0版本的一部分)。

相关问题