如何添加一列指示磁盘上文件的行号?

How do I add a column indicating the row number from a file on disk?

我想使用 spark.read.csv 解析一系列 .csv 文件,但我想在文件中包含每一行的行号。

我知道除非明确告知,否则 Spark 通常不会对 DataFrame 进行排序,而且我不想编写自己的 .csv 文件解析器,因为这比 Spark 自己的实现要慢得多.如何以分布式安全方式添加此行号?

从阅读有关 zipWithIndex 的文章来看,它似乎很有用,但不幸的是它似乎需要分区顺序稳定

假设我们有以下设置,用于在磁盘上创建一个包含我们控制的内容的 .csv 文件:

from pyspark.sql import types as T, functions as F, SparkSession
import os
import tempfile

spark = SparkSession.builder.getOrCreate()


# Synthesize DataFrames
schema = T.StructType([
  T.StructField("col_1", T.StringType(), False),
  T.StructField("col_2", T.IntegerType(), False),
  T.StructField("col_3", T.StringType(), False),
  T.StructField("col_4", T.IntegerType(), False),
])
data = [
  {"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
  {"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
  {"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
]

final_data = []
# Scale up a bit
for _ in range(3):
    final_data += data

def refresh():
    df = spark.createDataFrame(final_data, schema)
    with tempfile.TemporaryDirectory() as tmpdirname:
        pathname = os.path.join(
            tmpdirname + "output.csv"
        )
        df.write.format("csv").option(
            "header",
            True
        ).save(pathname)

        return spark.read.option(
            "header",
            True
        ).csv(pathname)

在此设置中,我们可以重复创建 .csv 文件并将它们保存到磁盘,然后像第一次解析它们时那样检索它们。

我们解析这些文件的策略将归结为以下几点:

  1. 根据我们在磁盘上使用文件名找到的内容创建标识符,block_start(以防 Spark 将文件分成多个分区),以及每个分区内的可排序标识符
  2. 将解析后的内容根据这些标识符进行排序,从而保证顺序
  3. 使用 zipWithIndex
  4. 创建一个 row_number 标识符

这看起来像下面这样:

def create_index(
        parsed_df,
        row_number_column_name="index",
        file_name_column_name="_file_name",
        block_start_column_name="_block_start",
        row_id_column_name="_row_id",
    ):
    unindexed_df = parsed_df.selectExpr(
        *parsed_df.columns,
        f"input_file_name() AS {file_name_column_name}",
        f"input_file_block_start() AS {block_start_column_name}",
        f"monotonically_increasing_id() AS {row_id_column_name}"
    ).orderBy(
        file_name_column_name,
        block_start_column_name,
        row_id_column_name
    )

    # Unfortunately, we have to unwrap the results of zipWithIndex, so there's some aliasing required
    input_cols = unindexed_df.columns
    zipped = unindexed_df.rdd.zipWithIndex().toDF()
    aliased_columns = []
    for input_col in input_cols:
        aliased_columns += [zipped["_1." + input_col].alias(input_col)]

    # Alias the original columns, remove the ones we built internally
    return zipped.select(
        *aliased_columns,
        zipped["_2"].alias(row_number_column_name)
    ).drop(
        file_name_column_name,
        block_start_column_name,
        row_id_column_name
    )

example_df = refresh()
example_df = create_index(example_df)
example_df.show()

"""
+-----+-----+------+-----+-----+
|col_1|col_2| col_3|col_4|index|
+-----+-----+------+-----+-----+
|key_1|    1|CREATE|    0|    0|
|key_2|    2|CREATE|    0|    1|
|key_3|    3|CREATE|    0|    2|
|key_1|    1|CREATE|    0|    3|
|key_2|    2|CREATE|    0|    4|
|key_3|    3|CREATE|    0|    5|
|key_1|    1|CREATE|    0|    6|
|key_2|    2|CREATE|    0|    7|
|key_3|    3|CREATE|    0|    8|
+-----+-----+------+-----+-----+
"""