将 pandas 操作转换为 SPARK 操作

Converting a pandas operation into a SPARK operation

我对 python 中的 SPARK 操作有疑问。

所以我的问题格式很好,并包含在以下字母中:

https://cernbox.cern.ch/index.php/s/W3zWvparRP2WGJc

它是关于如何在 SPARK 数据帧上执行在 pandas 数据帧上完美运行的操作。

基本给出功能:

def split(arr, size):
 arrs = []
 while len(arr) > size:
     pice = arr[:size]
     arrs.append(pice)
     arr   = arr[size:]
 arrs.append(arr)
 return arrs

这个细胞在 SPARK 中的等价物是什么:

df_list = []

for i in range (0,len(p_df.index)):
    ars = split(p_df.iloc[i][0]['elements'], 1024)
    final_df = pd.DataFrame(ars)
    final_df.insert(0, 'timestamp', p_df.iloc[i][1])
    time = p_df.iloc[i][1]
    magCurr = m_df.iloc[(m_df['__record_timestamp__']-time).abs().argsort()[:2]].value.mean()
    final_df.insert(1, 'magnetcurrent', round(magCurr))
    final_df.insert(2, 'cycle', range(0,90))
    df_list.append(final_df)


all_profiles = pd.concat(df_list, ignore_index=True)

?

您可能会猜到 python 解决方案太慢且内存效率低下无法用于我的所有数据,但我只是不知道如何充分利用 SPARK 来转换此 pandas 一个火花的操作。

我不需要解决方案,但向我指出一些基本上与我在这里做的事情相同的功能会很棒。 TIA.

这是解决方案:

SPLIT_COUNT = 90
SPLIT_SIZE = 1024

spark_p = data.select("profiles", '__record_timestamp__')
spark_p = spark_p.withColumn("profiles", F.col("profiles").getField("elements") )

slices = [F.slice(F.col('profiles'), i * SPLIT_SIZE + 1, SPLIT_SIZE) for i in range(SPLIT_COUNT)]

spark_p = spark_p.select(F.posexplode(F.array(*slices)), F.col('__record_timestamp__'))
spark_p = spark_p.withColumn("cycle", F.col("pos") )
spark_p = spark_p.withColumn("profiles", F.col("col") )
spark_p = spark_p.drop('pos').drop('col')

spark_m = magnetData.select("value", '__record_timestamp__', )


spark_p = spark_p.withColumn('value', F.lit(None))


spark_m = spark_m.withColumn('profiles', F.lit(None))
spark_m = spark_m.withColumn('cycle', F.lit(None))


final_df = spark_p.unionByName(spark_m)

w = Window.orderBy('__record_timestamp__').rowsBetween(Window.unboundedPreceding, -1)

final_df = final_df.withColumn('value', F.last('value', True).over(w)).filter(~F.isnull('profiles'))