pyspark 转换 DataFrame cols 的子集但保留索引

pyspark transform subset of DataFrame cols but preserve index

我是 spark/pyspark 的新手,我正在尝试将一些 pandas 代码转换为 pyspark。

简而言之,问题是:我如何在保留行索引值的同时按行转换 spark 数据帧的某些数字列。

我有一个数据框,其中有几列用作索引,而其余的是数字数据,我需要对其进行多次转换

   i0 i1        c0        c1        c2
0   0  A  1.764052 -0.977278  0.144044
1   1  B  0.400157  0.950088  1.454274
2   2  C  0.978738 -0.151357  0.761038
3   3  D  2.240893 -0.103219  0.121675
4   4  E  1.867558  0.410599  0.443863

因此列 i0 和 i1 是索引,c0 - c2 是数据。

我想做的是对数字列(按行)应用一些转换,但保留索引信息。

下面我以“逐行减去均值”为例,实际需要做的操作是多种多样的,需要任意函数。我知道你不需要使用函数来减去 spark 数据帧的平均值,我只是在这里使用它作为简化。

设置代码是这样的:

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").getOrCreate()

def pd_norm(df):
        return df.sub(df.mean(axis=1), axis=0)

def pd_someop(it):
        for df in it:
                yield pd_norm(df)

np.random.seed(0)
nrows = 5
ncols = 3
v = pd.DataFrame({'i0' : range(nrows), 'i1' : [chr(65 + i) for i in range(nrows)]})
v = v.join(pd.DataFrame({'c' + str(x) : np.random.normal(size=nrows) for x in range(ncols)}))
vdf = spark.createDataFrame(v)

我可以运行像

vdf.select([F.col(x) for x in vdf.columns[2:]]).mapInPandas(pd_someop, schema=vdf.schema[2:]).show()

这将应用转换,但不能保证按顺序返回行,所以我不知道如何获取转换后的值及其索引列值。

我无法传递索引列,因为我不想将它们包含在转换计算中。它们可能是日期,或者不是简单整数 index/row 数字的字符串。

在pandas我会做类似

的事情
v.iloc[:,:2].join(pd_norm(v.iloc[:,2:]))

给出

   i0 i1        c0        c1        c2
0   0  A  1.453780 -1.287551 -0.166229
1   1  B -0.534683  0.015249  0.519434
2   2  C  0.449265 -0.680830  0.231565
3   3  D  1.487777 -0.856335 -0.631441
4   4  E  0.960218 -0.496741 -0.463477

即我有带有原始索引的转换数字列。

我也有相当数量的列(10 到 1000 秒),因此我明确硬编码列名的解决方案不可行。

我真的在寻找一种通用的使用模式,其中我有几个构成索引的列,以及需要通过一些任意行式函数转换的数百列。

我考虑过将列元数据添加到 spark 数据帧以指示该列是否是索引,但此元数据没有进入 pandas 函数,因此我无法在那里过滤索引列。

我希望这是清楚的。就像我说的那样,我对 spark 很陌生,所以我不知道我是否只是遗漏了一些明显的东西。谢谢。

你可以改变yield部分如下,改变你调用的方式mapInPandas:

def pd_norm(df):
        return df.sub(df.mean(axis=1), axis=0)

def pd_someop(it):
        for df in it:
                yield df.iloc[:,:2].join(pd_norm(df.iloc[:,2:]))

vdf.mapInPandas(pd_someop, schema=vdf.schema).show()
+---+---+------------------+--------------------+--------------------+
| i0| i1|                c0|                  c1|                  c2|
+---+---+------------------+--------------------+--------------------+
|  0|  A|1.4537796668836203| -1.2875505589604548|-0.16622910792316567|
|  1|  B|-0.534682502584706|0.015248706573660176|  0.5194337960110459|
|  2|  C| 0.449265150454061|  -0.680830041949376| 0.23156489149531523|
|  3|  D|1.4877767445678818|  -0.856335306427134| -0.6314414381407477|
|  4|  E|0.9602180818720457|-0.49674140633954944| -0.4634766755324961|
+---+---+------------------+--------------------+--------------------+