How do I merge multiple DStreams in Spark using Scala?

bogh5gae · asked 2021-06-07 · in Kafka

I have three streams of messages coming from Kafka. I parse each received stream as JSON and extract it into the appropriate case class, producing DStreams with the following schemas:

case class Class1(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String)

case class Class2(crt_object_id: String,
                  hangup_cause: String)

case class Class3(crt_object_id: String,
                  text: String)
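
For context, a minimal sketch of the parsing step described above, assuming json4s for the JSON handling and a hypothetical kafkaStream1 of raw JSON strings pulled from Kafka (the other two streams would be built the same way; the actual parsing code may differ):

import org.apache.spark.streaming.dstream.DStream
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// hypothetical raw stream of JSON strings read from Kafka
val kafkaStream1: DStream[String] = ...

// parse each JSON message and extract it into Class1
val stream1: DStream[Class1] = kafkaStream1.map { json =>
  implicit val formats: Formats = DefaultFormats
  parse(json).extract[Class1]
}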

I want to join these three DStreams on the common column crt_object_id. The desired DStream should have the following form:

case class Merged(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String,
                  hangup_cause: String,
                  text: String)

Please show me a way to do this. I am new to both Spark and Scala.


2hh7jdfx 1#

The Spark Streaming documentation describes the join method you need:

join(otherStream, [numTasks]): When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.

Note that you need DStreams of key-value pairs rather than DStreams of case classes. Therefore, you have to extract the field you want to join on from each case class, join the streams on that key, and pack the resulting stream back into the appropriate case class.

import org.apache.spark.streaming.dstream.DStream

case class Class1(incident_id: String, crt_object_id: String,
                  source: String, order_number: String)
case class Class2(crt_object_id: String, hangup_cause: String)
case class Class3(crt_object_id: String, text: String)
case class Merged(incident_id: String, crt_object_id: String,
                  source: String, order_number: String,
                  hangup_cause: String, text: String)

val stream1: DStream[Class1] = ...
val stream2: DStream[Class2] = ...
val stream3: DStream[Class3] = ...

val transformedStream1: DStream[(String, Class1)] = stream1.map {
    c1 => (c1.crt_object_id, c1)
}
val transformedStream2: DStream[(String, Class2)] = stream2.map {
    c2 => (c2.crt_object_id, c2)
}
val transformedStream3: DStream[(String, Class3)] = stream3.map {
    c3 => (c3.crt_object_id, c3)
}

val joined: DStream[(String, ((Class1, Class2), Class3))] =
    transformedStream1.join(transformedStream2).join(transformedStream3)

val merged: DStream[Merged] = joined.map {
    case (crt_object_id, ((c1, c2), c3)) =>
        Merged(c1.incident_id, crt_object_id, c1.source,
               c1.order_number, c2.hangup_cause, c3.text)
}
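
Worth noting: join is an inner join evaluated batch by batch, so a merged record is produced only when a matching crt_object_id shows up in all three streams within the same batch interval. If, for example, the Class3 text may be absent, a leftOuterJoin sketch along the following lines could be used instead (the empty-string default for the missing text is just an assumption):

// leftOuterJoin keeps every (Class1, Class2) pair and wraps the
// possibly-missing Class3 record in an Option
val joinedWithOptionalText: DStream[(String, ((Class1, Class2), Option[Class3]))] =
    transformedStream1.join(transformedStream2).leftOuterJoin(transformedStream3)

val mergedWithOptionalText: DStream[Merged] = joinedWithOptionalText.map {
    case (crt_object_id, ((c1, c2), maybeC3)) =>
        Merged(c1.incident_id, crt_object_id, c1.source, c1.order_number,
               c2.hangup_cause, maybeC3.map(_.text).getOrElse(""))  // assumed default when no Class3 record arrives
}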
