读入文件并将它们分成两个数据帧(Pyspark,spark-dataframe)
Read in Files and split them into two dataframes (Pyspark, spark-dataframe)
我对 Pyspark(和 Spark)还很陌生,有一个具体的任务要解决,目前我还不知道 :)。
我有一堆文件结构如下:
'File_A.dtx':
## Animal
# Header Start
Name, Type, isMammal
# Body Start
Hasi, Rabbit, yes
Birdi, Bird, no
Cathi, Cat, yes
## House
# Header Start
Street, Number
# Body Start
Main Street, 32
Buchengasse, 11
'File_B.dtx':
## Animal
# Header Start
Name, Type, isMammal
# Body Start
Diddi, Dog, yes
Eli, Elephant, yes
## House
# Header Start
Street, Number
# Body Start
Strauchweg, 13
Igelallee, 22
我的预期结果是两个数据框如下:
动物:
| Filename | Name | Type | isMammal |
| ---------- | ------- | -------- | ----------- |
| File_A.dtx | Hasi | Rabbit | yes |
| File_A.dtx | Birdi | Bird | no |
| File_A.dtx | Cathi | Cat | yes |
| File_B.dtx | Diddi | Dog | yes |
| File_B.dtx | Eli | Elephant | yes |
房子:
| Filename | Street | Number |
| ---------- | ------------ | -------- |
| File_A.dtx | Main Street | 32 |
| File_A.dtx | Buchengasse | 11 |
| File_B.dtx | Strauchweg | 13 |
| File_B.dtx | Igelallee | 22 |
该解决方案应该能够并行工作。它可以按文件工作,因为每个文件都很小(大约 3 MB),但我有很多文件。
非常感谢您的提示。
我目前只有:
from pyspark.sql.functions import input_file_name
df1 = spark.read.text(filelist).withColumn("Filename", input_file_name())
现在我的主要问题是,如何根据行 ## Animal
和 ## House
拆分数据框并将其再次聚合到数据框以完成我的任务?
假设您知道前手的格式,并且没有两个数据帧具有相同的列数。然后您可以执行以下操作:
- 从数据集中删除注释(以
#
开头的行)
- 从数据集中删除 header 行
- 删除空行
- 使用
,
拆分行
- 在步骤 4 中创建
animals_df
作为来自 df 的行的子集,其中拆分后的数组大小等于 3,并将数组值提取为列
- 在步骤 4 中创建
house_df
作为来自 df 的行的子集,其中拆分的数组大小等于 2,并将数组值提取为列
from pyspark.sql.functions import element_at, input_file_name, length, col as c, split, size
filelist = ["File_A.dtx", "File_B.dtx"]
df1 = spark.read.text(filelist).withColumn("Filename", input_file_name())
# STEP 1
comment_removed = df1.filter(~(c("value").startswith("#")))
# STEP 2
header_removed = comment_removed.filter(~(c("value").isin("Name, Type, isMammal", "Street, Number")))
# STEP 3
remove_empty_lines = header_removed.filter(length("value") > 0)
# STEP 4
processed_df = remove_empty_lines.withColumn("value", split("value", ",")).withColumn("Filename", element_at(split("Filename", "/"), -1)).cache()
# STEP 5
animals_df = processed_df.filter(size("value") == 3).selectExpr("Filename", "value[0] as Name", "value[1] as Type", "value[2] as isMammal")
animals_df.show()
"""
+----------+-----+---------+--------+
| Filename| Name| Type|isMammal|
+----------+-----+---------+--------+
|File_A.dtx| Hasi| Rabbit| yes|
|File_A.dtx|Birdi| Bird| no|
|File_A.dtx|Cathi| Cat| yes|
|File_B.dtx|Diddi| Dog| yes|
|File_B.dtx| Eli| Elephant| yes|
+----------+-----+---------+--------+
"""
# STEP 6
house_df = processed_df.filter(size("value") == 2).selectExpr("Filename", "value[0] as Street", "cast(value[1] as int) as Number")
house_df.show()
"""
+----------+-----------+------+
| Filename| Street|Number|
+----------+-----------+------+
|File_A.dtx|Main Street| 32|
|File_A.dtx|Buchengasse| 11|
|File_B.dtx| Strauchweg| 13|
|File_B.dtx| Igelallee| 22|
+----------+-----------+------+
"""
我对 Pyspark(和 Spark)还很陌生,有一个具体的任务要解决,目前我还不知道 :)。
我有一堆文件结构如下:
'File_A.dtx':
## Animal
# Header Start
Name, Type, isMammal
# Body Start
Hasi, Rabbit, yes
Birdi, Bird, no
Cathi, Cat, yes
## House
# Header Start
Street, Number
# Body Start
Main Street, 32
Buchengasse, 11
'File_B.dtx':
## Animal
# Header Start
Name, Type, isMammal
# Body Start
Diddi, Dog, yes
Eli, Elephant, yes
## House
# Header Start
Street, Number
# Body Start
Strauchweg, 13
Igelallee, 22
我的预期结果是两个数据框如下:
动物:
| Filename | Name | Type | isMammal |
| ---------- | ------- | -------- | ----------- |
| File_A.dtx | Hasi | Rabbit | yes |
| File_A.dtx | Birdi | Bird | no |
| File_A.dtx | Cathi | Cat | yes |
| File_B.dtx | Diddi | Dog | yes |
| File_B.dtx | Eli | Elephant | yes |
房子:
| Filename | Street | Number |
| ---------- | ------------ | -------- |
| File_A.dtx | Main Street | 32 |
| File_A.dtx | Buchengasse | 11 |
| File_B.dtx | Strauchweg | 13 |
| File_B.dtx | Igelallee | 22 |
该解决方案应该能够并行工作。它可以按文件工作,因为每个文件都很小(大约 3 MB),但我有很多文件。
非常感谢您的提示。
我目前只有:
from pyspark.sql.functions import input_file_name
df1 = spark.read.text(filelist).withColumn("Filename", input_file_name())
现在我的主要问题是,如何根据行 ## Animal
和 ## House
拆分数据框并将其再次聚合到数据框以完成我的任务?
假设您知道前手的格式,并且没有两个数据帧具有相同的列数。然后您可以执行以下操作:
- 从数据集中删除注释(以
#
开头的行) - 从数据集中删除 header 行
- 删除空行
- 使用
,
拆分行
- 在步骤 4 中创建
animals_df
作为来自 df 的行的子集,其中拆分后的数组大小等于 3,并将数组值提取为列 - 在步骤 4 中创建
house_df
作为来自 df 的行的子集,其中拆分的数组大小等于 2,并将数组值提取为列
from pyspark.sql.functions import element_at, input_file_name, length, col as c, split, size
filelist = ["File_A.dtx", "File_B.dtx"]
df1 = spark.read.text(filelist).withColumn("Filename", input_file_name())
# STEP 1
comment_removed = df1.filter(~(c("value").startswith("#")))
# STEP 2
header_removed = comment_removed.filter(~(c("value").isin("Name, Type, isMammal", "Street, Number")))
# STEP 3
remove_empty_lines = header_removed.filter(length("value") > 0)
# STEP 4
processed_df = remove_empty_lines.withColumn("value", split("value", ",")).withColumn("Filename", element_at(split("Filename", "/"), -1)).cache()
# STEP 5
animals_df = processed_df.filter(size("value") == 3).selectExpr("Filename", "value[0] as Name", "value[1] as Type", "value[2] as isMammal")
animals_df.show()
"""
+----------+-----+---------+--------+
| Filename| Name| Type|isMammal|
+----------+-----+---------+--------+
|File_A.dtx| Hasi| Rabbit| yes|
|File_A.dtx|Birdi| Bird| no|
|File_A.dtx|Cathi| Cat| yes|
|File_B.dtx|Diddi| Dog| yes|
|File_B.dtx| Eli| Elephant| yes|
+----------+-----+---------+--------+
"""
# STEP 6
house_df = processed_df.filter(size("value") == 2).selectExpr("Filename", "value[0] as Street", "cast(value[1] as int) as Number")
house_df.show()
"""
+----------+-----------+------+
| Filename| Street|Number|
+----------+-----------+------+
|File_A.dtx|Main Street| 32|
|File_A.dtx|Buchengasse| 11|
|File_B.dtx| Strauchweg| 13|
|File_B.dtx| Igelallee| 22|
+----------+-----------+------+
"""