Appending column name to column value using Spark

I have data in a comma-separated file, which I have loaded into a Spark dataframe. The data looks like this:

  A B C
  1 2 3
  4 5 6
  7 8 9

I want to transform the above dataframe in Spark using pyspark into:

   A    B    C
  A_1  B_2  C_3
  A_4  B_5  C_6
  A_7  B_8  C_9

and then convert it into a list of lists using pyspark:

[[A_1, B_2, C_3], [A_4, B_5, C_6]]
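
In plain Python terms (ignoring Spark for a moment), the transformation I am after is just this; `header` and `rows` here are stand-ins for the CSV contents:

```python
# Plain-Python sketch of the target transform (no Spark): prefix each
# value with its column name, then collect rows as a list of lists.
header = ["A", "B", "C"]
rows = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

baskets = [[f"{name}_{value}" for name, value in zip(header, row)] for row in rows]
print(baskets)
# → [['A_1', 'B_2', 'C_3'], ['A_4', 'B_5', 'C_6'], ['A_7', 'B_8', 'C_9']]
```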

and then run the FP-Growth algorithm from pyspark on that dataset.

The code I have tried is below:

from pyspark.sql.functions import col, size
from pyspark.sql.functions import *
import pyspark.sql.functions as func
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import Row
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark import SparkConf
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/data.csv")

names = df.schema.names

Then I thought of doing something inside a for loop:

for name in names:
    -----
    ------

After that I will be using FPGrowth:

df = spark.createDataFrame([
    (0, ["A_1", "B_2", "C_3"]),
    (1, ["A_4", "B_5", "C_6"])
], ["id", "items"])

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df)
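
For reference, `minSupport=0.5` means an itemset must occur in at least half of the baskets. A hand-rolled single-item version of that check (illustration only, not what `FPGrowth` actually does internally) looks like:

```python
from collections import Counter

# Count, for each item, how many baskets contain it, then keep items whose
# fraction of baskets meets the minimum support threshold.
transactions = [["A_1", "B_2", "C_3"], ["A_4", "B_5", "C_6"]]
min_support = 0.5

counts = Counter(item for basket in transactions for item in set(basket))
frequent = sorted(
    item for item, c in counts.items() if c / len(transactions) >= min_support
)
print(frequent)
# → ['A_1', 'A_4', 'B_2', 'B_5', 'C_3', 'C_6']
```

Here every item appears in exactly one of the two baskets, so each has support 0.5 and just clears the threshold.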

This answer shows, for those of us who normally use Scala, a few concepts of how to do this with pyspark. It is a little different, but there is certainly something to be learned, although how much is a big question; I certainly picked up a bit about zipWithIndex in pyspark myself. Anyway.

The first part converts the content into the required format; it could possibly also be done at load time, but I leave it as is:

from functools import reduce
from pyspark.sql.functions import col, lit, concat, split
from pyspark.sql.types import *
from pyspark.sql import functions as f

source_df = spark.createDataFrame(
    [
        (1, 11, 111),
        (2, 22, 222)
    ],
    ["colA", "colB", "colC"]
)

intermediate_df = reduce(
    lambda df, col_name: df.withColumn(col_name, concat(lit(col_name), lit("_"), col(col_name))),
    source_df.columns,
    source_df
)

allCols = intermediate_df.columns
result_df = intermediate_df.select(f.concat_ws(',', *allCols).alias('CONCAT_COLS'))

result_df = result_df.select(split(col("CONCAT_COLS"), r",\s*").alias("ARRAY_COLS"))

# Add 0,1,2,3, ... with zipWithIndex. We add it at the back, but that does not
# matter; you can move it around.
# Build the new structure: the existing fields (one in this case, but done
# flexibly) plus the zipWithIndex value.
schema = StructType(result_df.schema.fields[:] + [StructField("index", LongType(), True)])

# Need this dict approach with pyspark, different to Scala.
rdd = result_df.rdd.zipWithIndex()
rdd1 = rdd.map(
               lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],)
              )

final_result_df = spark.createDataFrame(rdd1, schema)
final_result_df.show(truncate=False)

returns:

 +---------------------------+-----+
 |ARRAY_COLS                 |index|
 +---------------------------+-----+
 |[colA_1, colB_11, colC_111]|0    |
 |[colA_2, colB_22, colC_222]|1    |
 +---------------------------+-----+
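
The `asDict`/tuple rebuild in the `map` step can be pictured with a plain dict standing in for a `Row` (the names here are illustrative, mirroring the schema above):

```python
# Mimic the rdd.map step: (Row-as-dict, index) -> tuple of the original
# field values with the zipWithIndex counter appended at the back.
field_names = ["ARRAY_COLS", "index"]  # what schema.fieldNames() would return
pair = ({"ARRAY_COLS": ["colA_1", "colB_11", "colC_111"]}, 0)

rebuilt = tuple(pair[0][name] for name in field_names[:-1]) + (pair[1],)
print(rebuilt)
# → (['colA_1', 'colB_11', 'colC_111'], 0)
```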

The second part is the old zipWithIndex-with-pyspark dance, needed if you want 0, 1, 2, ... ids; painful compared to Scala.
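
zipWithIndex pairs each RDD element with its position, much like Python's built-in `enumerate` with the tuple order flipped; a list-based sketch:

```python
rows = [["colA_1", "colB_11", "colC_111"], ["colA_2", "colB_22", "colC_222"]]

# RDD.zipWithIndex yields (element, index) pairs; enumerate yields
# (index, element), so we flip the tuple to match.
zipped = [(row, i) for i, row in enumerate(rows)]
print(zipped)
```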

This is generally easier to solve in Scala.

Not sure about performance; it is not a foldLeft, which is interesting. I think it is actually fine.
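
That said, `functools.reduce` plays much the same role here that `foldLeft` would in Scala: it threads an accumulator through one update per column name. The same folding pattern on a plain dict (standing in for the DataFrame, purely for illustration):

```python
from functools import reduce

# Fold over the column names, replacing each value with "<name>_<value>",
# exactly the shape of the withColumn fold above.
row = {"colA": 1, "colB": 11, "colC": 111}
renamed = reduce(
    lambda acc, name: {**acc, name: f"{name}_{acc[name]}"},
    list(row),
    row,
)
print(renamed)
# → {'colA': 'colA_1', 'colB': 'colB_11', 'colC': 'colC_111'}
```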