我有一个简单的模式如下所示
Pattern<Event,?> pattern = Pattern.<Event>begin("s1")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
Long time = System.nanoTime();
// here we are setting the time when this event is detected
event.setEdtl(time);
return event.getSensor_id() == 1 && event.getValue() > 150;
}
}).followedBy("s2")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
Long time = System.nanoTime();
// here we are setting the time when this event is detected
event.setEdtl(time);
return event.getSensor_id() == 2 && event.getValue() > 15;
}
}).followedBy("s3")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
Long time = System.nanoTime();
// here we are setting the time when this event is detected
event.setEdtl(time);
return event.getSensor_id() == 3 && event.getValue() > 35;
}
})
.within(Time.milliseconds(WindowLength_join__ms));
为了找到cep检测时间的延迟,添加了在如上所示的模式中选择每个事件的时间。每个事件类都有一个参数 Edtl
(事件检测时间本地),最初设置为0,之后设置为 System.nanoTime();
我在执行时遇到以下错误,但问题是这个错误是在程序运行一段时间后出现的
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.cep.operator.KeyedCEPPatternOperator.emitMatchedSequences(KeyedCEPPatternOperator.java:77)
at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:58)
at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:236)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
... 18 more
Caused by: java.io.IOException: Failed to send message 'patient_id=1, egtl_raw=null, edtg=null
' to socket server at localhost:6020. Connection re-tries are not enabled.
at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:154)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
... 26 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:143)
at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:146)
我想我的模式,这是因为我做的读写操作内的模式。如果是这样,那么我应该如何在flink cep中找到平均复杂事件延迟。
1条答案
按热度按时间ulmd4ohb1#
我已经想出了解决办法,这可能不是最好的,但它是有效的。找到上图中的[ce]复杂事件延迟,我们需要找到检测到ce的时间和最小原始事件时间,然后找出这些时间之间的差异。我在这里使用minimum的原因是假设有3个流
a, b & c
都包含在复杂的事件中,所以无论谁最需要等待,我们都会考虑它的时间。在我看来,它可以从模式中选择。任务1:将时间戳分配给第一个模式,如下所示
任务2:为复杂事件流分配时间戳并查找延迟。具体做法如下
希望有帮助?如果这个问题还有别的解决办法,请告诉我。