How to pass dataframe to pyspark parallel operation?
I'm trying to filter a dataframe by salary value and then save the results as CSV files using pyspark.
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [('James','M',30),('Anna','F',41),
('Robert','M',62),('yana','M',30)
]
columns = ["firstname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()
spark.sparkContext.parallelize([30,41,62]).foreach(lambda x : df.filter(df.salary == x).show())
Running this code returns
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
(Edited)
Additionally, I need the filtered dataframes to be written to files in parallel:
spark.sparkContext.parallelize([30,41,62]).foreach(lambda x : df.filter(df.salary == x).write.save(f'/path-to-drive-location/{driver}'))
You don't want an RDD with foreach or map here. A DataFrame holds a reference to the SparkSession, which cannot be pickled and shipped to the executors, hence the error. Instead, just create another dataframe and do an inner join (or any other join you want).
salaries = [30, 41, 62]
a = spark.createDataFrame([(s,) for s in salaries], ['salary'])
a.show()
+------+
|salary|
+------+
| 30|
| 41|
| 62|
+------+
df2 = df.join(a, on=['salary'])
df2.show()
+------+---------+------+
|salary|firstname|gender|
+------+---------+------+
| 30| James| M|
| 30| yana| M|
| 41| Anna| F|
| 62| Robert| M|
+------+---------+------+
df2.write.csv('/path/to/location')