尝试访问或读取apache flink中keyedbroadcastprocessfunction中processelement方法中的只读ctx时出现nullpointer异常

b1zrtrql  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(518)

我有一个有趣的场景,我正在flink中进行模式匹配,使用keyedbroadcastprocessfunction评估传入的模式,当我在ide中运行程序时,当我尝试访问readonlycontext时,processelements函数中出现空指针异常,但在terminal中运行良好,下面是我的keyedbroadcastprocessfunction
下面是我的主要课程

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.json.JSONObject;

public class SignalPatternMatchingApp {

    public static final MapStateDescriptor<String, Map<String, String>> patternRuleDescriptor =
            new MapStateDescriptor(SignalPatternMatchingConstants.PATTERN_RULE_DESCRIPTOR_NAME,
                    BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, String.class));

    public static final OutputTag<JSONObject> unMatchedSideOutput =
            new OutputTag<JSONObject>("sideoutput") {
            };

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<JSONObject> inputSignal = get input from kafka stream

        DataStream<Map<String, String>> rawPatternStream =
                env.fromElements(get data from database);

        DataStream<Tuple2<String, Map<String, String>>> patternStream =
                rawPatternStream.flatMap(new FlatMapFunction<Map<String, String>,
                        Tuple2<String, Map<String, String>>>() {
                    @Override
                    public void flatMap(Map<String, String> patternRules,
                                        Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
                        for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
                            JSONObject jsonObject = new JSONObject(stringEntry.getValue());
                            Map<String, String> map = new HashMap<>();
                            for (String key : jsonObject.keySet()) {
                                String value = jsonObject.get(key).toString();
                                map.put(key, value);
                            }
                            out.collect(new Tuple2<>(stringEntry.getKey(), map));
                        }
                    }
                });

        BroadcastStream<Tuple2<String, Map<String, String>>> patternBroadcast =
                patternStream.broadcast(patternRuleDescriptor);

        DataStream<Tuple2<String, JSONObject>> matchedSignal =
                inputSignal.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
            @Override
            public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
                String sourceName = inputSignal.getJSONObject("signalHeader").get("sourceName").toString();
                return new Tuple2<>(sourceName, inputSignal);
            }
        }).keyBy(0).connect(patternBroadcast).process(new TestProcess());

        matchedSignal.print();

        DataStream<JSONObject> unmatchedSignal =
                ((SingleOutputStreamOperator<Tuple2<String, JSONObject>>) matchedSignal)
                .getSideOutput(unMatchedSideOutput);

        unmatchedSignal.print();

        env.execute();

    }

KeyedBroadcastProcessFunction as below

 public class TestProcess extends KeyedBroadcastProcessFunction<String, Tuple2<String, sampleSignal>,
            Tuple2<String, Map<String, String>>, Tuple2<String, sampleSignal>> {

   MapStateDescriptor<String, Map<String, String>> patternRuleDesc;

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        patternRuleDesc = new MapStateDescriptor<>("RuleDescriptor",
                BasicTypeInfo.STRING_TYPE_INFO,new MapTypeInfo<>(String.class,String.class));
    }

        public static final MapStateDescriptor <String,Map<String,String>> ruleDescriptor =
                new MapStateDescriptor <>("RuleDiscriptor",
                        ,BasicTypeInfo.STRING_TYPE_INFO
                        ,new MapTypeInfo<>(String.class,String.class));

        @Override
        public void processElement(Tuple2<String, sampleSignal> value, ReadOnlyContext ctx, Collector<Tuple2<String,
                sampleSignal>> out) throws Exception {

            Map<String,String> patternConditions  = ctx.getBroadcastState(this.patternRuleDesc).get(Key);

            System.out.println("Before Rule Iterator");

            /*I tried below way to print the values in broadcaststream just to print the values
              in broadcast state it don't print anything*/

            for(Map.Entry<String, String> rules:
                    patternConditions.entrySet()){
                System.out.println("Key: " +rules.getKey());
                System.out.println("Value: "+rules.getValue());
            }

            out.collect(new Tuple2<>(value.f0,value.f1));

        }

        @Override
        public void processBroadcastElement(Tuple2<String, Map<String, String>> value, Context ctx,
                                            Collector<Tuple2<String, sampleSignal>> out) throws Exception {

            System.out.println("BroadCastState Key: " +value.f0);
            System.out.println("BroadCastState Value: " +value.f1);
            ctx.getBroadcastState(ruleDescriptor).put(value.f0,value.f1);

        }
    }

