在 Spark 流作业中调用实用程序(外部)
Invoking a utility(external) inside Spark streaming job
我有一个从 Kafka 消费的流媒体作业(使用 createDstream
)。
它的 "id"
流
[id1,id2,id3 ..]
我有一个实用程序或一个 api,它接受一个 ID 数组并执行一些外部调用并接收回一些信息,比如每个 ID "t"
[id:t1,id2:t2,id3:t3...]
我想在调用保留 Dstream 的实用程序时保留 DStream
。我不能在 Dstream rdd 上使用映射转换,因为它会调用每个 id,而且该实用程序正在接受 id 的集合。
Dstream.map(x=> myutility(x)) -- ruled out
如果我使用
Dstream.foreachrdd(rdd=> myutility(rdd.collect.toarray))
我输了DStream
。我需要保留 DStream
用于下游处理。
实现外部批量调用的方法是直接在分区级别转换DStream中的RDD。
模式看起来像这样:
val transformedStream = dstream.transform{rdd =>
rdd.mapPartitions{iterator =>
val externalService = Service.instance() // point to reserve local resources or make server connections.
val data = iterator.toList // to act in bulk. Need to tune partitioning to avoid huge data loads at this level
val resultCollection = externalService(data)
resultCollection.iterator
}
}
这种方法使用集群中的可用资源并行处理底层 RDD 的每个分区。请注意,需要为每个分区(而不是每个元素)实例化与外部系统的连接。
我有一个从 Kafka 消费的流媒体作业(使用 createDstream
)。
它的 "id"
[id1,id2,id3 ..]
我有一个实用程序或一个 api,它接受一个 ID 数组并执行一些外部调用并接收回一些信息,比如每个 ID "t"
[id:t1,id2:t2,id3:t3...]
我想在调用保留 Dstream 的实用程序时保留 DStream
。我不能在 Dstream rdd 上使用映射转换,因为它会调用每个 id,而且该实用程序正在接受 id 的集合。
Dstream.map(x=> myutility(x)) -- ruled out
如果我使用
Dstream.foreachrdd(rdd=> myutility(rdd.collect.toarray))
我输了DStream
。我需要保留 DStream
用于下游处理。
实现外部批量调用的方法是直接在分区级别转换DStream中的RDD。
模式看起来像这样:
val transformedStream = dstream.transform{rdd =>
rdd.mapPartitions{iterator =>
val externalService = Service.instance() // point to reserve local resources or make server connections.
val data = iterator.toList // to act in bulk. Need to tune partitioning to avoid huge data loads at this level
val resultCollection = externalService(data)
resultCollection.iterator
}
}
这种方法使用集群中的可用资源并行处理底层 RDD 的每个分区。请注意,需要为每个分区(而不是每个元素)实例化与外部系统的连接。