Preprocessing weird data in pyspark

I'm working with a set of climate data that has a very strange and awkward layout. I decided to use pyspark because of the sheer volume of data; the idea, you know, was to save time.

The data comes as .ascii/.txt/.dat (call it whatever you like) and is laid out as follows:


Date 1
Value 1 Value 2 Value 3 Value 4 Value 5 Value 6
Value 7 Value 8 Value 9 Value 10 Value 11 Value 12
. . . . . Value 101178
Date 2
Value 1 Value 2 Value 3 Value 4 Value 5 Value 6
Value 7 Value 8 Value 9 Value 10 Value 11 Value 12
. . . . . Value 101178

That is, each date is followed by a table of 101,178 values spread over 6 columns (16,863 rows, since 16,863 × 6 = 101,178).

In case the explanation isn't clear enough, I'm attaching a link to a small excerpt of the file (the original file is >50 GB):

https://drive.google.com/file/d/1-aJRTWzpQ5lHyZgt-h7DuEY5GpYZRcUh/view?usp=sharing

My idea is to generate a matrix with the following structure:


Date 1     Date 2      Date n
Value 1    Value1.2    Value1.n
Value 2    Value2.2    Value2.n
Value n    Valuen.2    Valuen.n
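
Purely to illustrate the target shape, here is a minimal pandas sketch with made-up numbers (the actual solution should be in pyspark; the column names and values are hypothetical):

import pandas as pd

# Hypothetical illustration of the target layout: one column per date,
# one row per value position (the real matrix would have 101,178 rows).
target = pd.DataFrame({
    "Date 1": [1.0, 2.0, 3.0],
    "Date 2": [1.2, 2.2, 3.2],
    "Date n": [1.9, 2.9, 3.9],
})
print(target)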

I've tried to state the problem as clearly as I can. As I said, I'm working in pyspark, so if anyone has a solution for processing this data with that tool, I'd be very grateful.

Thank you all very much!

I've edited my answer following your comment. Note that I've used 'pandas' and its dataframes; this should translate directly to Spark if required.

Also, I think your data is corrupted, because the last array isn't the correct length. My code doesn't handle that case, so you would need to check against the expected count captured by the regex, e.g. expected_values = m.group(4).

Warning: the := operator requires Python 3.8+... but you can work around it if needed.
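
If you're stuck on an older Python, a minimal sketch of the same read loop without the walrus operator could look like this:

# Equivalent loop for Python < 3.8, without the := operator
while True:
    line = ifp.readline()
    if not line:
        break
    # ... handle `line` exactly as in the loop below ...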

Notes:

  • The header of each 'section' is captured by the regex and used to form the column name (see the short sketch below).
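
For example, matched against a hypothetical header line of the shape the regex expects (the last field is assumed here to be the value count, as expected_values = m.group(4) above suggests):

import re

header = re.compile(r"^\s+(\d{10})\s+(\d*)\s+(\d*)\s+(\d*)$")

m = header.match("  1990010100           0           0      101178")  # hypothetical header line
if m:
    print(m.group(1))  # '1990010100' -> becomes the column name
    print(m.group(4))  # '101178'     -> expected number of values in the block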

Split the file at the date lines:

import pandas as pd
import numpy as np
import re

header = re.compile(r"^\s+(\d{10})\s+(\d*)\s+(\d*)\s+(\d*)$")

df = pd.DataFrame()

with open("t2.dat", "r") as ifp:
    rows = []
    date = None
    
    while line := ifp.readline():
        # Get the header and start a new file
        if m := header.match(line):
            # We have a header so convert to array then flatten to a vector
            # before appending to the dataframe.
            if rows and date:
                df[date] = np.array(rows, dtype=float).flatten(order="C")
                rows = []
                
            # Get the header
            date = m.group(1)
            
        else:
            rows.append(line.strip().split())
    
    print(f"Appending the last {len(rows)*len(rows[0])} values")
    df[date] = np.array(rows, dtype=float).flatten(order="C")

The output, in abbreviated form (1 column per date, 101,178 rows):

        1990010100  1990010101  1990010102  1990010103  1990010104
0          10.4310     10.0490      9.7269      9.3801      9.0038
1          10.3110      9.9225      9.5431      9.1758      8.7899
2          10.2290      9.8144      9.4156      9.0304      8.6171
3          10.1500      9.7154      9.2999      8.8890      8.4713
4           9.8586      9.3968      8.9156      8.4328      7.9764
...            ...         ...         ...         ...         ...
101173     -1.5511     -1.5472     -1.5433     -1.5251     -1.5399
101174     -1.8659     -1.8719     -1.8485     -1.8481     -1.8325
101175     -1.9044     -1.8597     -1.7963     -1.8094     -1.7653
101176     -2.0564     -2.0404     -1.9779     -1.9893     -1.9521
101177     -2.1842     -2.2840     -2.3216     -2.2794     -2.2655
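
If the result really does need to end up in Spark (as noted above, it should translate directly), one minimal option, assuming an already-created SparkSession named spark, is to hand the pandas frame over as-is:

# Assumption: `spark` is an existing SparkSession and `df` is the pandas
# dataframe built above (one column per date).
sdf = spark.createDataFrame(df.reset_index())  # keep the row position as a column
sdf.printSchema()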

I managed to get very close to, but not exactly, your expected dataframe structure.

To test the code's output, I made this dummy dataset to play with, since it's hard to keep track of the huge numbers in the original dataset:

+--------------------------------------------+
|value                                       |
+--------------------------------------------+
|  1990010100           0           0      24|
|  001  002  003  004  005  006              |
|  007  008  009  010  011  012              |
|  013  014  015  016  017  018              |
|  019  020  021  022  023  024              |
|  1990010101           0           0      24|
|  101  102  103  104  105  106              |
|  107  108  109  110  111  112              |
|  113  114  115  116  117  118              |
|  119  120  121  122  123  124              |
|  1990010102           0           0      24|
|  201  202  203  204  205  206              |
|  207  208  209  210  211  212              |
|  213  214  215  216  217  218              |
|  219  220  221  222  223  224              |
+--------------------------------------------+
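
To reproduce this locally, a small hypothetical helper can materialise the dummy block above as t2.dat (the file name the code below reads):

# Write the dummy dataset shown above to t2.dat so the Spark code can be run as-is.
with open('t2.dat', 'w') as f:
    for block, date in enumerate(['1990010100', '1990010101', '1990010102']):
        f.write(f'  {date}           0           0      24\n')       # header line
        values = [f'{block * 100 + i:03d}' for i in range(1, 25)]     # 001..024, 101..124, 201..224
        for row in range(0, 24, 6):
            f.write('  ' + '  '.join(values[row:row + 6]) + '\n')     # 6 values per line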

Here is the full code I tested. The main idea is to tag the date blocks and their records in some way so that they can be joined back together.

from pyspark.sql import SparkSession

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window as W

# I'm not sure what cluster you have, or whether you're going to run this on
# your local machine, so I added this piece here for your reference. The most
# important thing is that you provide enough resources to handle the entire
# dataset at once.
spark = (SparkSession
    .builder
    .master('local[*]')
    .appName('SO')
    .config('spark.driver.cores', '4')
    .config('spark.driver.memory', '4g')
    .config('spark.executor.cores', '4')
    .config('spark.executor.memory', '4g')
    .getOrCreate()
)

# load raw weird data and apply some transformation to make it "readable" or "processable"
df = (spark
    .read.text('t2.dat')
    .withColumn('id', F.row_number().over(W.orderBy(F.lit(1)))) # assign a sequential ID per row; needed to mark the date lines
    .withColumn('value', F.trim(F.col('value')))          # trim leading/trailing spaces
    .withColumn('value', F.split(F.col('value'), r'\s+')) # split each line into individual values
)
# +------------------------------+---+
# |value                         |id |
# +------------------------------+---+
# |[1990010100, 0, 0, 24]        |1  |
# |[001, 002, 003, 004, 005, 006]|2  |
# |[007, 008, 009, 010, 011, 012]|3  |
# |[013, 014, 015, 016, 017, 018]|4  |
# |[019, 020, 021, 022, 023, 024]|5  |
# |[1990010101, 0, 0, 24]        |6  |
# |[101, 102, 103, 104, 105, 106]|7  |
# |[107, 108, 109, 110, 111, 112]|8  |
# |[113, 114, 115, 116, 117, 118]|9  |
# |[119, 120, 121, 122, 123, 124]|10 |
# |[1990010102, 0, 0, 24]        |11 |
# |[201, 202, 203, 204, 205, 206]|12 |
# |[207, 208, 209, 210, 211, 212]|13 |
# |[213, 214, 215, 216, 217, 218]|14 |
# |[219, 220, 221, 222, 223, 224]|15 |
# +------------------------------+---+

# Extracting available date blocks
date_df = (df
    .where(F.size('value') == 4)
    .withColumn('grp', ((F.col('id') - 1) / 5).cast('int')) # replace 5 with 16864 when run with your actual dataset
    .select('grp', F.col('value')[0].alias('date'))
)
date_df.show(10, False)
# +---+----------+
# |grp|date      |
# +---+----------+
# |0  |1990010100|
# |1  |1990010101|
# |2  |1990010102|
# +---+----------+
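
# Optionally, the rows-per-block count could be derived from the numbers given
# in the question (101,178 values per date, 6 per data line, plus the header
# line) instead of hard-coding 5 / 16864:
ROWS_PER_BLOCK = 1 + 101178 // 6  # = 16864 (101,178 is divisible by 6, giving 16,863 data rows)
# then use ROWS_PER_BLOCK in place of the literal in the 'grp' expressions.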

# Extracting available value blocks
value_df = (df
    .where(F.size('value') == 6)
    .withColumn('grp', ((F.col('id') - 1) / 5).cast('int')) # replace 5 with 16864 when run with your actual dataset
    .groupBy('grp')
    .agg(F.collect_list('value').alias('value'))
    .withColumn('value', F.flatten('value'))
)
# +---+------------------------------------------------------------------------------------------------------------------------+
# |grp|value                                                                                                                   |
# +---+------------------------------------------------------------------------------------------------------------------------+
# |0  |[001, 002, 003, 004, 005, 006, 007, 008, 009, 010, 011, 012, 013, 014, 015, 016, 017, 018, 019, 020, 021, 022, 023, 024]|
# |1  |[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124]|
# |2  |[201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224]|
# +---+------------------------------------------------------------------------------------------------------------------------+

# join them together and "explode" the array into one row per value
joined_df = (date_df
    .join(value_df, on=['grp'])
    .withColumn('value', F.explode('value'))
)
# +---+----------+-----+
# |grp|date      |value|
# +---+----------+-----+
# |0  |1990010100|001  |
# |0  |1990010100|002  |
# |0  |...       |...  |
# |0  |1990010100|023  |
# |0  |1990010100|024  |
# |1  |1990010101|101  |
# |1  |1990010101|102  |
# |1  |...       |...  |
# |1  |1990010101|123  |
# |1  |1990010101|124  |
# |2  |1990010102|201  |
# |2  |1990010102|202  |
# |2  |...       |...  |
# |2  |1990010102|223  |
# |2  |1990010102|224  |
# +---+----------+-----+

# now, joined_df basically holds your entire dataset; it's totally up to you how you want to handle it.
# One option is to save each date as a partition of one Hive table.
# Another option is to save each date as a separate file.
# It's just for the sake of simplicity whenever you need to read that painful dataset again.
for date in [row['date'] for row in date_df.collect()]:
    (joined_df
        .where(F.col('date') == date)
        .write
        .mode('overwrite')
        .csv(date)
    )
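
For the Hive-style option mentioned in the comments above, a hedged alternative to looping over collected dates is a single partitioned write (just a sketch; 'output_dir' is a placeholder path):

# Write everything once, partitioned by date: each date ends up in its own
# sub-directory (date=1990010100/, date=1990010101/, ...).
(joined_df
    .write
    .partitionBy('date')
    .mode('overwrite')
    .csv('output_dir')
)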