npe,同时使用低级kafka流api执行context.forward()

s4n0splo  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(309)

我使用低级别的kafkaapi构建了一个普通的kafkastreamsapi。拓扑是线性的。
p1->p2->p3
在执行context.forward()时,我得到了npe,这里的代码片段:

NAjava.lang.NullPointerException: null
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:178)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)

...

我正在使用Kafka流2.3.0。
我在这里看到一个类似的so问题[1],这个问题基于非常旧的版本。所以,不确定这是不是同一个错误?

编辑

我把更多的信息,保持我所做的要点

public class SP1Processor implements StreamProcessor {

private StreamProcessingContext ctxt;

// In init(), create a single thread pool
// which does some processing and sends the
// data to next processor
@Override
void init(StreamProcessingContext ctxt) {

      this.ctxt = ctxt;

     // Create a thread pool, do some work
     // and then do this.ctxt.forward(K,V)

    // Not showing code of Thread pool
    // Strangely, inside this thread pool,
    // this.ctxt isn't same what I see in process()
    // shouldn't it be same? ctxt is member variable
    // and shouldn't it be same
    // this.ctxt.forward(K,V) here in this thread pool is causing NPE
    // why does it happen?
    this.ctxt.forward(K,V);

}

@Override
void process(K,V) {

   // Here do some processing and go to the next processor chain
   // This works fine
   this.ctxt.forward(K,V);
}

}

  [1]: https://stackoverflow.com/questions/39067846/periodic-npe-in-kafka-streams-processor-context
0kjbasz6

0kjbasz61#

看起来它可能和相关的问题是同一个问题,尽管我们在讨论一个更现代的版本。确保 ProcessorSupplier.get() 每次调用时返回一个新示例。

3gtaxfhh

3gtaxfhh2#

您不应该在处理器或dsl调用中创建任何线程池。并行性在kafkastreams中由 num.stream.threads ,分区数和示例数。 ctxt 相同,但其字段/成员可能不同(例如。 currentNode )-可能会被不同的线程更改。

相关问题