假设我们有一个变压器(用scala编写)
new Transformer[String, V, (String, V)]() {
var context: ProcessorContext = _
override def init(context: ProcessorContext): Unit = {
this.context = context
}
override def transform(key: String, value: V): (String, V) = {
val timestamp = toTimestamp(value)
context.forward(key, value, To.all().withTimestamp(timestamp))
key -> value
}
override def close(): Unit = ()
}
哪里 toTimestamp
只是一个函数,它返回从记录值获取的时间戳。一旦它被执行,就会有一个npe:
Exception in thread "...-6f3693b9-4e8d-4e65-9af6-928884320351-StreamThread-5" java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:110)
at CustomTransformer.transform()
at CustomTransformer.transform()
at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:302)
at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:300)
at
实际上发生的是 ProcessorContextImpl
出现故障:
public <K, V> void forward(final K key, final V value, final To to) {
toInternal.update(to);
if (toInternal.hasTimestamp()) {
recordContext.setTimestamp(toInternal.timestamp());
}
final ProcessorNode previousNode = currentNode();
因为 recordContext
未初始化(只能由kafkastreams在内部完成)。
这是kafka streams 1输出中的后续问题集时间戳
2条答案
按热度按时间b09cbbtk1#
@matthias-j-sax如果在java代码中重用处理器,则会有相同的行为。
pokxtpni2#
如果你和
transformer
,您需要确保Transformer
创建对象的时间TransformerSupplier#get()
被称为(查阅https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata(元数据)在最初的问题中,我以为是关于你的
context
变量导致npe,但现在我意识到这是关于Kafka流内部的。ScalaAPI在2.0.0中有一个bug,可能会导致
Transformer
示例被重用(https://issues.apache.org/jira/browse/kafka-7250). 我想你在打这个虫子。稍微重写一下代码就可以解决问题。请注意,kafka2.0.1和kafka2.1.0包含一个补丁。