从集群中将 integer/string 写入 pyspark 中的文本文件

Writing integer/string to a text file in pyspark from a cluster

我正在使用 EMR 阶跃函数来分析数据。 我想存储分析数据帧的计数,以决定是否可以将其保存为 csv 或 parquet。我更喜欢 CSV,但如果大小太大,我将无法下载它并在我的笔记本电脑上使用它。 我使用 count() 方法将其存储到一个 int 变量 limit 当我尝试使用以下代码时:

coalesce(1).write.format("text").option("header", "false").mode("overwrite").save("output.txt")

上面写着:

int doesnt have any attribute called write

有没有办法将整数或字符串写入文件,以便我可以在我的 s3 存储桶中打开它并在 EMR 步骤 运行 之后进行检查?

更新: 我按照@Shu 的建议尝试了数据框方法,但出现以下错误。

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 19396, ip-10-210-13-34.ec2.internal, executor 11): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:170)

这可能是什么根本原因?

您可以 parallelize int 变量创建一个 rdd 然后使用 .saveAsTextFile

写入 HDFS
df.show()
#+---+
#| _1|
#+---+
#|  a|
#|  b|
#+---+
limit=df.count()
spark.sparkContext.parallelize([limit]).coalesce(1).saveAsTextFile("<path>")

#content of file
#cat <path>/part-00000
#2 

其他方法是从 count variable 创建 dataframe 然后以 csv 格式写入 header false.

from pyspark.sql.types import *
spark.createDataFrame(spark.sparkContext.parallelize([limit]),IntegerType()).coalesce(1).write.format("csv").option("header", "false").mode("overwrite").save("<path>")

#or in text format
spark.createDataFrame(spark.sparkContext.parallelize([limit]),StringType()).coalesce(1).write.format("text").mode("overwrite").save("<path>")

#cat part-*
#2