remote forward()调用processorcontext抛出npe

klh5stk1  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(265)

我使用apachekafka流处理器api。
我创建了一个远程对象,它在处理器的statestore中扩展unicastremote对象。
init(ProcessorContext) 我的拓扑结构的处理器(它扩展了abstractprocessor)的方法称为我在远程对象中放置了processorcontext引用。

public void init(ProcessorContext processorContext){
  super.init(processorContext);
  this.myStore = (MyCustomStore<String, String>)context().getStateStore("mystore");
  this.myStore.getRemoteObject().setProcessorContext(processorContext);
}

如果我只远程访问processorcontext并使用 forward 方法将记录发送到接收器处理器它工作正常。如果我只在本地访问processorcontext来发送记录 forward 它也很好用。
如果我在本地和远程访问processorcontext来使用 forward 我有npe。我使用了一种同步方法 foward 调用是因为我认为两个不同的线程试图同时访问processorcontext,但npe坚持。
在我看来,不知何故,同一个线程在本地和远程同时向前调用,并抛出npe。
信息是:

java.lang.NullPointerException
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:114)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
        at StateMessageBean.sendMessage(StateMessageBean.java:109)
        at StateMessageBean.subroundEnded(StateMessageBean.java:87)
        at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:361)
        at sun.rmi.transport.Transport$1.run(Transport.java:200)
        at sun.rmi.transport.Transport$1.run(Transport.java:197)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
        at sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:283)
        at sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:260)
        at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:161)
        at java.rmi.server.RemoteObjectInvocationHandler.invokeRemoteMethod(RemoteObjectInvocationHandler.java:227)
        at java.rmi.server.RemoteObjectInvocationHandler.invoke(RemoteObjectInvocationHandler.java:179)
        at com.sun.proxy.$Proxy25.subroundEnded(Unknown Source)
        at CoordinatorProcessor.collectIncreaseOfC(CoordinatorProcessor.java:332)
        at CoordinatorProcessor.process(CoordinatorProcessor.java:119)
        at CoordinatorProcessor.process(CoordinatorProcessor.java:25)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)

我远程和本地调用的两种方法是:

//remote method
@Override
public void subroundEnded()
{
    PhiMessage newPhiMessage = new PhiMessage();
    newPhiMessage.setSiteId(this.siteId);
    newPhiMessage.setCurrentRound(this.currentRound);
    newPhiMessage.setCurrentSubround(this.currentSubround);
    newPhiMessage.setPhi(this.phi);

    byte[] byteNewPhiMessage = SerializationUtils.serialize(newPhiMessage);
    this.processorContext.forward("PhiMessage",  byteNewPhiMessage);
}

//local method
public  void sendIncrease()
{

  IncreaseMessage increaseMessage = new IncreaseMessage();
  increaseMessage.setCurrentRound(this.currentRound);
  increaseMessage.setCurrentSubround(this.currentSubround);
  increaseMessage.setIncrease(this.increase);
  byte[] byteIncreaseMessage = SerializationUtils.serialize(increaseMessage);
  this.processorContext.forward("IncreaseMessage",  byteIncreaseMessage);

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题