How to create a new column in Spark using Python, based on another column?
My dataframe contains a column of strings. I want to create a new column based on parts of the strings in that column. For example:

+---------------------------------+------------+
|content                          |other column|
+---------------------------------+------------+
|The father has two dogs          |father      |
|One cat stay at home of my mother|mother      |
|etc.                             |etc.        |
+---------------------------------+------------+
I want to create an array with the words I am interested in. For example:

people = [mother, father, etc.]
Then I iterate over the column "content" and extract the word to insert into the new column:

def extract_people(df):
    column = []
    people = ["mother", "father", "etc."]
    for row in df.select("content").collect():
        for word in people:
            if word in str(row):  # str.find() returns -1 when absent, which is truthy
                column.append(word)
                break
    return pd.Series(column)

f_pyspark = df_pyspark.withColumn('people', extract_people(df_pyspark))
This code does not work and gives me this error on collect():
22/01/26 11:34:04 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 36)
java.lang.OutOfMemoryError: Java heap space
Probably because my file is too big: it has 15 million rows. How can I create the new column in a different way?
Take the following dataframe as an example:
+---------------------------------+
|content |
+---------------------------------+
|Thefatherhas two dogs |
|The fatherhas two dogs |
|Thefather has two dogs |
|Thefatherhastwodogs |
|One cat stay at home of my mother|
|One cat stay at home of mymother |
|Onecatstayathomeofmymother |
|etc. |
|my feet smell |
+---------------------------------+
You can do the following:

from pyspark.sql import functions

arr = ["father", "mother", "etc."]

# Build a SQL CASE expression: the first matching word wins,
# and rows with no match get the literal string 'None'
expression = (
    "CASE " +
    "".join(["WHEN content LIKE '%{}%' THEN '{}' ".format(val, val) for val in arr]) +
    "ELSE 'None' END")

df = df.withColumn("other_column", functions.expr(expression))
df.show()
+---------------------------------+------------+
|content |other_column|
+---------------------------------+------------+
|Thefatherhas two dogs |father |
|The fatherhas two dogs |father |
|Thefather has two dogs |father |
|Thefatherhastwodogs |father |
|One cat stay at home of my mother|mother |
|One cat stay at home of mymother |mother |
|Onecatstayathomeofmymother |mother |
|etc. |etc. |
|my feet smell |None |
+---------------------------------+------------+
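This approach runs entirely inside Spark SQL, so nothing is collected to the driver and it scales to the 15 million rows. The generated CASE string can also be inspected and checked without a Spark session; here is a minimal pure-Python sketch of the same first-match logic, assuming plain case-sensitive substring semantics for LIKE '%word%' (the hypothetical first_match helper is only for illustration, it is not part of the Spark solution):

```python
arr = ["father", "mother", "etc."]

# Build the same CASE expression the answer passes to functions.expr()
expression = (
    "CASE " +
    "".join("WHEN content LIKE '%{}%' THEN '{}' ".format(val, val) for val in arr) +
    "ELSE 'None' END")

def first_match(content, words):
    """Mirror of the CASE expression: return the first word that occurs
    in content as a substring (like LIKE '%word%'), else 'None'."""
    for word in words:
        if word in content:   # note: `in`, not the truthiness of str.find()
            return word
    return "None"

print(expression)
print(first_match("Thefatherhastwodogs", arr))   # father
print(first_match("my feet smell", arr))         # None
```

Because LIKE '%father%' is a bare substring match, it also matches words like "grandfather"; if whole-word matching matters, a regexp-based condition would be needed instead.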