How to guarantee repartitioning in Spark Dataframe
I am new to Apache Spark and I am trying to repartition a DataFrame by U.S. state. I then want to break each partition out into its own RDD and save it to a specific location:
import json

from pyspark.sql import types

# Schema for the nested JSON: {"details": {"state": "..."}}
schema = types.StructType([
    types.StructField("details", types.StructType([
        types.StructField("state", types.StringType(), True)
    ]), True)
])

raw_rdd = spark_context.parallelize([
    '{"details": {"state": "AL"}}',
    '{"details": {"state": "AK"}}',
    '{"details": {"state": "AZ"}}',
    '{"details": {"state": "AR"}}',
    '{"details": {"state": "CA"}}',
    '{"details": {"state": "CO"}}',
    '{"details": {"state": "CT"}}',
    '{"details": {"state": "DE"}}',
    '{"details": {"state": "FL"}}',
    '{"details": {"state": "GA"}}'
]).map(
    lambda row: json.loads(row)
)

# Repartition by the nested state column, then inspect each partition
rdd = sql_context.createDataFrame(raw_rdd).repartition(10, "details.state").rdd

for index in range(0, rdd.getNumPartitions()):
    partition = rdd.mapPartitionsWithIndex(
        lambda partition_index, partition: partition if partition_index == index else []
    ).coalesce(1)

    if partition.count() > 0:
        df = sql_context.createDataFrame(partition, schema=schema)
        for event in df.collect():
            print "Partition {0}: {1}".format(index, str(event))
    else:
        print "Partition {0}: No rows".format(index)
For testing, I loaded a file from S3 with 50 rows (10 in the example above), each with a different state in the details.state column. To mimic that I parallelized the data in the example above, but the behaviour is the same: I get the 50 partitions I asked for, yet some are unused and some contain entries for more than one state. Here is the output for the sample set of 10:
Partition 0: Row(details=Row(state=u'AK'))
Partition 1: Row(details=Row(state=u'AL'))
Partition 1: Row(details=Row(state=u'CT'))
Partition 2: Row(details=Row(state=u'CA'))
Partition 3: No rows
Partition 4: No rows
Partition 5: Row(details=Row(state=u'AZ'))
Partition 6: Row(details=Row(state=u'CO'))
Partition 6: Row(details=Row(state=u'FL'))
Partition 6: Row(details=Row(state=u'GA'))
Partition 7: Row(details=Row(state=u'AR'))
Partition 7: Row(details=Row(state=u'DE'))
Partition 8: No rows
Partition 9: No rows
My question: is the repartitioning strategy only a suggestion to Spark, or is there something fundamentally wrong with my code?
Nothing unexpected is happening here. Spark distributes rows between partitions using the hash of the partitioning key, taken (positive) modulo the number of partitions, and with 50 partitions you will get a significant number of collisions:
from pyspark.sql.functions import expr

states = sc.parallelize([
    "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA",
    "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD",
    "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
    "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC",
    "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"
])

states_df = states.map(lambda x: (x, )).toDF(["state"])

# 50 distinct states hash into only 26 distinct partition ids
states_df.select(expr("pmod(hash(state), 50)")).distinct().count()
# 26
If you want to separate files on write, it is better to use the partitionBy clause of DataFrameWriter. It creates a separate output per level and does not require a shuffle.
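A minimal sketch of that approach, assuming df is the DataFrame built in the question and /tmp/output is a hypothetical destination; since partitionBy works on top-level columns, the nested state field is promoted first:

from pyspark.sql.functions import col

(df
    .withColumn("state", col("details.state"))  # promote the nested field
    .write
    .partitionBy("state")                       # one directory per state value
    .json("/tmp/output"))                       # e.g. /tmp/output/state=AL/part-...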
If you really want a full repartition, you can use the RDD API, which lets you supply a custom partitioner.
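A minimal sketch of that idea, again assuming df from the question; the state-to-partition mapping below is an illustrative assumption, not part of the original answer:

# Hypothetical mapping from state code to partition index
state_codes = ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA"]
state_to_index = {s: i for i, s in enumerate(state_codes)}

# Key each row by its state, then partition with a custom partition function
keyed = df.rdd.keyBy(lambda row: row.details.state)
partitioned = keyed.partitionBy(
    len(state_codes),
    lambda state: state_to_index[state]
).values()

partitioned.glom().map(len).collect()  # exactly one row per partition here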