flink复杂事件处理

uemypmqf  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(360)

我有一个flink cep代码,它从socket读取并检测模式。假设模式(单词)是“alert”。如果单词alert出现五次或五次以上,则应创建一个警报。但我得到一个输入不匹配的错误。flink版本是1.3.0。提前谢谢!!

package pattern;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

    public class cep {

         public static void main(String[] args) throws Exception {

             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);

                dss.print();

                Pattern<String,String> pattern = Pattern.<String> begin("first")
                        .where(new IterativeCondition<String>() {
                            @Override
                            public boolean filter(String word, Context<String> context) throws Exception {
                                return word.equals("alert");
                            }
                        })
                        .times(5);

                PatternStream<String> patternstream = CEP.pattern(dss, pattern);

                DataStream<String> alerts = patternstream
                        .flatSelect((Map<String,List<String>> in, Collector<String> out) -> {

                            String first = in.get("first").get(0);

                            for (int i = 0; i < 6; i++ ) {

                                out.collect(first);

                            }

                        });

                alerts.print();

                env.execute();

            }

    }

jtoj6r0c

jtoj6r0c1#

只是对原来的问题作了一些澄清。在1.3.0中,有一个bug使用lambdas作为 select/flatSelect 不可能的。
它是在1.3.1中修复的,因此您的第一个版本的代码将与1.3.1一起使用。
另外我觉得你误解了 times 量词。它与精确的次数匹配。所以在您的例子中,只有当事件正好匹配3次时,它才会返回,而不是3次或更多。

tjjdgumg

tjjdgumg2#

所以我有密码要用。这是可行的解决方案,

package pattern;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.IterativeCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    import java.util.List;
    import java.util.Map;

    public class cep {

         public static void main(String[] args) throws Exception {

             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);

                dss.print();

                Pattern<String,String> pattern = Pattern.<String> begin("first")
                        .where(new IterativeCondition<String>() {
                            @Override
                            public boolean filter(String word, Context<String> context) throws Exception {
                                return word.equals("alert");
                            }
                        })
                        .times(5);

                PatternStream<String> patternstream = CEP.pattern(dss, pattern);

                DataStream<String> alerts = patternstream
                        .select(new PatternSelectFunction<String, String>() {
                            @Override
                            public String select(Map<String, List<String>> in) throws Exception {

                                String first = in.get("first").get(0);

                                if(first.equals("alert")){

                                    return ("5 or more alerts");
                                }
                                else{

                                    return (" ");
                                }
                            }
                        });

                alerts.print();

                env.execute();

            }

    }

相关问题