如何使用 scala 在 spark 中合并多个 DStream?

How to merge multiple DStreams in spark using scala?

我有三个来自 Kafka 的传入流。我将接收到的流解析为 JSON 并将它们提取到适当的大小写 类 并形成以下模式的 DStreams:

case class Class1(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String)

case class Class2(crt_object_id: String,
                  hangup_cause: String)

case class Class3(crt_object_id: String,
                  text: String)

我想根据公共列加入这三个 DStream,即 crt_object_id。所需的 DStream 应采用以下形式:

case class Merged(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String,
                  hangup_cause: String,
                  text: String)

请告诉我一个方法来做同样的事情。我对 Spark 和 Scala 都很陌生。

Spark Streaming documentation告诉你join方法的签名:

join(otherStream, [numTasks])

When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.

请注意,您需要 DStream 个键值对而不是大小写 class 个。因此,您必须从案例 classes 中提取要加入的字段,加入流并将生成的流打包到适当的案例 class.

case class Class1(incident_id: String, crt_object_id: String,
                  source: String, order_number: String)
case class Class2(crt_object_id: String, hangup_cause: String)
case class Class3(crt_object_id: String, text: String)
case class Merged(incident_id: String, crt_object_id: String,
                  source: String, order_number: String,
                  hangup_cause: String, text: String)

val stream1: DStream[Class1] = ...
val stream2: DStream[Class2] = ...
val stream3: DStream[Class3] = ...

val transformedStream1: DStream[(String, Class1)] = stream1.map {
    c1 => (c1.crt_object_id, c1)
}
val transformedStream2: DStream[(String, Class2)] = stream2.map {
    c2 => (c2.crt_object_id, c2)
}
val transformedStream3: DStream[(String, Class3)] = stream3.map {
    c3 => (c3.crt_object_id, c3)
}

val joined: DStream[(String, ((Class1, Class2), Class3))] =
    transformedStream1.join(transformedStream2).join(transformedStream3)

val merged: DStream[Merged] = joined.map {
    case (crt_object_id, ((c1, c2), c3)) =>
        Merged(c1.incident_id, crt_object_id, c1.source,
               c1.order_number, c2.hangup_cause, c3.text)

}