我有一个需要有条件地处理我的数据取决于什么是作为输入接收。该应用程序是一个spark(2.3.4)结构化流媒体应用程序,读取Kafka源代码(2.3.0)。我可以成功地读取数据,获取数据,解析数据等等。
根据消息包含的数据,我需要进行进一步的处理。我有一个工作方法在下面的代码中列出,需要一个关键的评估,看看这是最好的方法还是另一个更好的方法是可用的。
工作方法如下。基于这个消息,我需要做更多的转换,并将各种转换后的输出保存到数据库中,最后以csv或json格式提供答案。
//raw streaming data from kafka here
Dataset<String> values = dsRawData
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
//conditional processing check here
Dataset<String> csvIn = values
.map((MapFunction<String, String>) se -> {
String[] controls = se.split(",");
secoreLog.info(logHeader+"controls: " + controls[0]);
if(controls[0].equals("magic1") && controls[1].equals("magic2") &&
controls[2].equals("magic2") && controls[3].equals("magic1")){
//trigger transformations & writes
}
else {
//trigger a different set of transformations & writes
}
return controls.toString();
}, Encoders.STRING());
请审阅并给出您的意见!
1条答案
按热度按时间qcuzuvrc1#
为什么不使用过滤器,然后根据需要使用不同的writestreams呢。我认为这是一个更好的方法。此外,您将能够更好地适当地处理每个流式查询。谢谢!