过滤后如何将数据帧写入S3

How to write the dataframe to S3 after filter

我正在尝试使用以下 Scala 代码在脚本编辑中以 CVS 格式将过滤后的数据帧写入 S3。

当前状态:

环境条件:我有所有S3的管理员权限。

import com.amazonaws.services.glue.GlueContext
import <others>

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    val datasource0 = glueContext.getCatalogSource(database = "db", tableName = "table", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
    val appymapping1 = datasource0.appyMapping(mapping=........)

    val temp=appymapping1.toDF.filter(some filtering rules)
    print("start")
    if (temp.count() <= 0) {
    temp.write.format("csv").option("sep", ",").save("s3://directory/error.csv")
  }
    print("End")
     

你正在使用 if 条件将 Dataframe 写入 S3(If 条件是检查 dataframe 是否有一行或多行),但你的 If 条件是反转的。仅当数据框有 0(或更少)行时才为真。所以改变那个。

高级:Spark 始终将文件保存为“part-”名称。所以将 S3 路径更改为 s3://directory/。并添加 .mode("overwrite") .

所以你写的 df 查询应该是

temp.write.format("csv").option("sep", ",").mode("overwrite").save("s3://目录")