How to merge multiple DStreams in Spark using Scala?
I have three incoming streams from Kafka. I parse the received streams as JSON, extract them into the appropriate case classes, and form DStreams of the following schema:
case class Class1(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String)

case class Class2(crt_object_id: String,
                  hangup_cause: String)

case class Class3(crt_object_id: String,
                  text: String)
I want to join these three DStreams on their common column, crt_object_id. The desired DStream should be of the following form:
case class Merged(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String,
                  hangup_cause: String,
                  text: String)
Please suggest a way to do this. I am new to both Spark and Scala.
The Spark Streaming documentation gives you the signature of the join method:

join(otherStream, [numTasks])

> When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
Note that you need DStreams of key-value pairs rather than of case classes. Therefore, you have to extract the field you want to join on from your case classes, join the streams, and pack the resulting stream back into the appropriate case class:
case class Class1(incident_id: String, crt_object_id: String,
                  source: String, order_number: String)
case class Class2(crt_object_id: String, hangup_cause: String)
case class Class3(crt_object_id: String, text: String)

case class Merged(incident_id: String, crt_object_id: String,
                  source: String, order_number: String,
                  hangup_cause: String, text: String)

val stream1: DStream[Class1] = ...
val stream2: DStream[Class2] = ...
val stream3: DStream[Class3] = ...

// Key each stream by the join column, crt_object_id.
val transformedStream1: DStream[(String, Class1)] = stream1.map {
  c1 => (c1.crt_object_id, c1)
}
val transformedStream2: DStream[(String, Class2)] = stream2.map {
  c2 => (c2.crt_object_id, c2)
}
val transformedStream3: DStream[(String, Class3)] = stream3.map {
  c3 => (c3.crt_object_id, c3)
}

// Two pairwise joins; note the nested tuple in the result type.
val joined: DStream[(String, ((Class1, Class2), Class3))] =
  transformedStream1.join(transformedStream2).join(transformedStream3)

val merged: DStream[Merged] = joined.map {
  case (crt_object_id, ((c1, c2), c3)) =>
    Merged(c1.incident_id, crt_object_id, c1.source,
           c1.order_number, c2.hangup_cause, c3.text)
}
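To see what the keyed three-way join does without a Spark cluster, here is a plain-Scala sketch of the same logic using Maps in place of DStreams. All sample values ("i1", "42", and so on) are made up for illustration; the point is the nested-key lookup and the inner-join semantics, which mirror DStream.join: a key survives only if it is present in all three streams.

```scala
// Plain-Scala analogue of the keyed join above (no Spark needed).
case class Class1(incident_id: String, crt_object_id: String,
                  source: String, order_number: String)
case class Class2(crt_object_id: String, hangup_cause: String)
case class Class3(crt_object_id: String, text: String)
case class Merged(incident_id: String, crt_object_id: String,
                  source: String, order_number: String,
                  hangup_cause: String, text: String)

// Key each "stream" (here just a Seq) by crt_object_id, as in the DStream code.
val s1 = Seq(Class1("i1", "42", "phone", "o1")).map(c => c.crt_object_id -> c).toMap
val s2 = Seq(Class2("42", "normal")).map(c => c.crt_object_id -> c).toMap
val s3 = Seq(Class3("42", "hello"), Class3("99", "no match")).map(c => c.crt_object_id -> c).toMap

// Inner-join semantics: a key is kept only if it appears in all three maps,
// just as DStream.join drops keys missing on either side.
val merged = for {
  (k, c1) <- s1.toSeq
  c2      <- s2.get(k)
  c3      <- s3.get(k)
} yield Merged(c1.incident_id, k, c1.source, c1.order_number, c2.hangup_cause, c3.text)

println(merged)  // key "99" from s3 produces nothing: it has no match in s1/s2
```

If you need to keep records that are missing from one of the streams, the analogous DStream operations are leftOuterJoin / fullOuterJoin, which wrap the possibly-missing side in an Option.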