kafka流连接 predicate

2hh7jdfx  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(417)

我对java有点陌生,所以我希望得到一些关于处理kafka predicate 中多个条件的建议。我有下面的代码,我能够有基于动态输入的动态过滤器,并避免下面的'如果/elseifs'?。我保持代码简单和愚蠢,以便更容易理解我要做什么。我试着理解,作为用户输入的结果,应用/附加多少过滤器的推荐方法是什么。我还想知道是否有可能有基于用户输入的比较运算符(等于/包含/…)。

public Topology buildTopology(Properties envProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = envProps.getProperty("input.topic.name");
        final String streamsOutputTopic = envProps.getProperty("streams.approved.topic.name");
        final String tableOutputTopic = envProps.getProperty("table.output.topic.name");
        final Serde<String> stringSerde = Serdes.String();
        final KStream<String, ClientTask> stream = builder.stream(inputTopic, Consumed.with(stringSerde,StreamsSerdes.ClientTask()));    
        String[] Filters = {"State=Work In Progress","Priorit=High"};
        final KStream<String, ClientTask> filter_stream = stream.filter(IsGoodtoGoFilter(Filters));
        filter_stream.to(streamsOutputTopic, Produced.with(stringSerde, StreamsSerdes.ClientTask()));           
        return builder.build();
    } 
    public static Predicate<String, ClientTask> IsGoodtoGoFilter(String[] Filters) {
        ArrayList<String> GetColumn = new ArrayList<>();
        ArrayList<String> GetValue = new ArrayList<>();
        for (String FilterColumn: Filters)       {
            String[] ColumnAndValue = FilterColumn.split("=");
            GetColumn.add(ColumnAndValue[0]);
            GetValue.add(ColumnAndValue[1]);
        }
        if (GetColumn.size() == 1) {
            return (k, v) -> v.get(GetColumn.get(0)).toString().equals(GetValue.get(0));
        }
        else if (GetColumn.size() == 2) {
            return (k, v) -> v.get(GetColumn.get(0)).toString().equals(GetValue.get(0)) &&
            v.get(GetColumn.get(1)).toString().equals(GetValue.get(1));
        }
        return (k, v) -> v.getStatus().equals("Open"); // replace by empty predicate later
    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题