如何使用 Spark 以并行方式高效地将数据发送到 REST 端点?
How can I efficiently send data in a parallelized way to a REST endpoint using Spark?
我在 HDFS mydata.txt
中存储了一个巨大的文件,其中每一行都包含必须提交给 REST 端点的数据。我想知道如何有效地 group/partition 数据(文件中的行),然后使用 OkHttp
将它们提交到 REST 端点。我想要 group/partition 数据,因为我不想创建太多的 HTTP 客户端,也不想分配工作量。
例如,我目前有类似以下内容。
val sc = new SparkContext(new SparkConf())
val client = new OkHttpClient
val input = "hdfs://myserver/path/to/mydata.txt"
sc.textFile(input)
.foreach(line => {
val request = new Request.Builder()
.url("http://anotherserver/api/data")
.post(RequestBody.create(MediaType.parse("application/json"), line))
.build()
client.newCall(request).execute()
})
据我了解,foreach
是一个Action
,所以它在驱动程序上调用,因此,client
不必序列化,可以在所有数据(行)。当然,这个解决方案不是并行化的。
我也考虑过分区,但我认为 foreachPartition
也是一个 Action
。
sc.textFile(input)
.map(line => (Random.nextInt(10), line))
.partitionBy(new HashPartitioner(10))
.foreachPartition(iter => {
while(iter.hasNext) {
val item = iter.next()
val line = item._2
//submit to REST endpoint
}
})
关于如何使用 Spark 将数据提交到 REST 端点的工作并行化有什么想法吗?
EDIT 结果发现 OkHttpClient
不可序列化,甚至不能在 foreach
循环中使用。
解决此类问题的典型方法如下:
确保您要使用的 REST 库对所有执行程序可用。这样就无需担心序列化。
根据核心数选择并发级别
重新分区您的数据,使#partititions >= k * #executors。当访问具有可变吞吐量的外部服务时,我使用较大的 k
,例如 5-10,以减少一批 "slow" 输入减慢整个作业的可能性。
map()
数据并在映射函数体内设置客户端,这消除了序列化问题。 Return 一对输入和 success/failure 以及任何诊断信息。
过滤失败并决定如何处理它们,例如,重新处理它们(您甚至可以保留重试计数)。
如果设置 HTTP 客户端的成本很高,请使用 mapPartitions()
而不是 map()
,因为它允许您设置一次客户端并用它处理许多输入。
基础版:
def restCall(url: String): MyResultOrError = ...
val numCoresPerExecutor = ...
val numCores = numCoresPerExecutor * (sc.getExecutorStorageStatus.length - 1)
val result = rdd
.repartition(5 * numCores)
.map(url => (url, restCall(url)))
我在 HDFS mydata.txt
中存储了一个巨大的文件,其中每一行都包含必须提交给 REST 端点的数据。我想知道如何有效地 group/partition 数据(文件中的行),然后使用 OkHttp
将它们提交到 REST 端点。我想要 group/partition 数据,因为我不想创建太多的 HTTP 客户端,也不想分配工作量。
例如,我目前有类似以下内容。
val sc = new SparkContext(new SparkConf())
val client = new OkHttpClient
val input = "hdfs://myserver/path/to/mydata.txt"
sc.textFile(input)
.foreach(line => {
val request = new Request.Builder()
.url("http://anotherserver/api/data")
.post(RequestBody.create(MediaType.parse("application/json"), line))
.build()
client.newCall(request).execute()
})
据我了解,foreach
是一个Action
,所以它在驱动程序上调用,因此,client
不必序列化,可以在所有数据(行)。当然,这个解决方案不是并行化的。
我也考虑过分区,但我认为 foreachPartition
也是一个 Action
。
sc.textFile(input)
.map(line => (Random.nextInt(10), line))
.partitionBy(new HashPartitioner(10))
.foreachPartition(iter => {
while(iter.hasNext) {
val item = iter.next()
val line = item._2
//submit to REST endpoint
}
})
关于如何使用 Spark 将数据提交到 REST 端点的工作并行化有什么想法吗?
EDIT 结果发现 OkHttpClient
不可序列化,甚至不能在 foreach
循环中使用。
解决此类问题的典型方法如下:
确保您要使用的 REST 库对所有执行程序可用。这样就无需担心序列化。
根据核心数选择并发级别
重新分区您的数据,使#partititions >= k * #executors。当访问具有可变吞吐量的外部服务时,我使用较大的
k
,例如 5-10,以减少一批 "slow" 输入减慢整个作业的可能性。map()
数据并在映射函数体内设置客户端,这消除了序列化问题。 Return 一对输入和 success/failure 以及任何诊断信息。过滤失败并决定如何处理它们,例如,重新处理它们(您甚至可以保留重试计数)。
如果设置 HTTP 客户端的成本很高,请使用 mapPartitions()
而不是 map()
,因为它允许您设置一次客户端并用它处理许多输入。
基础版:
def restCall(url: String): MyResultOrError = ...
val numCoresPerExecutor = ...
val numCores = numCoresPerExecutor * (sc.getExecutorStorageStatus.length - 1)
val result = rdd
.repartition(5 * numCores)
.map(url => (url, restCall(url)))