如何添加一列指示磁盘上文件的行号?
How do I add a column indicating the row number from a file on disk?
我想使用 spark.read.csv
解析一系列 .csv
文件,但我想在文件中包含每一行的行号。
我知道除非明确告知,否则 Spark 通常不会对 DataFrame 进行排序,而且我不想编写自己的 .csv
文件解析器,因为这比 Spark 自己的实现要慢得多.如何以分布式安全方式添加此行号?
从阅读有关 zipWithIndex 的文章来看,它似乎很有用,但不幸的是它似乎需要分区顺序稳定
假设我们有以下设置,用于在磁盘上创建一个包含我们控制的内容的 .csv
文件:
from pyspark.sql import types as T, functions as F, SparkSession
import os
import tempfile
spark = SparkSession.builder.getOrCreate()
# Synthesize DataFrames
schema = T.StructType([
T.StructField("col_1", T.StringType(), False),
T.StructField("col_2", T.IntegerType(), False),
T.StructField("col_3", T.StringType(), False),
T.StructField("col_4", T.IntegerType(), False),
])
data = [
{"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
{"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
{"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
]
final_data = []
# Scale up a bit
for _ in range(3):
final_data += data
def refresh():
df = spark.createDataFrame(final_data, schema)
with tempfile.TemporaryDirectory() as tmpdirname:
pathname = os.path.join(
tmpdirname + "output.csv"
)
df.write.format("csv").option(
"header",
True
).save(pathname)
return spark.read.option(
"header",
True
).csv(pathname)
在此设置中,我们可以重复创建 .csv
文件并将它们保存到磁盘,然后像第一次解析它们时那样检索它们。
我们解析这些文件的策略将归结为以下几点:
- 根据我们在磁盘上使用文件名找到的内容创建标识符,block_start(以防 Spark 将文件分成多个分区),以及每个分区内的可排序标识符
- 将解析后的内容根据这些标识符进行排序,从而保证顺序
- 使用
zipWithIndex
创建一个 row_number 标识符
这看起来像下面这样:
def create_index(
parsed_df,
row_number_column_name="index",
file_name_column_name="_file_name",
block_start_column_name="_block_start",
row_id_column_name="_row_id",
):
unindexed_df = parsed_df.selectExpr(
*parsed_df.columns,
f"input_file_name() AS {file_name_column_name}",
f"input_file_block_start() AS {block_start_column_name}",
f"monotonically_increasing_id() AS {row_id_column_name}"
).orderBy(
file_name_column_name,
block_start_column_name,
row_id_column_name
)
# Unfortunately, we have to unwrap the results of zipWithIndex, so there's some aliasing required
input_cols = unindexed_df.columns
zipped = unindexed_df.rdd.zipWithIndex().toDF()
aliased_columns = []
for input_col in input_cols:
aliased_columns += [zipped["_1." + input_col].alias(input_col)]
# Alias the original columns, remove the ones we built internally
return zipped.select(
*aliased_columns,
zipped["_2"].alias(row_number_column_name)
).drop(
file_name_column_name,
block_start_column_name,
row_id_column_name
)
example_df = refresh()
example_df = create_index(example_df)
example_df.show()
"""
+-----+-----+------+-----+-----+
|col_1|col_2| col_3|col_4|index|
+-----+-----+------+-----+-----+
|key_1| 1|CREATE| 0| 0|
|key_2| 2|CREATE| 0| 1|
|key_3| 3|CREATE| 0| 2|
|key_1| 1|CREATE| 0| 3|
|key_2| 2|CREATE| 0| 4|
|key_3| 3|CREATE| 0| 5|
|key_1| 1|CREATE| 0| 6|
|key_2| 2|CREATE| 0| 7|
|key_3| 3|CREATE| 0| 8|
+-----+-----+------+-----+-----+
"""
我想使用 spark.read.csv
解析一系列 .csv
文件,但我想在文件中包含每一行的行号。
我知道除非明确告知,否则 Spark 通常不会对 DataFrame 进行排序,而且我不想编写自己的 .csv
文件解析器,因为这比 Spark 自己的实现要慢得多.如何以分布式安全方式添加此行号?
从阅读有关 zipWithIndex 的文章来看,它似乎很有用,但不幸的是它似乎需要分区顺序稳定
假设我们有以下设置,用于在磁盘上创建一个包含我们控制的内容的 .csv
文件:
from pyspark.sql import types as T, functions as F, SparkSession
import os
import tempfile
spark = SparkSession.builder.getOrCreate()
# Synthesize DataFrames
schema = T.StructType([
T.StructField("col_1", T.StringType(), False),
T.StructField("col_2", T.IntegerType(), False),
T.StructField("col_3", T.StringType(), False),
T.StructField("col_4", T.IntegerType(), False),
])
data = [
{"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
{"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
{"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
]
final_data = []
# Scale up a bit
for _ in range(3):
final_data += data
def refresh():
df = spark.createDataFrame(final_data, schema)
with tempfile.TemporaryDirectory() as tmpdirname:
pathname = os.path.join(
tmpdirname + "output.csv"
)
df.write.format("csv").option(
"header",
True
).save(pathname)
return spark.read.option(
"header",
True
).csv(pathname)
在此设置中,我们可以重复创建 .csv
文件并将它们保存到磁盘,然后像第一次解析它们时那样检索它们。
我们解析这些文件的策略将归结为以下几点:
- 根据我们在磁盘上使用文件名找到的内容创建标识符,block_start(以防 Spark 将文件分成多个分区),以及每个分区内的可排序标识符
- 将解析后的内容根据这些标识符进行排序,从而保证顺序
- 使用
zipWithIndex
创建一个 row_number 标识符
这看起来像下面这样:
def create_index(
parsed_df,
row_number_column_name="index",
file_name_column_name="_file_name",
block_start_column_name="_block_start",
row_id_column_name="_row_id",
):
unindexed_df = parsed_df.selectExpr(
*parsed_df.columns,
f"input_file_name() AS {file_name_column_name}",
f"input_file_block_start() AS {block_start_column_name}",
f"monotonically_increasing_id() AS {row_id_column_name}"
).orderBy(
file_name_column_name,
block_start_column_name,
row_id_column_name
)
# Unfortunately, we have to unwrap the results of zipWithIndex, so there's some aliasing required
input_cols = unindexed_df.columns
zipped = unindexed_df.rdd.zipWithIndex().toDF()
aliased_columns = []
for input_col in input_cols:
aliased_columns += [zipped["_1." + input_col].alias(input_col)]
# Alias the original columns, remove the ones we built internally
return zipped.select(
*aliased_columns,
zipped["_2"].alias(row_number_column_name)
).drop(
file_name_column_name,
block_start_column_name,
row_id_column_name
)
example_df = refresh()
example_df = create_index(example_df)
example_df.show()
"""
+-----+-----+------+-----+-----+
|col_1|col_2| col_3|col_4|index|
+-----+-----+------+-----+-----+
|key_1| 1|CREATE| 0| 0|
|key_2| 2|CREATE| 0| 1|
|key_3| 3|CREATE| 0| 2|
|key_1| 1|CREATE| 0| 3|
|key_2| 2|CREATE| 0| 4|
|key_3| 3|CREATE| 0| 5|
|key_1| 1|CREATE| 0| 6|
|key_2| 2|CREATE| 0| 7|
|key_3| 3|CREATE| 0| 8|
+-----+-----+------+-----+-----+
"""