SparkJava-流数据的条件逻辑-需要批评

nxagd54h  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(329)

我有一个需要有条件地处理我的数据取决于什么是作为输入接收。该应用程序是一个spark(2.3.4)结构化流媒体应用程序,读取Kafka源代码(2.3.0)。我可以成功地读取数据,获取数据,解析数据等等。
根据消息包含的数据,我需要进行进一步的处理。我有一个工作方法在下面的代码中列出,需要一个关键的评估,看看这是最好的方法还是另一个更好的方法是可用的。
工作方法如下。基于这个消息,我需要做更多的转换,并将各种转换后的输出保存到数据库中,最后以csv或json格式提供答案。

//raw streaming data from kafka here
Dataset<String> values = dsRawData
                    .selectExpr("CAST(value AS STRING)")
                    .as(Encoders.STRING());

//conditional processing check here
Dataset<String> csvIn = values 
                    .map((MapFunction<String, String>) se -> {
                        String[] controls = se.split(",");
                        secoreLog.info(logHeader+"controls: " + controls[0]);

                        if(controls[0].equals("magic1") && controls[1].equals("magic2") &&
                                controls[2].equals("magic2") && controls[3].equals("magic1")){
                            //trigger transformations & writes
                        }
                        else {
                            //trigger a different set of transformations & writes
                        }

                        return controls.toString();
                    }, Encoders.STRING());

请审阅并给出您的意见!

qcuzuvrc

qcuzuvrc1#

为什么不使用过滤器,然后根据需要使用不同的writestreams呢。我认为这是一个更好的方法。此外,您将能够更好地适当地处理每个流式查询。谢谢!

相关问题