下面是ide终端输出,有错误异常

2020-07-07 12:15:19,349 INFO  [Task] - Ensuring all FileSystem streams are closed for task Flat Map (1/8) (1a117aa5347465fcc2cd5e58c286ccca) [FINISHED]
2020-07-07 12:15:19,348 INFO  [TestProcess ] - BroadCastState SourceName: A
2020-07-07 12:15:19,350 INFO  [TestProcess ] - BroadCastState PatternCondition: PatternRule
2020-07-07 12:15:19,341 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Collection Source (1/1) 23f6a4d19c5744a62c46ac48d4dfbb24.
2020-07-07 12:15:19,351 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,351 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5).
2020-07-07 12:15:19,351 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,351 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3).
2020-07-07 12:15:19,351 INFO  [TestProcess ] - BroadCastState SourceName: A
2020-07-07 12:15:19,351 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3) [FINISHED]
2020-07-07 12:15:19,351 INFO  [TestProcess ] - BroadCastState PatternCondition: PatternRule
2020-07-07 12:15:19,351 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5) [FINISHED]
2020-07-07 12:15:19,352 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844).
2020-07-07 12:15:19,352 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844) [FINISHED]
2020-07-07 12:15:19,352 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd).
2020-07-07 12:15:19,352 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd) [FINISHED]
2020-07-07 12:15:19,357 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,357 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b).
2020-07-07 12:15:19,357 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b) [FINISHED]
2020-07-07 12:15:19,358 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,358 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9).
2020-07-07 12:15:19,339 INFO  [ExecutionGraph] - Flat Map (3/8) (63a7c3f2b7a8d110232374c700e6378a) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Map (3/8) d94aea75380e0e32c4747eef2f51a88d.
2020-07-07 12:15:19,358 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4).
2020-07-07 12:15:19,359 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4) [FINISHED]
2020-07-07 12:15:19,360 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Flat Map (2/8) 32f2429b06954884543a4de062edf6f6.
2020-07-07 12:15:19,358 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9) [FINISHED]
2020-07-07 12:15:19,360 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Map (1/8) 7dd6a325fe2265187b0b30bf3b4b8f63.
2020-07-07 12:15:19,358 INFO  [ExecutionGraph] - Flat Map (4/8) (de310509f095e00584ce128336e19adf) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,361 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Map (5/8) 4920b55f28ea09128aaa4e0d9d4691d8.
2020-07-07 12:15:19,362 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Flat Map (8/8) 0fded4a7816a9d9a219c520891d2b38d.
2020-07-07 12:15:19,362 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Flat Map (1/8) 1a117aa5347465fcc2cd5e58c286ccca.
2020-07-07 12:15:19,363 INFO  [ExecutionGraph] - Map (7/8) (ac26eefbff14e55f031a055c1d6fa7f3) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,364 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) 67a21177a7598f3f1ccc5001fe6951c3.
2020-07-07 12:15:19,367 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) 773fad58ff1418ae919a0900e4da6ef5.
2020-07-07 12:15:19,367 INFO  [ExecutionGraph] - Flat Map (5/8) (3bb74ec008b43ffe86edd6a7f84844a0) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,368 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) 8d860a734d5b6a50194a5caad135a844.
2020-07-07 12:15:19,368 INFO  [ExecutionGraph] - Map (2/8) (90787449e2373696163da5670b0e543a) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,369 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) 8fa9c71df79d439827a1d290d9ad9abd.
2020-07-07 12:15:19,370 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) ee3c4e895e82f77ad9a055ee788f4a2b.
2020-07-07 12:15:19,373 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) 351cc238fecce28d1dfdc5ac357ef8e4.
2020-07-07 12:15:19,374 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) 6a75da55421a929d4b3d4ebd655414e9.
2020-07-07 12:15:19,369 INFO  [ExecutionGraph] - Flat Map (7/8) (c1ca47240e15ef6c60f1943aed1b45ba) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,379 INFO  [ExecutionGraph] - Source: Collection Source (1/1) (23f6a4d19c5744a62c46ac48d4dfbb24) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,381 INFO  [ExecutionGraph] - Map (3/8) (d94aea75380e0e32c4747eef2f51a88d) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,382 INFO  [ExecutionGraph] - Flat Map (2/8) (32f2429b06954884543a4de062edf6f6) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,384 INFO  [ExecutionGraph] - Map (1/8) (7dd6a325fe2265187b0b30bf3b4b8f63) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,385 INFO  [ExecutionGraph] - Map (5/8) (4920b55f28ea09128aaa4e0d9d4691d8) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,386 INFO  [TypeExtractor] - class org.json.JSONObject does not contain a getter for field map
2020-07-07 12:15:19,386 INFO  [TypeExtractor] - class org.json.JSONObject does not contain a setter for field map
2020-07-07 12:15:19,386 INFO  [ExecutionGraph] - Flat Map (8/8) (0fded4a7816a9d9a219c520891d2b38d) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,386 INFO  [TypeExtractor] - Class class org.json.JSONObject cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2020-07-07 12:15:19,388 INFO  [ExecutionGraph] - Flat Map (1/8) (1a117aa5347465fcc2cd5e58c286ccca) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,390 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,390 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at com.eventdetection.eventfilter.pattern.operator.json.filter.TestProcess .processElement(TestProcess .java:103)
    at com.eventdetection.eventfilter.pattern.operator.json.filter.TestProcess .processElement(TestProcess .java:40)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:100)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-07-07 12:15:19,394 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc).
