How can I read multiple csv files and merge them together (they may not have the same columns) using pyspark?
My HDFS layout has a folder named "mapped_files", which contains several csv files - "mapped_file_1.csv", "mapped_file_2.csv", ... How can I merge all of these files?
The files may not have exactly the same columns. For example, when I read "mapped_file_1.csv" and "mapped_file_2.csv" with pyspark, they look like this:
###mapped_file_1.csv
+----------+---------+---------+--------+-----+-----+-----------+----------+------------------+---------------------+-------------------+---------------+--------------------+
|chromosome| start| end|assembly| ref| alt|risk_allele| genes| phenotype|clinical_significance|polyphen_prediction|sift_prediction| hgvs|
+----------+---------+---------+--------+-----+-----+-----------+----------+------------------+---------------------+-------------------+---------------+--------------------+
| 9| 96369762| 96369762| null| C/T| C/T| T|intergenic|Migraine with aura| null| null| null| rs59270819|
| 10| 29075768| 29075768| null|G/A/C|G/A/C| A|intergenic|Migraine with aura| null| null| null| rs59495588|
+----------+---------+---------+--------+-----+-----+-----------+----------+------------------+---------------------+-------------------+---------------+--------------------+
###mapped_file_2.csv
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+--------+---+---+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
|variant_name|variant_id|chromosome| genes| variant_type|description|polyphen_prediction|sift_prediction| hgvs|assembly|assembly.date| start| end|ref|alt|risk_allele| phenotype|clinical_actionability|classification|clinical_significance| method| assertion_criteria| level_certainty| date| author| origin|title|year|authors|pmid|is_gwas| name| url|version|databanks.variant_id|clinvar_accession|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+--------+---+---+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874|11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...| literature only| null|no assertion crit...|2019-07-02| GeneReviews|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874|11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...|clinical testing|Invitae Variant C...|criteria provided...|2019-08-15| Invitae|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874|11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...| literature only| null|no assertion crit...|2019-07-02| GeneReviews|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+--------+---+---+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
As you can see from the dataframes/files above, there are columns that do not exist in both dataframes/files.
This is what I did:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from functools import reduce
import pyspark.sql.functions as F

warehouse_location = 'hdfs://hdfs-nn:9000'

spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("csv") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .getOrCreate()

all_data = spark.read.options(header='True', delimiter=';').csv("hdfs://hdfs-nn:9000/mapped_files/*")
all_data.show()
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+----------+----+----+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
|variant_name|variant_id|chromosome| genes| variant_type|description|polyphen_prediction|sift_prediction| hgvs|assembly|assembly.date| start| end| ref| alt|risk_allele| phenotype|clinical_actionability|classification|clinical_significance| method| assertion_criteria| level_certainty| date| author| origin|title|year|authors|pmid|is_gwas| name| url|version|databanks.variant_id|clinvar_accession|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+----------+----+----+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874| 11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...| literature only| null|no assertion crit...|2019-07-02| GeneReviews|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874| 11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...|clinical testing|Invitae Variant C...|criteria provided...|2019-08-15| Invitae|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874| 11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...| literature only| null|no assertion crit...|2019-07-02| GeneReviews|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
| 9| 96369762| 96369762| null| C/T| C/T| T| intergenic| Migraine with aura| null| null| null|rs59270819|null|null| null| null| null| null| null| null| null| null| null| null| null| null|null| null|null| null| null| null| null| null| null|
| 10| 29075768| 29075768| null| G/A/C| G/A/C| A| intergenic| Migraine with aura| null| null| null|rs59495588|null|null| null| null| null| null| null| null| null| null| null| null| null| null|null| null|null| null| null| null| null| null| null|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+----------+----+----+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
only showing top 20 rows
When I use the code above, the values of the common columns do not end up in the right place (in the last two rows, the values are not under the correct columns).
So, my question is: how can I read multiple csv files and merge them together (they may not have the same columns) using pyspark?
The simple approach is to add the missing columns to both dataframes and use a union function. I prefer unionByName, so I will use it in the example:
df1 = spark.read.options(header='True', delimiter=';').csv("mapped_file_1.csv")
df2 = spark.read.options(header='True', delimiter=';').csv("mapped_file_2.csv")
united_df = df1.unionByName(df2, allowMissingColumns=True)
allowMissingColumns fills the columns missing from either dataframe with NULL.
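The column-padding behaviour can be illustrated without Spark at all. A minimal plain-Python sketch (not the Spark API - rows are modelled as dicts and a hypothetical `union_by_name` helper pads missing keys with None, mimicking what unionByName(..., allowMissingColumns=True) does):

```python
def union_by_name(rows1, rows2):
    """Union two lists of dict-rows, padding keys missing from
    either side with None (mimics allowMissingColumns=True)."""
    # collect the union of column names, preserving first-seen order
    all_cols = list(dict.fromkeys(list(rows1[0]) + list(rows2[0])))
    return [{c: row.get(c) for c in all_cols} for row in rows1 + rows2]

a = [{"chromosome": "9", "hgvs": "rs59270819"}]
b = [{"chromosome": "1", "assembly": "GRCh38"}]
merged = union_by_name(a, b)
# every merged row now has chromosome, hgvs and assembly keys;
# values absent on one side come back as None
```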
If you have more than 2 files, you can just define a function and use reduce to merge all the dataframes:
def unite_dfs(df1, df2):
    return df1.unionByName(df2, allowMissingColumns=True)

list_of_dfs = [df1, df2, df3, df4, df5, df6]
united_df = reduce(unite_dfs, list_of_dfs)
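To see how reduce folds the list pairwise, here is a plain-Python sketch with list concatenation standing in for unionByName (illustrative only, no Spark needed):

```python
from functools import reduce

def unite(a, b):
    # stand-in for df1.unionByName(df2, allowMissingColumns=True)
    return a + b

list_of_rows = [[1], [2], [3], [4]]
# reduce applies unite left to right:
# unite(unite(unite([1], [2]), [3]), [4])
all_rows = reduce(unite, list_of_rows)
# all_rows == [1, 2, 3, 4]
```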
Let me know if this is clear. I did not include the imports because I only used libraries already present in your snippet. I can edit if anything is unclear.