How to update the broadcast state in a KeyedBroadcastProcessFunction in Flink?

fzwojiic, posted 2021-06-21 in Flink

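I'm new to Flink. I'm using Apache Flink for pattern matching: the list of patterns is held in broadcast state, and in the processElement function I iterate over the patterns to find a matching one. I read these patterns from a database, and this is a one-time activity. My code is below.
The MapStateDescriptor and side-output tag are as follows: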

public static final MapStateDescriptor<String, String> ruleDescriptor=
        new MapStateDescriptor<String, String>("RuleSet", BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

public final static OutputTag<Tuple2<String, String>> unMatchedSideOutput =
        new OutputTag<Tuple2<String, String>>(
                "unmatched-side-output") {
        };

The processElement and processBroadcastElement functions are as follows:

@Override
public void processElement(Tuple2<String, String> inputValue, ReadOnlyContext ctx,
        Collector<Tuple2<String, String>> out) throws Exception {

    for (Map.Entry<String, String> ruleSet :
            ctx.getBroadcastState(broadcast.ruleDescriptor).immutableEntries()) {

        String ruleCondition = ruleSet.getValue();

        // If a rule in the rule set matches, send the element to the main output and stop.
        // matches(...) is a placeholder for the actual matching logic.
        if (matches(ruleCondition, inputValue.f1)) {
            out.collect(new Tuple2<>(inputValue.f0, inputValue.f1));
            // return (not break), so a matched element is not also written to the side output
            return;
        }
    }

    // No rule matched: write the element to the side output
    ctx.output(Output.unMatchedSideOutput, new Tuple2<>("No Rule Detected", inputValue.f1));
}

@Override
public void processBroadcastElement(Tuple2<String, String> ruleSetConditions, Context ctx,
        Collector<Tuple2<String, String>> out) throws Exception {
    // Upsert the rule: pattern name -> pattern condition
    ctx.getBroadcastState(broadcast.ruleDescriptor).put(ruleSetConditions.f0, ruleSetConditions.f1);
}

The main function is as follows:

public static void main(String[] args) throws Exception {

        // Initiate a DataStream environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reads incoming data from upstream
        DataStream<String> incomingSignal =
                env.readTextFile(....);

        // Reads the patterns available in the configuration file
        DataStream<String> rawPatternStream =
                env.readTextFile(....);

        // Generate (key, value) pairs from the patterns, where key is the pattern name and value is the pattern condition
        DataStream<Tuple2<String, String>> ruleStream =
                rawPatternStream.flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public void flatMap(String ruleCondition, Collector<Tuple2<String, String>> out) throws Exception {
                        String[] rules = ruleCondition.split(",");
                        out.collect(new Tuple2<>(rules[0], rules[1]));
                    }
                });

        // Broadcast the patterns to every parallel instance of the downstream operator, where they are kept in broadcast state
        BroadcastStream<Tuple2<String, String>> ruleBroadcast = ruleStream.broadcast(ruleDescriptor);

        /* Create a keyed stream with sourceName as the key */
        DataStream<Tuple2<String, String>> matchSignal =
                incomingSignal.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String incomingSignal) throws Exception {
                        String sourceName = incomingSignal.split(",")[0];
                        return new Tuple2<>(sourceName, incomingSignal);
                    }
                }).keyBy(0).connect(ruleBroadcast).process(new KeyedBroadCastProcessFunction());

        matchSignal.print("RuleDetected=>");

        // The pipeline only runs once the job is submitted
        env.execute();
}
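
The flatMap above uses `split(",")`, so a condition that itself contains commas is cut off at the first comma, and a line without any comma throws ArrayIndexOutOfBoundsException. A minimal plain-Java sketch of a more defensive parser follows; the class name `RuleLineParser`, the trimming, and silently skipping malformed lines are assumptions for illustration, not part of the original code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (plain Java, no Flink dependency) for parsing "name,condition" rule lines.
final class RuleLineParser {

    // split(",", 2) splits only at the first comma, so commas inside the condition survive.
    // Lines without a comma (or with an empty name) yield Optional.empty() instead of throwing.
    static Optional<Map.Entry<String, String>> parse(String line) {
        String[] parts = line.split(",", 2);
        if (parts.length < 2 || parts[0].trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new SimpleEntry<>(parts[0].trim(), parts[1].trim()));
    }

    public static void main(String[] args) {
        System.out.println(parse("ruleA,temp > 30"));
        System.out.println(parse("ruleB,a > 1, b < 2"));
        System.out.println(parse("malformed line"));
    }
}
```

Inside the flatMap one could then write `RuleLineParser.parse(ruleCondition).ifPresent(e -> out.collect(new Tuple2<>(e.getKey(), e.getValue())));`, so a bad line in the rule source no longer fails the whole job.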

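I have a few questions:
1) Currently I read the rules from a database. How can I update the broadcast state while the Flink job is running in the cluster? If I get a new rule set from a Kafka topic, how do I update the broadcast state in the processBroadcastElement method of the KeyedBroadcastProcessFunction?
2) Do we need to restart the Flink job when the broadcast state is updated?
Please help me with the above questions.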

Answer #1, by osh3o9ms

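The only way to set or update broadcast state is in the processBroadcastElement method of a BroadcastProcessFunction or KeyedBroadcastProcessFunction. All you need to do is adapt your application to consume the rules as a stream from a streaming source (such as your Kafka topic), instead of reading them once from a file.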
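
To illustrate why a rule stream needs no job restart, here is a plain-Java model of the two methods. It does not use Flink; `BroadcastRuleModel` and its substring "matcher" are hypothetical stand-ins. `onRule` plays the role of processBroadcastElement and is the only writer, while `onEvent` plays processElement and only reads. Each event is checked against whatever rules are in state when it arrives, so a rule delivered later (for example from Kafka) applies to subsequent events in the same running job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java model (no Flink) of a (Keyed)BroadcastProcessFunction fed by a rule stream.
final class BroadcastRuleModel {

    // Stand-in for the state behind ctx.getBroadcastState(ruleDescriptor)
    private final Map<String, String> broadcastState = new HashMap<>();

    // Hypothetical matcher: a rule matches if its condition is a substring of the event
    private final BiPredicate<String, String> matches =
            (condition, event) -> event.contains(condition);

    // Mirrors processBroadcastElement: the only place where rules are written
    void onRule(String name, String condition) {
        broadcastState.put(name, condition);
    }

    // Mirrors processElement: read-only access to the rules present right now
    String onEvent(String event) {
        for (Map.Entry<String, String> rule : broadcastState.entrySet()) {
            if (matches.test(rule.getValue(), event)) {
                return "matched " + rule.getKey();
            }
        }
        return "No Rule Detected";
    }

    public static void main(String[] args) {
        BroadcastRuleModel op = new BroadcastRuleModel();
        List<String> results = new ArrayList<>();
        results.add(op.onEvent("sensor1,overheat")); // no rules have arrived yet
        op.onRule("heat", "overheat");                // a new rule arrives while "running"
        results.add(op.onEvent("sensor1,overheat")); // the new rule already applies
        System.out.println(results);
    }
}
```

Flink does not guarantee any ordering between the broadcast input and the keyed input, so events that arrive before a rule simply don't see it; the first call in the model shows exactly that case.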
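Broadcast state is a hash map. If your broadcast stream contains a new key/value pair that uses the same key as an earlier broadcast event, the new value replaces the old one. Otherwise, you get an entirely new entry.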
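
Because BroadcastState.put behaves like Map.put, re-sending a rule name overwrites that rule, and nothing is ever deleted implicitly; removing a rule takes an explicit BroadcastState.remove(key). The plain-Java sketch below assumes a hypothetical convention in which an empty condition is a tombstone meaning "delete this rule"; the class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (no Flink) of rule-update logic for processBroadcastElement.
// HYPOTHETICAL convention: an empty condition means "delete this rule".
final class RuleUpserts {

    static void applyRuleUpdate(Map<String, String> state, String name, String condition) {
        if (condition.isEmpty()) {
            state.remove(name);          // in Flink this would be broadcastState.remove(name)
        } else {
            state.put(name, condition);  // same key: old value replaced; new key: entry added
        }
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        applyRuleUpdate(state, "ruleA", "temp > 30");
        applyRuleUpdate(state, "ruleA", "temp > 40");     // replaces the previous ruleA
        applyRuleUpdate(state, "ruleB", "humidity > 80"); // adds a new entry
        applyRuleUpdate(state, "ruleB", "");              // tombstone removes ruleB
        System.out.println(state);
    }
}
```

In a real processBroadcastElement the same branch would call put or remove on ctx.getBroadcastState(ruleDescriptor). Flink's documentation asks that this update logic behave identically on every parallel instance, which a function depending only on the incoming element, like this one, does.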
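If you use readFile with FileProcessingMode.PROCESS_CONTINUOUSLY, the file's entire contents are re-ingested every time it is modified (readTextFile, as used in the question, reads the file only once). You can use that mechanism to update the rule set.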
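
One consequence of re-ingesting the whole file is that rules are only ever upserted: a rule deleted from the file stays in broadcast state, because no element for it is sent again. The plain-Java model below (no Flink; `ReingestDemo` is a hypothetical name) replays the file twice, the second time with ruleA changed, ruleB deleted and ruleC added.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (no Flink) of PROCESS_CONTINUOUSLY re-ingestion:
// every modification replays all lines of the file into processBroadcastElement.
final class ReingestDemo {

    // Each line "name,condition" becomes one put, as in the question's processBroadcastElement
    static void reingest(Map<String, String> state, List<String> fileLines) {
        for (String line : fileLines) {
            String[] parts = line.split(",", 2);
            state.put(parts[0], parts[1]);
        }
    }

    public static void main(String[] args) {
        Map<String, String> state = new TreeMap<>();
        // Initial version of the file
        reingest(state, Arrays.asList("ruleA,x > 1", "ruleB,y > 2"));
        // File edited: ruleA changed, ruleB deleted, ruleC added
        reingest(state, Arrays.asList("ruleA,x > 5", "ruleC,z > 3"));
        System.out.println(state);
    }
}
```

After the second replay the map still contains ruleB alongside the updated ruleA and the new ruleC, so deleting rules needs a separate mechanism, such as an explicit remove in processBroadcastElement.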
