如何将条件值从多个外部列表或数组获取到新列中

How to get conditional values into new column from several external lists or arrays

我有以下数据框:

d = [ {'id':  3, 'ratio': 1.3 ,'vol1': 100 }, 
      {'id':  5, 'ratio': 0.3 ,'vol1': 200 },
      {'id':  1, 'ratio': 1.1 ,'vol1': 300 },
      {'id':  8, 'ratio': 0.8 ,'vol1': 400 },
      {'id':  2, 'ratio': 2.0 ,'vol1': 500 },
      {'id':  4, 'ratio': 0.0 ,'vol1': 600 }
    ] 
data = spark.createDataFrame(d)

我必须为此创建一个附加列 new_col_cond,它取决于多个外部 lists/arrays 的值(我也尝试过使用字典),例如:

q1 = [10,20,30,40,50,60,70,80,90]
q1_n = np.array(q1).reshape(-1)     #numpy array from above
q2 = [1,2,3,4,5,6,7,8,9]
q2_n = np.array(q2).reshape(-1)

新列取决于ratio的值,并根据id作为索引从任一数组中选择。我试过:

data = data.withColumn('new_col_cond', when(col('ratio')<1, q1[col('id')])
                                      .when(col('ratio')>1, q2[col('id')])
                      ) #also with numpy arrays.

出现错误。我假设错误的主要来源是使用列作为数组的索引,但不确定如何将索引插入数组。鉴于列的条件性质,我没有尝试 join(数据是数百万行,列表是数千行)。

由于数据集的大小,我正在远离 Pandas 和 udfs。生成的数据框应如下所示:

+---+-----+----+------------+
| id|ratio|vol1|new_col_cond|
+---+-----+----+------------+
|  3|  1.3| 100| 4          |
|  5|  0.3| 200| 60         |
|  1|  1.1| 300| 2          |
|  8|  0.8| 400| 90         |
|  2|  2.0| 500| 3          |
|  4|  0.0| 600| 50         |
+---+-----+----+------------+

感谢任何解决此问题的帮助。

在创建数据框之前将'new_col_cond'添加到您的词典中会更容易。

d = [{'id':  3, 'ratio': 1.3, 'vol1': 100},
     {'id':  5, 'ratio': 0.3, 'vol1': 200},
     {'id':  1, 'ratio': 1.1, 'vol1': 300},
     {'id':  8, 'ratio': 0.8, 'vol1': 400},
     {'id':  2, 'ratio': 2.0, 'vol1': 500},
     {'id':  4, 'ratio': 0.0, 'vol1': 600}
     ]
q1 = [10, 20, 30, 40, 50, 60, 70, 80, 90]
q2 = [1, 2, 3, 4, 5, 6, 7, 8, 9]

for d_ in d:
    d_['new_col_cond'] = q1[d_['id']] if d_['ratio'] < 1 else q2[d_['id']]

df = spark.createDataFrame(d)

注:

虽然这适用于您显示的数据,但我不确定它是否稳健。如果 'id' 键的值 >8,这将失败

从 numpy 数组创建 ArrayType 列表达式并在您的条件中使用它们,如下所示:

from pyspark.sql import functions as F

q1_n = F.array(*[F.lit(int(x)) for x in q1_n])
q2_n = F.array(*[F.lit(int(x)) for x in q2_n])

result = data.withColumn(
    'new_col_cond',
    F.when(F.col('ratio') < 1, q1_n[F.col('id')])
        .when(F.col('ratio') > 1, q2_n[F.col('id')])
)

result.show()
#+---+-----+----+------------+
#| id|ratio|vol1|new_col_cond|
#+---+-----+----+------------+
#|  3|  1.3| 100|           4|
#|  5|  0.3| 200|          60|
#|  1|  1.1| 300|           2|
#|  8|  0.8| 400|          90|
#|  2|  2.0| 500|           3|
#|  4|  0.0| 600|          50|
#+---+-----+----+------------+