我使用apachekafka流处理器api。
我创建了一个远程对象,它在处理器的statestore中扩展unicastremote对象。
当 init(ProcessorContext)
我的拓扑结构的处理器(它扩展了abstractprocessor)的方法称为我在远程对象中放置了processorcontext引用。
public void init(ProcessorContext processorContext){
super.init(processorContext);
this.myStore = (MyCustomStore<String, String>)context().getStateStore("mystore");
this.myStore.getRemoteObject().setProcessorContext(processorContext);
}
如果我只远程访问processorcontext并使用 forward
方法将记录发送到接收器处理器它工作正常。如果我只在本地访问processorcontext来发送记录 forward
它也很好用。
如果我在本地和远程访问processorcontext来使用 forward
我有npe。我使用了一种同步方法 foward
调用是因为我认为两个不同的线程试图同时访问processorcontext,但npe坚持。
在我看来,不知何故,同一个线程在本地和远程同时向前调用,并抛出npe。
信息是:
java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:114)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at StateMessageBean.sendMessage(StateMessageBean.java:109)
at StateMessageBean.subroundEnded(StateMessageBean.java:87)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:361)
at sun.rmi.transport.Transport$1.run(Transport.java:200)
at sun.rmi.transport.Transport$1.run(Transport.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:283)
at sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:260)
at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:161)
at java.rmi.server.RemoteObjectInvocationHandler.invokeRemoteMethod(RemoteObjectInvocationHandler.java:227)
at java.rmi.server.RemoteObjectInvocationHandler.invoke(RemoteObjectInvocationHandler.java:179)
at com.sun.proxy.$Proxy25.subroundEnded(Unknown Source)
at CoordinatorProcessor.collectIncreaseOfC(CoordinatorProcessor.java:332)
at CoordinatorProcessor.process(CoordinatorProcessor.java:119)
at CoordinatorProcessor.process(CoordinatorProcessor.java:25)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
我远程和本地调用的两种方法是:
//remote method
@Override
public void subroundEnded()
{
PhiMessage newPhiMessage = new PhiMessage();
newPhiMessage.setSiteId(this.siteId);
newPhiMessage.setCurrentRound(this.currentRound);
newPhiMessage.setCurrentSubround(this.currentSubround);
newPhiMessage.setPhi(this.phi);
byte[] byteNewPhiMessage = SerializationUtils.serialize(newPhiMessage);
this.processorContext.forward("PhiMessage", byteNewPhiMessage);
}
//local method
public void sendIncrease()
{
IncreaseMessage increaseMessage = new IncreaseMessage();
increaseMessage.setCurrentRound(this.currentRound);
increaseMessage.setCurrentSubround(this.currentSubround);
increaseMessage.setIncrease(this.increase);
byte[] byteIncreaseMessage = SerializationUtils.serialize(increaseMessage);
this.processorContext.forward("IncreaseMessage", byteIncreaseMessage);
}
暂无答案!
目前还没有任何答案,快来回答吧!