Pyspark: change values in an array column based on another array column
I have the following pyspark dataframe:
root
|-- tokens: array (nullable = true)
| |-- element: string (containsNull = true)
|-- posTags: array (nullable = true)
| |-- element: string (containsNull = true)
|-- dependencies: array (nullable = true)
| |-- element: string (containsNull = true)
|-- labelledDependencies: array (nullable = true)
| |-- element: string (containsNull = true)
with the following example data:
+------------------------------+---------------------------+-----------------------------------+--------------------------------------------+
|tokens |posTags |dependencies |labelledDependencies |
+------------------------------+---------------------------+-----------------------------------+--------------------------------------------+
|[i, try, to, get, my, balance]|[NNP, VB, TO, VB, PRP$, NN]|[try, ROOT, get, try, balance, get]|[nsubj, root, mark, parataxis, appos, nsubj]|
+------------------------------+---------------------------+-----------------------------------+--------------------------------------------+
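For reference, the example row can be reproduced with:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(
        ["i", "try", "to", "get", "my", "balance"],
        ["NNP", "VB", "TO", "VB", "PRP$", "NN"],
        ["try", "ROOT", "get", "try", "balance", "get"],
        ["nsubj", "root", "mark", "parataxis", "appos", "nsubj"],
    )],
    ["tokens", "posTags", "dependencies", "labelledDependencies"],
)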
I want to change the labelled dependency of the token balance from nsubj to dobj.
My logic is the following: if you find the labelled dependency nsubj, and the token has POS tag NN, and the token depends on a token (get) that has POS tag VB, then change nsubj to dobj.
I can do that with the following function:
def change_things(tokens, posTags, dependencies, labelledDependencies):
    for i in range(0, len(labelledDependencies)):
        if labelledDependencies[i] == 'nsubj':
            if posTags[i] == 'NN':
                if posTags[tokens.index(dependencies[i])] == 'VB':
                    labelledDependencies[i] = 'dobj'
    return tokens, posTags, dependencies, labelledDependencies
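As a quick sanity check outside Spark, the function relabels only balance (POS tag NN, whose head get is tagged VB) on the example row:

tokens = ["i", "try", "to", "get", "my", "balance"]
posTags = ["NNP", "VB", "TO", "VB", "PRP$", "NN"]
dependencies = ["try", "ROOT", "get", "try", "balance", "get"]
labelled = ["nsubj", "root", "mark", "parataxis", "appos", "nsubj"]

print(change_things(tokens, posTags, dependencies, labelled)[3])
# ['nsubj', 'root', 'mark', 'parataxis', 'appos', 'dobj']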
and maybe even register it as a udf. My question, however, is how to do this without a udf, using only pyspark built-in methods.
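For reference, the udf route would look roughly like this (the name relabel is illustrative; it returns only the rewritten labelledDependencies, since change_things leaves the other columns untouched):

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

relabel = F.udf(
    lambda t, p, d, l: change_things(t, p, d, l)[3],
    ArrayType(StringType()),
)

df_udf = df.withColumn(
    "labelledDependencies",
    relabel("tokens", "posTags", "dependencies", "labelledDependencies"),
)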
You can use Spark's built-in transform function:
import pyspark.sql.functions as F
df2 = df.withColumn(
    "labelledDependencies",
    F.expr("""transform(
        labelledDependencies,
        (x, i) -> CASE WHEN x = 'nsubj'
                            AND posTags[i] = 'NN'
                            AND posTags[array_position(tokens, dependencies[i]) - 1] = 'VB'
                       THEN 'dobj'
                       ELSE x
                  END
    )
    """)
)
df2.show(1, False)
#+------------------------------+---------------------------+-----------------------------------+-------------------------------------------+
#|tokens |posTags |dependencies |labelledDependencies |
#+------------------------------+---------------------------+-----------------------------------+-------------------------------------------+
#|[i, try, to, get, my, balance]|[NNP, VB, TO, VB, PRP$, NN]|[try, ROOT, get, try, balance, get]|[nsubj, root, mark, parataxis, appos, dobj]|
#+------------------------------+---------------------------+-----------------------------------+-------------------------------------------+
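Note that array_position is 1-based while the [] element access is 0-based, hence the - 1 in the expression. On Spark 3.1+, the same rewrite can also be expressed with the Python-side F.transform instead of a SQL string; a minimal sketch, assuming tokens are unique within a row (posByToken is a helper column introduced here; map_from_arrays fails on duplicate keys under Spark's default map-key dedup policy):

import pyspark.sql.functions as F

df2 = (
    df.withColumn("posByToken", F.map_from_arrays("tokens", "posTags"))
    .withColumn(
        "labelledDependencies",
        F.transform(
            "labelledDependencies",
            # x is the label, i its 0-based index; element_at is 1-based.
            lambda x, i: F.when(
                (x == "nsubj")
                & (F.element_at("posTags", i + 1) == "NN")
                & (F.element_at("posByToken", F.element_at("dependencies", i + 1)) == "VB"),
                "dobj",
            ).otherwise(x),
        ),
    )
    .drop("posByToken")
)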