Pyspark RDD 从一行创建 2 行到新的 Dataframe

Pyspark RDD create 2 rows from one row into new Dataframe

我有一个像这样的数据框:

data = [('valorant','web', 'start'),
  ('counter-strike','android', 'start'),
  ('sims','web', 'finished'), 
]

columns = ["game","platform", "type"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()
+--------------+--------+--------+
|          game|platform|    type|
+--------------+--------+--------+
|      valorant|     web|   start|
|counter-strike| android|   start|
|          sims|     web|finished|
+--------------+--------+--------+

我想变成:

+--------------+-----+
|          game|count|
+--------------+-----+
|      valorant|    1|
|counter-strike|    1|
|          sims|    1|
|          sims|    1|
+--------------+-----+

所以如果 type == 'finished' 新的 RDD 应该有 2 行值为 1 而不是只有一行值为 1。 有什么方法可以做到这一点而不必映射数据帧 2 次然后合并这些 RDD?

如果我这样做:

def func1(x):
  if x.type == "start":
    return (x.game, 1)
  elif x.type == "finished":
    return ((x.game, 1), (x.game, 1))

rdd2=df.rdd.map(lambda x: func1(x))
df2=rdd2.toDF(['game',  'value'])
df2.show(truncate=False)
+---------------------------+-----+
|game                       |value|
+---------------------------+-----+
|valorant                   |1    |
|counter                    |1    |
|[Ljava.lang.Object;@4b01785|null |
+---------------------------+-----+

它显然不起作用,因为 func1 需要 return 中的一个值。有什么想法吗?

when 表达式 + explode 文字数组:

from pyspark.sql import functions as F

df1 = df.withColumn(
    "count",
    F.explode(
        F.when(F.col("type") == "start", F.array(F.lit(1)))
            .when(F.col("type") == "finished", F.array(F.lit(1), F.lit(1)))
    )
).drop("platform", "type")

df1.show()

#+--------------+-----+
#|          game|count|
#+--------------+-----+
#|      valorant|    1|
#|counter-strike|    1|
#|          sims|    1|
#|          sims|    1|
#+--------------+-----+