将 pandas 操作转换为 SPARK 操作
Converting a pandas operation into a SPARK operation
我对 python 中的 SPARK 操作有疑问。
所以我的问题格式很好,并包含在以下字母中:
https://cernbox.cern.ch/index.php/s/W3zWvparRP2WGJc
它是关于如何在 SPARK 数据帧上执行在 pandas 数据帧上完美运行的操作。
基本给出功能:
def split(arr, size):
arrs = []
while len(arr) > size:
pice = arr[:size]
arrs.append(pice)
arr = arr[size:]
arrs.append(arr)
return arrs
这个细胞在 SPARK 中的等价物是什么:
df_list = []
for i in range (0,len(p_df.index)):
ars = split(p_df.iloc[i][0]['elements'], 1024)
final_df = pd.DataFrame(ars)
final_df.insert(0, 'timestamp', p_df.iloc[i][1])
time = p_df.iloc[i][1]
magCurr = m_df.iloc[(m_df['__record_timestamp__']-time).abs().argsort()[:2]].value.mean()
final_df.insert(1, 'magnetcurrent', round(magCurr))
final_df.insert(2, 'cycle', range(0,90))
df_list.append(final_df)
all_profiles = pd.concat(df_list, ignore_index=True)
?
您可能会猜到 python 解决方案太慢且内存效率低下无法用于我的所有数据,但我只是不知道如何充分利用 SPARK 来转换此 pandas 一个火花的操作。
我不需要解决方案,但向我指出一些基本上与我在这里做的事情相同的功能会很棒。 TIA.
这是解决方案:
SPLIT_COUNT = 90
SPLIT_SIZE = 1024
spark_p = data.select("profiles", '__record_timestamp__')
spark_p = spark_p.withColumn("profiles", F.col("profiles").getField("elements") )
slices = [F.slice(F.col('profiles'), i * SPLIT_SIZE + 1, SPLIT_SIZE) for i in range(SPLIT_COUNT)]
spark_p = spark_p.select(F.posexplode(F.array(*slices)), F.col('__record_timestamp__'))
spark_p = spark_p.withColumn("cycle", F.col("pos") )
spark_p = spark_p.withColumn("profiles", F.col("col") )
spark_p = spark_p.drop('pos').drop('col')
spark_m = magnetData.select("value", '__record_timestamp__', )
spark_p = spark_p.withColumn('value', F.lit(None))
spark_m = spark_m.withColumn('profiles', F.lit(None))
spark_m = spark_m.withColumn('cycle', F.lit(None))
final_df = spark_p.unionByName(spark_m)
w = Window.orderBy('__record_timestamp__').rowsBetween(Window.unboundedPreceding, -1)
final_df = final_df.withColumn('value', F.last('value', True).over(w)).filter(~F.isnull('profiles'))
我对 python 中的 SPARK 操作有疑问。
所以我的问题格式很好,并包含在以下字母中:
https://cernbox.cern.ch/index.php/s/W3zWvparRP2WGJc
它是关于如何在 SPARK 数据帧上执行在 pandas 数据帧上完美运行的操作。
基本给出功能:
def split(arr, size):
arrs = []
while len(arr) > size:
pice = arr[:size]
arrs.append(pice)
arr = arr[size:]
arrs.append(arr)
return arrs
这个细胞在 SPARK 中的等价物是什么:
df_list = []
for i in range (0,len(p_df.index)):
ars = split(p_df.iloc[i][0]['elements'], 1024)
final_df = pd.DataFrame(ars)
final_df.insert(0, 'timestamp', p_df.iloc[i][1])
time = p_df.iloc[i][1]
magCurr = m_df.iloc[(m_df['__record_timestamp__']-time).abs().argsort()[:2]].value.mean()
final_df.insert(1, 'magnetcurrent', round(magCurr))
final_df.insert(2, 'cycle', range(0,90))
df_list.append(final_df)
all_profiles = pd.concat(df_list, ignore_index=True)
?
您可能会猜到 python 解决方案太慢且内存效率低下无法用于我的所有数据,但我只是不知道如何充分利用 SPARK 来转换此 pandas 一个火花的操作。
我不需要解决方案,但向我指出一些基本上与我在这里做的事情相同的功能会很棒。 TIA.
这是解决方案:
SPLIT_COUNT = 90
SPLIT_SIZE = 1024
spark_p = data.select("profiles", '__record_timestamp__')
spark_p = spark_p.withColumn("profiles", F.col("profiles").getField("elements") )
slices = [F.slice(F.col('profiles'), i * SPLIT_SIZE + 1, SPLIT_SIZE) for i in range(SPLIT_COUNT)]
spark_p = spark_p.select(F.posexplode(F.array(*slices)), F.col('__record_timestamp__'))
spark_p = spark_p.withColumn("cycle", F.col("pos") )
spark_p = spark_p.withColumn("profiles", F.col("col") )
spark_p = spark_p.drop('pos').drop('col')
spark_m = magnetData.select("value", '__record_timestamp__', )
spark_p = spark_p.withColumn('value', F.lit(None))
spark_m = spark_m.withColumn('profiles', F.lit(None))
spark_m = spark_m.withColumn('cycle', F.lit(None))
final_df = spark_p.unionByName(spark_m)
w = Window.orderBy('__record_timestamp__').rowsBetween(Window.unboundedPreceding, -1)
final_df = final_df.withColumn('value', F.last('value', True).over(w)).filter(~F.isnull('profiles'))