从集群中将 integer/string 写入 pyspark 中的文本文件
Writing integer/string to a text file in pyspark from a cluster
我正在使用 EMR 阶跃函数来分析数据。
我想存储分析数据帧的计数,以决定是否可以将其保存为 csv 或 parquet。我更喜欢 CSV,但如果大小太大,我将无法下载它并在我的笔记本电脑上使用它。
我使用 count()
方法将其存储到一个 int 变量 limit
当我尝试使用以下代码时:
coalesce(1).write.format("text").option("header", "false").mode("overwrite").save("output.txt")
上面写着:
int doesnt have any attribute called write
有没有办法将整数或字符串写入文件,以便我可以在我的 s3 存储桶中打开它并在 EMR 步骤 运行 之后进行检查?
更新:
我按照@Shu 的建议尝试了数据框方法,但出现以下错误。
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 13.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 13.0 (TID 19396, ip-10-210-13-34.ec2.internal,
executor 11): org.apache.spark.SparkException: Task failed while
writing rows. at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
at
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:170)
这可能是什么根本原因?
您可以 parallelize
int 变量创建一个 rdd
然后使用 .saveAsTextFile
写入 HDFS
df.show()
#+---+
#| _1|
#+---+
#| a|
#| b|
#+---+
limit=df.count()
spark.sparkContext.parallelize([limit]).coalesce(1).saveAsTextFile("<path>")
#content of file
#cat <path>/part-00000
#2
其他方法是从 count variable
创建 dataframe
然后以 csv
格式写入 header false
.
from pyspark.sql.types import *
spark.createDataFrame(spark.sparkContext.parallelize([limit]),IntegerType()).coalesce(1).write.format("csv").option("header", "false").mode("overwrite").save("<path>")
#or in text format
spark.createDataFrame(spark.sparkContext.parallelize([limit]),StringType()).coalesce(1).write.format("text").mode("overwrite").save("<path>")
#cat part-*
#2
我正在使用 EMR 阶跃函数来分析数据。
我想存储分析数据帧的计数,以决定是否可以将其保存为 csv 或 parquet。我更喜欢 CSV,但如果大小太大,我将无法下载它并在我的笔记本电脑上使用它。
我使用 count()
方法将其存储到一个 int 变量 limit
当我尝试使用以下代码时:
coalesce(1).write.format("text").option("header", "false").mode("overwrite").save("output.txt")
上面写着:
int doesnt have any attribute called write
有没有办法将整数或字符串写入文件,以便我可以在我的 s3 存储桶中打开它并在 EMR 步骤 运行 之后进行检查?
更新: 我按照@Shu 的建议尝试了数据框方法,但出现以下错误。
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 19396, ip-10-210-13-34.ec2.internal, executor 11): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:170)
这可能是什么根本原因?
您可以 parallelize
int 变量创建一个 rdd
然后使用 .saveAsTextFile
df.show()
#+---+
#| _1|
#+---+
#| a|
#| b|
#+---+
limit=df.count()
spark.sparkContext.parallelize([limit]).coalesce(1).saveAsTextFile("<path>")
#content of file
#cat <path>/part-00000
#2
其他方法是从 count variable
创建 dataframe
然后以 csv
格式写入 header false
.
from pyspark.sql.types import *
spark.createDataFrame(spark.sparkContext.parallelize([limit]),IntegerType()).coalesce(1).write.format("csv").option("header", "false").mode("overwrite").save("<path>")
#or in text format
spark.createDataFrame(spark.sparkContext.parallelize([limit]),StringType()).coalesce(1).write.format("text").mode("overwrite").save("<path>")
#cat part-*
#2