Finding the Flink CEP detection latency for each tuple

Posted by g2ieeal7 on 2021-06-25 in Flink

I have a simple pattern, shown below:

Pattern<Event,?> pattern = Pattern.<Event>begin("s1")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {

                            Long time = System.nanoTime();
                            // here we are setting the time when this event is detected
                            event.setEdtl(time);

                            return event.getSensor_id() == 1 && event.getValue() > 150;
                        }
                    }).followedBy("s2")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {
                            Long time = System.nanoTime();

                            // here we are setting the time when this event is detected
                            event.setEdtl(time);

                            return event.getSensor_id() == 2 && event.getValue() > 15;
                        }
                    }).followedBy("s3")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {

                            Long time = System.nanoTime();
                            // here we are setting the time when this event is detected
                            event.setEdtl(time);
                            return event.getSensor_id() == 3 && event.getValue() > 35;
                        }
                    })
                    .within(Time.milliseconds(WindowLength_join__ms));

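To measure the CEP detection latency, I record the time at which each event is selected inside the pattern shown above. Each event class has a field Edtl (event detection time, local), which is initially 0 and is then set to System.nanoTime(). I get the following error during execution; the strange part is that it only appears after the program has been running for a while: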
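For reference, here is a minimal sketch of what the `Event` class might look like. It is a hypothetical reconstruction: only the accessors used in the pattern and select code are included, and the field types are assumptions inferred from how the values are used (for example `Integer val1 = s1.getValue()`). Flink's POJO rules require a public class with a public no-argument constructor; the class is left package-private here only so the snippet compiles in any source file. Also note that `System.nanoTime()` values are only meaningful as differences taken within the same JVM, not as wall-clock times.

```java
// Hypothetical reconstruction of the Event class; field types are assumptions.
class Event {
    private int sensor_id;
    private Integer patient_id;
    private Integer value;
    // Edtl = "event detection time, local": 0 until a pattern condition stamps it.
    private Long edtl = 0L;

    // Flink needs a public no-arg constructor to treat the class as a POJO.
    public Event() {}

    public Event(int sensor_id, Integer patient_id, Integer value) {
        this.sensor_id = sensor_id;
        this.patient_id = patient_id;
        this.value = value;
    }

    public int getSensor_id() { return sensor_id; }
    public void setSensor_id(int sensor_id) { this.sensor_id = sensor_id; }

    public Integer getPatient_id() { return patient_id; }
    public void setPatient_id(Integer patient_id) { this.patient_id = patient_id; }

    public Integer getValue() { return value; }
    public void setValue(Integer value) { this.value = value; }

    public Long getEdtl() { return edtl; }
    public void setEdtl(Long edtl) { this.edtl = edtl; }
}
```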

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.cep.operator.KeyedCEPPatternOperator.emitMatchedSequences(KeyedCEPPatternOperator.java:77)
    at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:58)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:236)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 18 more
Caused by: java.io.IOException: Failed to send message 'patient_id=1, egtl_raw=null, edtg=null
' to socket server at localhost:6020. Connection re-tries are not enabled.
    at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:154)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 26 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:143)
    at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:146)

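I think the cause is my pattern, because I perform read/write operations inside it. If so, how should I measure the average complex event latency in Flink CEP?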

Answer by ulmd4ohb:


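I have come up with a solution; it may not be the best one, but it works. To find the complex event (CE) latency in the figure above, we need the time at which the CE is detected and the minimum raw-event time, and then take the difference between the two. I use the minimum because, if three streams a, b and c all contribute to the complex event, the event that has waited the longest is the one whose time we should count. In my view, that event can be identified from the pattern itself.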
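The rule above (CE latency = detection time minus the earliest raw-event time among the events in the match) can be written as a small helper. This is a plain-Java sketch; the class and method names are hypothetical, and it assumes all timestamps come from the same clock, for example `System.currentTimeMillis()` in the same JVM.

```java
import java.util.Arrays;

class CeLatency {
    /**
     * Latency of one complex event: the time it was detected minus the
     * earliest timestamp among the raw events that contributed to it.
     * All values must come from the same clock.
     */
    static long complexEventLatencyMillis(long detectedAtMillis, long... rawEventMillis) {
        if (rawEventMillis.length == 0) {
            throw new IllegalArgumentException("a complex event needs at least one raw event");
        }
        // The event that has waited longest is the one with the smallest timestamp.
        long earliest = Arrays.stream(rawEventMillis).min().getAsLong();
        return detectedAtMillis - earliest;
    }
}
```

For example, if events from streams a, b and c were stamped at 1000, 1040 and 1100 ms and the match was detected at 1130 ms, `complexEventLatencyMillis(1130, 1000, 1040, 1100)` returns 130, the wait of the event from stream a.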
Step 1: assign the timestamp in the first stage of the pattern, as follows:

if(perform_cep ){

        // performing some cep on merged stream

        Pattern<Event,?> pattern = Pattern.<Event>begin("s1")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {

                        // here we are setting the time when this event is detected

                        if(event.getSensor_id() == 1 && event.getValue() > 150){
                            Long time = System.currentTimeMillis();
                            event.setEdtl(time);
                            return true;

                        }
                        else return false;

                    }
                }).followedBy("s2")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {

                        return  event.getSensor_id() == 2 && event.getValue() > 15 ;

                    }
                }).followedBy("s3")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {

                       return  event.getSensor_id() == 3 && event.getValue() > 35;

                    }
                })
                .within(Time.milliseconds(WindowLength_join__ms));
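The key difference from the question's conditions is when the timestamp is written: the question stamps every event a condition inspects, while Step 1 stamps only events that satisfy it. In Flink CEP the same event can be checked by more than one condition (for example s1's condition and, for an open partial match, s2's), so stamping unconditionally records whichever check ran last. The plain-Java sketch below uses `java.util.function.Predicate` as a stand-in for Flink's `SimpleCondition` and a stripped-down, hypothetical `Reading` class in place of `Event`.

```java
import java.util.function.Predicate;

// Hypothetical, stripped-down stand-in for the Event class.
class Reading {
    final int sensorId;
    final int value;
    long edtl; // stays 0 until a condition stamps it

    Reading(int sensorId, int value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}

class StampOnMatch {
    // Like the question's s1 condition: stamps every event it inspects.
    static final Predicate<Reading> STAMP_ALWAYS = r -> {
        r.edtl = System.currentTimeMillis();
        return r.sensorId == 1 && r.value > 150;
    };

    // Like Step 1: stamps an event only when it satisfies the condition.
    static final Predicate<Reading> STAMP_ON_MATCH = r -> {
        if (r.sensorId == 1 && r.value > 150) {
            r.edtl = System.currentTimeMillis();
            return true;
        }
        return false;
    };
}
```

With `STAMP_ON_MATCH`, a non-matching reading keeps `edtl == 0`, so a non-zero Edtl reliably means "this event was selected as s1".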

Step 2: take a timestamp on the complex event stream and compute the latency, as follows:

PatternStream<Event> patternStream = CEP.pattern(mergedStream, pattern);

DataStream<String> cep_stream = patternStream.select(new PatternSelectFunction<Event, String>() {

    @Override
    public String select(Map<String, List<Event>> map) throws Exception {

        // s1 is the earliest event of the match and the only one stamped in Step 1
        Event s1 = map.get("s1").get(0);

        Integer patient_id = s1.getPatient_id();
        Integer val1 = s1.getValue();
        Long time_s1 = s1.getEdtl();

        Event s2 = map.get("s2").get(0);
        Integer val2 = s2.getValue();

        Event s3 = map.get("s3").get(0);
        Integer val3 = s3.getValue();

        System.out.println("value 1  = " + val1);
        System.out.println("value 2  = " + val2);
        System.out.println("value 3  = " + val3);

        // CE latency = time the match is emitted - time s1 was detected (both wall-clock ms)
        Long current_time = System.currentTimeMillis();
        Long cep_latency = current_time - time_s1;
        System.out.println("cep_latency = " + cep_latency + "ms");

        String event_data = "patient_id=" + patient_id +
                ", cep_latency=" + cep_latency;

        return event_data + "\n";
    }
});
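The select function above emits one latency per match, while the question asks for the average. One option is to accumulate the `cep_latency` values in a downstream operator; the class below is a plain-Java sketch of a running mean (the name `LatencyStats` is hypothetical). With parallelism greater than 1, each parallel instance would only see its own share of the matches, so per-instance averages would still need to be combined.

```java
// Running mean of per-match CEP latencies, in milliseconds.
class LatencyStats {
    private long count;
    private long sumMillis;

    // Record the latency of one detected complex event.
    void record(long latencyMillis) {
        count++;
        sumMillis += latencyMillis;
    }

    long count() {
        return count;
    }

    // Average latency over all recorded matches.
    double averageMillis() {
        if (count == 0) {
            throw new IllegalStateException("no latencies recorded yet");
        }
        return (double) sumMillis / count;
    }
}
```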

Hope this helps. If there is another way to solve this problem, please let me know.
