我有三条来自Kafka的信息。我将接收到的流解析为json,并将它们提取到适当的case类中,形成以下模式的数据流:
case class Class1(incident_id: String,
crt_object_id: String,
source: String,
order_number: String)
case class Class2(crt_object_id: String,
hangup_cause: String)
case class Class3(crt_object_id: String,
text: String)
我想基于公共列连接这三个数据流。 crt_object_id
. 所需的数据流应为以下形式:
case class Merged(incident_id: String,
crt_object_id: String,
source: String,
order_number: String,
hangup_cause: String,
text: String)
请告诉我一个同样的方法。我对spark和scala都很陌生。
1条答案
按热度按时间2hh7jdfx1#
spark流文档告诉您
join
方法:join(otherStream, [numTasks])
当两个人被召唤的时候DStream
第页,共页(K, V)
以及(K, W)
两人一组,返回一个新的DStream
的(K, (V, W))
与每个键的所有元素对配对。请注意,您需要
DStream
键值对而不是case类。因此,您必须从case类中提取要加入的字段,加入流并将结果流打包到适当的case类中。