过滤后如何将数据帧写入S3
How to write the dataframe to S3 after filter
我正在尝试使用以下 Scala 代码在脚本编辑中以 CVS 格式将过滤后的数据帧写入 S3。
当前状态:
在 运行 之后没有显示任何错误,只是没有写入 S3。
日志屏幕打印开始,但看不到打印结束。
没有指明问题的特定错误消息。
停在 temp.count。
环境条件:我有所有S3的管理员权限。
import com.amazonaws.services.glue.GlueContext
import <others>
object GlueApp {
def main(sysArgs: Array[String]) {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
// @params: [JOB_NAME]
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
Job.init(args("JOB_NAME"), glueContext, args.asJava)
val datasource0 = glueContext.getCatalogSource(database = "db", tableName = "table", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
val appymapping1 = datasource0.appyMapping(mapping=........)
val temp=appymapping1.toDF.filter(some filtering rules)
print("start")
if (temp.count() <= 0) {
temp.write.format("csv").option("sep", ",").save("s3://directory/error.csv")
}
print("End")
你正在使用 if 条件将 Dataframe 写入 S3(If 条件是检查 dataframe 是否有一行或多行),但你的 If 条件是反转的。仅当数据框有 0(或更少)行时才为真。所以改变那个。
高级:Spark 始终将文件保存为“part-”名称。所以将 S3 路径更改为 s3://directory/。并添加 .mode("overwrite") .
所以你写的 df 查询应该是
temp.write.format("csv").option("sep", ",").mode("overwrite").save("s3://目录")
我正在尝试使用以下 Scala 代码在脚本编辑中以 CVS 格式将过滤后的数据帧写入 S3。
当前状态:
在 运行 之后没有显示任何错误,只是没有写入 S3。
日志屏幕打印开始,但看不到打印结束。
没有指明问题的特定错误消息。
停在 temp.count。
环境条件:我有所有S3的管理员权限。
import com.amazonaws.services.glue.GlueContext
import <others>
object GlueApp {
def main(sysArgs: Array[String]) {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
// @params: [JOB_NAME]
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
Job.init(args("JOB_NAME"), glueContext, args.asJava)
val datasource0 = glueContext.getCatalogSource(database = "db", tableName = "table", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
val appymapping1 = datasource0.appyMapping(mapping=........)
val temp=appymapping1.toDF.filter(some filtering rules)
print("start")
if (temp.count() <= 0) {
temp.write.format("csv").option("sep", ",").save("s3://directory/error.csv")
}
print("End")
你正在使用 if 条件将 Dataframe 写入 S3(If 条件是检查 dataframe 是否有一行或多行),但你的 If 条件是反转的。仅当数据框有 0(或更少)行时才为真。所以改变那个。
高级:Spark 始终将文件保存为“part-”名称。所以将 S3 路径更改为 s3://directory/。并添加 .mode("overwrite") .
所以你写的 df 查询应该是
temp.write.format("csv").option("sep", ",").mode("overwrite").save("s3://目录")