我试图根据一组规则来验证数据流,以检测flink中的模式,方法是使用一组规则来验证数据流,我使用for循环来收集map中的所有模式,并在processelement fn中对其进行迭代,以找到模式示例代码,如下所示
mapstate描述符和边输出流如下
public static final MapStateDescriptor<String, String> ruleSetDescriptor =
new MapStateDescriptor<String, String>("RuleSet", BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
public final static OutputTag<Tuple2<String, String>> unMatchedSideOutput =
new OutputTag<Tuple2<String, String>>(
"unmatched-side-output") {
};
处理功能和广播功能如下:
@Override
public void processElement(Tuple2<String, String> inputValue, ReadOnlyContext ctx,
Collector<Tuple2<String,
String>> out) throws Exception {
for (Map.Entry<String, String> ruleSet:
ctx.getBroadcastState(broadcast.patternRuleDescriptor).immutableEntries()) {
String ruleName = ruleSet.getKey();
//If the rule in ruleset is matched then send output to main stream and break the program
if (this.rule) {
out.collect(new Tuple2<>(inputValue.f0, inputValue.f1));
break;
}
}
// Writing output to sideout if no rule is matched
ctx.output(Output.unMatchedSideOutput, new Tuple2<>("No Rule Detected", inputValue.f1));
}
@Override
public void processBroadcastElement(Tuple2<String, String> ruleSetConditions, Context ctx, Collector<Tuple2<String,String>> out) throws Exception {
ctx.getBroadcastState(broadcast.ruleSetDescriptor).put(ruleSetConditions.f0,
ruleSetConditions.f1);
}
我能够检测到模式,但我也得到sideoutput,因为我试图一个接一个地迭代规则如果我的匹配规则出现在last中,程序将输出发送到sideoutput,因为初始规则集不匹配。我想打印侧输出只有一次,如果没有一个规则是满意的,我是新来Flink请帮助我如何才能实现它。
1条答案
按热度按时间nom7f22z1#
在我看来,你想做些更像这样的事情: