How can I read multiple csv files and merge them together (they may not have the same columns) using pyspark?

My HDFS layout has a folder named "mapped_files" that contains several csv files - "mapped_file_1.csv", "mapped_file_2.csv", ... How can I merge all of these files? They may not have exactly the same columns. For example, when I read the files "mapped_file_1.csv" and "mapped_file_2.csv" with pyspark, they look like this:

### mapped_file_1.csv

+----------+---------+---------+--------+-----+-----+-----------+----------+------------------+---------------------+-------------------+---------------+--------------------+
|chromosome|    start|      end|assembly|  ref|  alt|risk_allele|     genes|         phenotype|clinical_significance|polyphen_prediction|sift_prediction|                hgvs|
+----------+---------+---------+--------+-----+-----+-----------+----------+------------------+---------------------+-------------------+---------------+--------------------+
|         9| 96369762| 96369762|    null|  C/T|  C/T|          T|intergenic|Migraine with aura|                 null|               null|           null|          rs59270819|
|        10| 29075768| 29075768|    null|G/A/C|G/A/C|          A|intergenic|Migraine with aura|                 null|               null|           null|          rs59495588|
+----------+---------+---------+--------+-----+-----+-----------+----------+------------------+---------------------+-------------------+---------------+--------------------+

### mapped_file_2.csv

+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+--------+---+---+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
|variant_name|variant_id|chromosome|     genes|        variant_type|description|polyphen_prediction|sift_prediction|                hgvs|assembly|assembly.date|   start|     end|ref|alt|risk_allele|           phenotype|clinical_actionability|classification|clinical_significance|          method|  assertion_criteria|     level_certainty|      date|              author|  origin|title|year|authors|pmid|is_gwas|   name|                 url|version|databanks.variant_id|clinvar_accession|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+--------+---+---+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|11016874|  C|  T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...| literature only|                null|no assertion crit...|2019-07-02|         GeneReviews|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|11016874|  C|  T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...|clinical testing|Invitae Variant C...|criteria provided...|2019-08-15|             Invitae|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|11016874|  C|  T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...| literature only|                null|no assertion crit...|2019-07-02|         GeneReviews|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+--------+---+---+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+

As you can see from the dataframes/files above, there are columns that exist in one file but not in the other. This is what I did:

from pyspark.sql import SparkSession
from pyspark.sql import Row
from functools import reduce
import pyspark.sql.functions as F

warehouse_location ='hdfs://hdfs-nn:9000'

spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("csv") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .getOrCreate()

all_data = spark.read.options(header='True', delimiter=';').csv("hdfs://hdfs-nn:9000/mapped_files/*")

all_data.show()

+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+----------+----+----+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
|variant_name|variant_id|chromosome|     genes|        variant_type|description|polyphen_prediction|sift_prediction|                hgvs|assembly|assembly.date|   start|       end| ref| alt|risk_allele|           phenotype|clinical_actionability|classification|clinical_significance|          method|  assertion_criteria|     level_certainty|      date|              author|  origin|title|year|authors|pmid|is_gwas|   name|                 url|version|databanks.variant_id|clinvar_accession|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+----------+----+----+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|  11016874|   C|   T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...| literature only|                null|no assertion crit...|2019-07-02|         GeneReviews|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|  11016874|   C|   T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...|clinical testing|Invitae Variant C...|criteria provided...|2019-08-15|             Invitae|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|  11016874|   C|   T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...| literature only|                null|no assertion crit...|2019-07-02|         GeneReviews|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
|           9|  96369762|  96369762|      null|                 C/T|        C/T|                  T|     intergenic|  Migraine with aura|    null|         null|    null|rs59270819|null|null|       null|                null|                  null|          null|                 null|            null|                null|                null|      null|                null|    null| null|null|   null|null|   null|   null|                null|   null|                null|             null|
|          10|  29075768|  29075768|      null|               G/A/C|      G/A/C|                  A|     intergenic|  Migraine with aura|    null|         null|    null|rs59495588|null|null|       null|                null|                  null|          null|                 null|            null|                null|                null|      null|                null|    null| null|null|   null|null|   null|   null|                null|   null|                null|             null|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+----------+----+----+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
only showing top 20 rows

When I use the code above, the values of the shared columns do not end up in the right place (in the last two rows the values are not in the correct columns).

So, my question is: how can I read multiple csv files and merge them together (they may not have the same columns) using pyspark?

The simple way is to add the missing columns to both dataframes and then use a union function. Your single-glob read fails because Spark applies one schema to all the files and matches values by position, not by column name, which is why your shared columns ended up misaligned. I prefer unionByName, so I'll use it in the example:

df1 = spark.read.options(header='True', delimiter=';').csv("mapped_file_1.csv")
df2 = spark.read.options(header='True', delimiter=';').csv("mapped_file_2.csv")

united_df = df1.unionByName(df2, allowMissingColumns=True)

allowMissingColumns (available since Spark 3.1) fills the columns missing from either dataframe with NULLs.
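
For a quick, self-contained illustration (toy column names, not from your files), two dataframes with partially overlapping columns union like this:

df_a = spark.createDataFrame([(1, "x")], ["id", "only_in_a"])
df_b = spark.createDataFrame([(2, "y")], ["id", "only_in_b"])

# columns are matched by name; the missing ones are filled with null
df_a.unionByName(df_b, allowMissingColumns=True).show()
# +---+---------+---------+
# | id|only_in_a|only_in_b|
# +---+---------+---------+
# |  1|        x|     null|
# |  2|     null|        y|
# +---+---------+---------+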

If you have more than 2 files, you can just define a function and use reduce to merge all the dataframes:

def unite_dfs(df1, df2):
  return df1.unionByName(df2, allowMissingColumns=True)


list_of_dfs = [df1, df2, df3, df4, df5, df6]
united_df = reduce(unite_dfs, list_of_dfs)
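
If you don't want to hard-code the list of dataframes, here is a sketch of one way to build it (this part is my assumption, using Spark's internal py4j handles to reach the Hadoop FileSystem API; the folder path is the one from your question):

# enumerate the csv files in the HDFS folder (uses the private _jvm/_jsc handles)
folder = spark._jvm.org.apache.hadoop.fs.Path("hdfs://hdfs-nn:9000/mapped_files/")
fs = folder.getFileSystem(spark._jsc.hadoopConfiguration())
csv_paths = [str(s.getPath()) for s in fs.listStatus(folder)
             if str(s.getPath()).endswith(".csv")]

# read each file separately so every file keeps its own header and schema,
# then fold everything together with unionByName
list_of_dfs = [spark.read.options(header='True', delimiter=';').csv(p)
               for p in csv_paths]
united_df = reduce(unite_dfs, list_of_dfs)

Reading the files one by one is the key point: each file keeps its own header, so unionByName can match the columns by name instead of by position.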

Let me know if this is clear. I didn't include the imports, since I only used the libraries from your snippet. I can edit if something is unclear.