创建一个数据框,其中包含来自 foreachPartition 中 api 请求的所有响应

create a dataframe with all the responses from the api requests within foreachPartition

我正在尝试执行 api 调用以从 amazon s3 获取对象 (json),并且我正在使用 foreachPartition 并行执行多个调用

df.rdd.foreachPartition(partition => {
  //Initialize list buffer
  var buffer_accounts1 = new ListBuffer[String]()
 
  //Initialize Connection to amazon s3
  val s3 = s3clientConnection()
 
  partition.foreach(fun=>{
   //api to get object from s3 bucket
   //the first column of each row contains s3 object name
    val obj = getS3Object(s3, "my_bucket", fun.getString(0)).getContent
    val objString = IOUtils.toString(obj, "UTF-8")
    buffer_accounts1 += objString 
  })
  buffer_accounts1.toList.toDF("Object").write.parquet("dbfs:/mnt/test")
 })

我想从 foreachPartition 中将来自所有 api 调用的字符串响应存储到一个数据帧中。因此,如果在 forEachPartition 中,如果我总共进行了 100 api 次调用,我想创建一个包含所有 100 个响应的数据框。

为此,我正在创建一个可变列表,并希望将其转换为 foreachPartition 中的数据框,但我们无法在驱动程序外部创建数据框。

我正在尝试创建一个数据框,其中包含来自 foreachPartition 中总 api 调用的所有响应,以便我可以应用进一步的转换。如何实现?

注意:- 我可以将每个响应写为 json 并读回它们,但这会导致性能下降,因为有很多磁盘 I/O 操作。

可以使用mapPartitions实现

val df_response = df.mapPartitions(iterator => {
  val api_connect  = new s3clientBuild()
  val s3client = api_connect.s3connection(AccessKey, SecretKey)
  val resp = iterator.map(row =>{
    val name = getS3(row.getString(0), s3client)
    (name)
  })
   resp
  }).toDF("value")