2020-07-07 12:15:19,394 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,394 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc) [FAILED]
2020-07-07 12:15:19,395 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,396 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,397 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,398 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,399 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,402 INFO  [TaskExecutor] - Un-registering task and sending final execution state FAILED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) 33a27b2d97c96658fd43db24177236bc.
2020-07-07 12:15:19,404 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at com.eventdetection.eventfilter.pattern.operator.json.filter.PatternFilter.processElement(PatternFilter.java:103)
    at com.eventdetection.eventfilter.pattern.operator.json.filter.PatternFilter.processElement(PatternFilter.java:40)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:100)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-07-07 12:15:19,406 INFO  [RestartPipelinedRegionStrategy] - Calculating tasks to restart to recover the failed task d8804397962a5c1c0b4daacb1802fb97_4.
2020-07-07 12:15:19,408 INFO  [RestartPipelinedRegionStrategy] - 26 tasks should be restarted to recover the failed task d8804397962a5c1c0b4daacb1802fb97_4. 
2020-07-07 12:15:19,410 INFO  [ExecutionGraph] - Job Pattern-Matching (1a828d53bc6a886fe0fc7c454e6e66b7) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

请帮助解决这个问题,我使用的是flink 1.10.0版本和intellij ide以及java 1.8版本

mrzz3bfm

mrzz3bfm1#

假设代码的当前状态是这一部分,那么它实际上只能失败:

Map<String,String> patternConditions  = ctx.getBroadcastState(this.patternRuleDesc).get(Key);

            System.out.println("Before Rule Iterator");

            /*I tried below way to print the values in broadcaststream just to print the values
              in broadcast state it don't print anything*/

            for(Map.Entry<String, String> rules:
                    patternConditions.entrySet()){
                System.out.println("Key: " +rules.getKey());
                System.out.println("Value: "+rules.getValue());
            }

问题不是真正获得广播状态,因为它不应该抛出 NullPointerException ,但是 IllegalArgumentException . 问题是你得到了一些 Key 然后试着对这个做些手术( .entryset() ). 但是如果 Key 不存在的状态将正常工作 Map 意味着它会回来 null . 如果你想给我做手术 null 你会得到 NullPointerException .
您应该添加一些代码来验证给定的 Key 否则它总是会失败。
编辑:
因此,如果问题是关于broadcaststream和另一个流之间的竞争,那么这不是您可以轻易阻止的。这是因为消费者消费信息的速度可能不同,通常不可能防止这种情况发生。有两种方法可以解决这个问题:
创建 ListState 这将保留到达但没有相应的 Key ,那么你只需在任何时候发出它们 Key 到达。注意:必须从 processElementBroadcastState 未设置关键帧。
使用 InputSelectable 要编写一个操作符,使一个源优先于另一个源,这种方式可以使广播流优先于元素流。
一般来说,我想说的是,如果你有连续的数据流,即你知道 Key 会不止一次到达。方案2提供了更大的灵活性,但也需要更多的知识,通常更难正确实施。

相关问题