Preprocessing weird data in pyspark
I'm working with a set of climate data that is laid out in a very strange way and is awkward to handle. I decided to use pyspark because of the large amount of data involved; the idea, of course, is to save time.
The data is in .ascii/.text/.dat format, call it whatever you like, and is laid out as follows:
Date 1 | | | | | |
---|---|---|---|---|---|
Value 1 | Value 2 | Value 3 | Value 4 | Value 5 | Value 6 |
Value 7 | Value 8 | Value 9 | Value 10 | Value 11 | Value 12 |
. | . | . | . | . | Value 101178 |
Date 2 | | | | | |
Value 1 | Value 2 | Value 3 | Value 4 | Value 5 | Value 6 |
Value 7 | Value 8 | Value 9 | Value 10 | Value 11 | Value 12 |
. | . | . | . | . | Value 101178 |
That is, each date is followed by a table of 101178 values spread over 6 columns (16863 rows).
In case the explanation isn't very clear, I'm attaching a link to a small chunk of the file (the original file is >50 GB):
https://drive.google.com/file/d/1-aJRTWzpQ5lHyZgt-h7DuEY5GpYZRcUh/view?usp=sharing
My idea is to generate a matrix with the following structure:
Date 1 | Date 2 | Date n |
---|---|---|
Value 1 | Value1.2 | Value1.n |
Value 2 | Value2.2 | Value2.n |
Value n | Valuen.2 | Valuen.n |
I've tried to make the problem as clear as I can. As I said, I'm working with pyspark, so if anyone has a solution for doing this processing with that tool, I would be very grateful.
Thank you all very much!
After your comment I have revised my answer. Note that I am using pandas and its dataframes; if needed, this should translate directly to Spark.
Also, I think your data is corrupted, since the last array is not the correct length. My code does not handle this, so you would need to use the expected count captured by the regex, expected_values = m.group(4), etc. (a validation sketch is included after the output below).
Warning: the := operator requires Python 3.8, but you can rework it if needed.
Notes:
- The header of each 'section' is captured by the regex and used to form the column name.
Split the file at the date lines:
import pandas as pd
import numpy as np
import re

header = re.compile(r"^\s+(\d{10})\s+(\d*)\s+(\d*)\s+(\d*)$")

df = pd.DataFrame()
with open("t2.dat", "r") as ifp:
    rows = []
    date = None
    while line := ifp.readline():
        # A header line starts a new section
        if m := header.match(line):
            # Convert the rows collected so far to an array, flatten them to a
            # vector and append that to the dataframe as a new column.
            if rows and date:
                df[date] = np.array(rows, dtype=float).flatten(order="C")
            rows = []
            # Remember the date from the header; it becomes the column name
            date = m.group(1)
        else:
            rows.append(line.strip().split())

# Append the final section
print(f"Appending the last {len(rows) * len(rows[0])} values")
df[date] = np.array(rows, dtype=float).flatten(order="C")
The output, in abbreviated form (one column of 101178 rows per date):
1990010100 1990010101 1990010102 1990010103 1990010104
0 10.4310 10.0490 9.7269 9.3801 9.0038
1 10.3110 9.9225 9.5431 9.1758 8.7899
2 10.2290 9.8144 9.4156 9.0304 8.6171
3 10.1500 9.7154 9.2999 8.8890 8.4713
4 9.8586 9.3968 8.9156 8.4328 7.9764
... ... ... ... ... ...
101173 -1.5511 -1.5472 -1.5433 -1.5251 -1.5399
101174 -1.8659 -1.8719 -1.8485 -1.8481 -1.8325
101175 -1.9044 -1.8597 -1.7963 -1.8094 -1.7653
101176 -2.0564 -2.0404 -1.9779 -1.9893 -1.9521
101177 -2.1842 -2.2840 -2.3216 -2.2794 -2.2655
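As a follow-up to the note about the corrupted block, here is a minimal sketch of how the count captured by the regex could guard each append. It assumes the fourth header field (m.group(4)) really holds the number of values in the block, which you should check against the file specification:

# Hypothetical helper, assuming m.group(4) is the expected value count per block
def append_block(df, date, rows, expected):
    """Append one date block as a column, but only if the value count matches the header."""
    actual = sum(len(r) for r in rows)
    if actual != int(expected):
        print(f"Skipping {date}: expected {expected} values, got {actual}")
        return
    df[date] = np.array(rows, dtype=float).flatten(order="C")

It would be called in place of the two direct assignments above, with expected stored from m.group(4) at the same point where date is stored, so a damaged section is skipped instead of raising an error.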
I managed to get very close to, but not exactly, your expected dataframe structure.
To test the code's output, I made this dummy dataset to play with, since it is hard to keep track of the huge numbers in the original dataset (one way to generate it is sketched right after the table):
+--------------------------------------------+
|value |
+--------------------------------------------+
| 1990010100 0 0 24|
| 001 002 003 004 005 006 |
| 007 008 009 010 011 012 |
| 013 014 015 016 017 018 |
| 019 020 021 022 023 024 |
| 1990010101 0 0 24|
| 101 102 103 104 105 106 |
| 107 108 109 110 111 112 |
| 113 114 115 116 117 118 |
| 119 120 121 122 123 124 |
| 1990010102 0 0 24|
| 201 202 203 204 205 206 |
| 207 208 209 210 211 212 |
| 213 214 215 216 217 218 |
| 219 220 221 222 223 224 |
+--------------------------------------------+
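For anyone who wants to reproduce the test, here is a minimal sketch (a hypothetical helper, not part of the original post) that writes this dummy dataset to t2.dat so the code below can be run as-is:

# Write the dummy dataset shown above to t2.dat (3 blocks of 4 lines x 6 values)
with open('t2.dat', 'w') as f:
    for block in range(3):
        f.write(f' 199001010{block} 0 0 24\n')  # header: date plus three extra fields
        for row in range(4):
            vals = [f'{block * 100 + row * 6 + col + 1:03d}' for col in range(6)]
            f.write(' ' + ' '.join(vals) + '\n')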
Here is the full code that I tested. The main idea is to tag the date blocks and their records in some way so that they can be joined to one another.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window as W
# I'm not sure what cluster you have, or whether you're going to run this on your local machine,
# so I added this piece here for reference. The most important thing is to give Spark enough
# resources to handle the entire dataset at once.
spark = (SparkSession
    .builder
    .master('local[*]')
    .appName('SO')
    .config('spark.driver.cores', '4')
    .config('spark.driver.memory', '4g')
    .config('spark.executor.cores', '4')
    .config('spark.executor.memory', '4g')
    .getOrCreate()
)
# load the raw weird data and apply some transformations to make it "readable"/"processable"
df = (spark
    .read.text('t2.dat')
    .withColumn('id', F.row_number().over(W.orderBy(F.lit(1))))  # number every line; needed to locate the dates (an alternative is sketched after the sample output)
    .withColumn('value', F.trim(F.col('value')))                 # strip leading/trailing spaces
    .withColumn('value', F.split(F.col('value'), r'\s+'))        # split each line into its individual values
)
# +------------------------------+---+
# |value |id |
# +------------------------------+---+
# |[1990010100, 0, 0, 24] |1 |
# |[001, 002, 003, 004, 005, 006]|2 |
# |[007, 008, 009, 010, 011, 012]|3 |
# |[013, 014, 015, 016, 017, 018]|4 |
# |[019, 020, 021, 022, 023, 024]|5 |
# |[1990010101, 0, 0, 24] |6 |
# |[101, 102, 103, 104, 105, 106]|7 |
# |[107, 108, 109, 110, 111, 112]|8 |
# |[113, 114, 115, 116, 117, 118]|9 |
# |[119, 120, 121, 122, 123, 124]|10 |
# |[1990010102, 0, 0, 24] |11 |
# |[201, 202, 203, 204, 205, 206]|12 |
# |[207, 208, 209, 210, 211, 212]|13 |
# |[213, 214, 215, 216, 217, 218]|14 |
# |[219, 220, 221, 222, 223, 224]|15 |
# +------------------------------+---+
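One thing to be aware of: row_number over a constant ordering pulls the whole file through a single window partition. A minimal alternative sketch (same t2.dat, same resulting columns; an assumption on my part, not what the code above uses) that derives the line number from the RDD instead:

# Hypothetical alternative: number the lines with zipWithIndex instead of a global window.
# For a plain text file the index follows the file's line order.
rdd = spark.sparkContext.textFile('t2.dat').zipWithIndex()
df_alt = (spark
    .createDataFrame(rdd, ['value', 'idx'])
    .withColumn('id', F.col('idx') + 1)
    .withColumn('value', F.split(F.trim(F.col('value')), r'\s+'))
    .drop('idx')
)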
# Extracting available date blocks
date_df = (df
    .where(F.size('value') == 4)
    .withColumn('grp', ((F.col('id') - 1) / 5).cast('int'))  # replace 5 with 16864 when run with your actual dataset
    .select('grp', F.col('value')[0].alias('date'))
)
date_df.show(10, False)
# +---+----------+
# |grp|date |
# +---+----------+
# |0  |1990010100|
# |1  |1990010101|
# |2  |1990010102|
# +---+----------+
# Extracting available value blocks
value_df = (df
    .where(F.size('value') == 6)
    .withColumn('grp', ((F.col('id') - 1) / 5).cast('int'))  # replace 5 with 16864 when run with your actual dataset (a block-length-independent variant is sketched after the output below)
    .groupBy('grp')
    .agg(F.collect_list('value').alias('value'))
    .withColumn('value', F.flatten('value'))
)
# +---+------------------------------------------------------------------------------------------------------------------------+
# |grp|value |
# +---+------------------------------------------------------------------------------------------------------------------------+
# |0 |[001, 002, 003, 004, 005, 006, 007, 008, 009, 010, 011, 012, 013, 014, 015, 016, 017, 018, 019, 020, 021, 022, 023, 024]|
# |1 |[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124]|
# |2 |[201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224]|
# +---+------------------------------------------------------------------------------------------------------------------------+
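Both grp computations hard-code the block length (5 lines in the dummy data, 16864 in the real file). A minimal block-length-independent variant, sketched under the same assumption that header lines are the only 4-field lines: derive grp as a running count of the headers seen so far.

# Hypothetical variant: grp = cumulative number of header rows so far, minus 1,
# so the block length never needs to be hard-coded.
w = W.orderBy('id').rowsBetween(W.unboundedPreceding, W.currentRow)
tagged = df.withColumn('grp', F.sum(F.when(F.size('value') == 4, 1).otherwise(0)).over(w) - 1)
date_df_alt = (tagged
    .where(F.size('value') == 4)
    .select('grp', F.col('value')[0].alias('date'))
)
value_df_alt = (tagged
    .where(F.size('value') == 6)
    .groupBy('grp')
    .agg(F.collect_list('value').alias('value'))  # like the original, this relies on the rows staying in block order
    .withColumn('value', F.flatten('value'))
)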
# join them together and "explode" the arrays into separate rows
joined_df = (date_df
    .join(value_df, on=['grp'])
    .withColumn('value', F.explode('value'))
)
# +---+----------+-----+
# |grp|date |value|
# +---+----------+-----+
# |0 |1990010100|001 |
# |0 |1990010100|002 |
# |0 |... |... |
# |0 |1990010100|023 |
# |0 |1990010100|024 |
# |1 |1990010101|101 |
# |1 |1990010101|102 |
# |1 |... |... |
# |1 |1990010101|123 |
# |1 |1990010101|124 |
# |2 |1990010102|201 |
# |2 |1990010102|202 |
# |2 |... |... |
# |2 |1990010102|223 |
# |2 |1990010102|224 |
# +---+----------+-----+
# joined_df now basically holds your entire dataset; it's totally up to you how you handle it.
# One option is to save each date as a partition of a single Hive table (see the sketch after the loop below).
# Another option is to save each date as its own file, as the loop below does.
# Either way is just for the sake of simplicity whenever you need to read that painful dataset again.
for date in [row['date'] for row in date_df.collect()]:
    (joined_df
        .where(F.col('date') == date)
        .write
        .mode('overwrite')
        .csv(date)
    )
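For the Hive-partition option mentioned above, here is a minimal single-pass sketch (the output path values_by_date is just a placeholder) that avoids collecting the dates to the driver and rescanning joined_df once per date:

# Hypothetical single-pass write: one partition directory per date value.
(joined_df
    .write
    .mode('overwrite')
    .partitionBy('date')
    .parquet('values_by_date')
)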