创建一个数据框,其中包含来自 foreachPartition 中 api 请求的所有响应
create a dataframe with all the responses from the api requests within foreachPartition
我正在尝试执行 api 调用以从 amazon s3 获取对象 (json),并且我正在使用 foreachPartition 并行执行多个调用
df.rdd.foreachPartition(partition => {
//Initialize list buffer
var buffer_accounts1 = new ListBuffer[String]()
//Initialize Connection to amazon s3
val s3 = s3clientConnection()
partition.foreach(fun=>{
//api to get object from s3 bucket
//the first column of each row contains s3 object name
val obj = getS3Object(s3, "my_bucket", fun.getString(0)).getContent
val objString = IOUtils.toString(obj, "UTF-8")
buffer_accounts1 += objString
})
buffer_accounts1.toList.toDF("Object").write.parquet("dbfs:/mnt/test")
})
我想从 foreachPartition 中将来自所有 api 调用的字符串响应存储到一个数据帧中。因此,如果在 forEachPartition 中,如果我总共进行了 100 api 次调用,我想创建一个包含所有 100 个响应的数据框。
为此,我正在创建一个可变列表,并希望将其转换为 foreachPartition 中的数据框,但我们无法在驱动程序外部创建数据框。
我正在尝试创建一个数据框,其中包含来自 foreachPartition 中总 api 调用的所有响应,以便我可以应用进一步的转换。如何实现?
注意:- 我可以将每个响应写为 json 并读回它们,但这会导致性能下降,因为有很多磁盘 I/O 操作。
可以使用mapPartitions实现
val df_response = df.mapPartitions(iterator => {
val api_connect = new s3clientBuild()
val s3client = api_connect.s3connection(AccessKey, SecretKey)
val resp = iterator.map(row =>{
val name = getS3(row.getString(0), s3client)
(name)
})
resp
}).toDF("value")
我正在尝试执行 api 调用以从 amazon s3 获取对象 (json),并且我正在使用 foreachPartition 并行执行多个调用
df.rdd.foreachPartition(partition => {
//Initialize list buffer
var buffer_accounts1 = new ListBuffer[String]()
//Initialize Connection to amazon s3
val s3 = s3clientConnection()
partition.foreach(fun=>{
//api to get object from s3 bucket
//the first column of each row contains s3 object name
val obj = getS3Object(s3, "my_bucket", fun.getString(0)).getContent
val objString = IOUtils.toString(obj, "UTF-8")
buffer_accounts1 += objString
})
buffer_accounts1.toList.toDF("Object").write.parquet("dbfs:/mnt/test")
})
我想从 foreachPartition 中将来自所有 api 调用的字符串响应存储到一个数据帧中。因此,如果在 forEachPartition 中,如果我总共进行了 100 api 次调用,我想创建一个包含所有 100 个响应的数据框。
为此,我正在创建一个可变列表,并希望将其转换为 foreachPartition 中的数据框,但我们无法在驱动程序外部创建数据框。
我正在尝试创建一个数据框,其中包含来自 foreachPartition 中总 api 调用的所有响应,以便我可以应用进一步的转换。如何实现?
注意:- 我可以将每个响应写为 json 并读回它们,但这会导致性能下降,因为有很多磁盘 I/O 操作。
可以使用mapPartitions实现
val df_response = df.mapPartitions(iterator => {
val api_connect = new s3clientBuild()
val s3client = api_connect.s3connection(AccessKey, SecretKey)
val resp = iterator.map(row =>{
val name = getS3(row.getString(0), s3client)
(name)
})
resp
}).toDF("value")