How to rewrite your code in OOP using PySpark
I have a simple dataframe:
sdf0 = spark.createDataFrame(
    [
        ("eng", "BlackBerry sells legacy patents of mobile devices"),
        ("eng", "Amazon to shut down publishing house Westland Books"),
    ],
    ["lang", "title"],
)
lang | title |
---|---|
eng | BlackBerry sells legacy patents of mobile devices |
eng | Amazon to shut down publishing house Westland Books |
I also have code that extracts the filtered words from the text:
from functools import reduce

import pyspark.sql.functions as F
import stopwordsiso
from pyspark.ml.feature import StopWordsRemover, Tokenizer

# to lower
sdf = sdf0.withColumn("low_title", F.lower(F.col("title")))

# tokenize
tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
sdf1 = tokenizer.transform(sdf)

# filter stopwords
available_langs = {"eng": "en"}
stopwords_iso = {}
for lang in available_langs:
    stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
stopwords = {k: list(v) for k, v in stopwords_iso.items()}

sdf_filtered = reduce(
    lambda a, b: a.unionAll(b),
    (
        StopWordsRemover(
            inputCol="tokens", outputCol="filtered_words", stopWords=value
        ).transform(sdf1.where(F.col("lang") == key))
        for key, value in stopwords.items()
    ),
)

# explode
sdf_exp = (
    sdf_filtered.withColumn("filtered_word", F.explode("filtered_words"))
    .select("lang", "filtered_word")
    .withColumn(
        "filtered_word",
        F.regexp_replace(
            "filtered_word", r'[!"«»#$%&\'()*+,-./:;<=>?@[\]^_`{|}~–—0-9]', ""
        ),
    )
    .filter(F.length(F.col("filtered_word")) > 0)
)
Output:
+----+-------------+
|lang|filtered_word|
+----+-------------+
| eng| blackberry|
| eng| sells|
| eng| legacy|
| eng| patents|
| eng| mobile|
| eng| devices|
| eng| amazon|
| eng| shut|
| eng| publishing|
| eng| house|
| eng| westland|
| eng| books|
+----+-------------+
I tried to rewrite it as a class, but I keep getting errors that a column does not exist. How can I bind the previous dataframes and feed them to the functions? Or is there a simpler way that avoids writing so many functions? Thanks.
import pyspark.sql.functions as F
import stopwordsiso


class Filter_words:
    def __init__(self, sdf):
        self.sdf = sdf

    def lower(self):
        self.sdf = self.sdf.withColumn("low_title", F.lower(F.col("title")))

    def tokenize(self):
        tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
        self.sdf = tokenizer.transform(sdf)

    def stop_words(self):
        available_lang = {"eng": "en"}
        stopwords_iso = {}
        for lang in available_langs:
            stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
        stopwords = {k: list(v) for k, v in stopwords_iso.items()}
        self.sdf = reduce(
            lambda a, b: a.unionAll(b),
            (
                StopWordsRemover(
                    inputCol="tokens", outputCol="filtered_words", stopWords=value
                ).transform(sdf.where(F.col("lang") == key))
                for key, value in stopwords.items()
            ),
        )

    def explode_column(self):
        self.sdf = (
            self.sdf.withColumn("filtered_word", F.explode("tokens"))
            .select("lang", "filtered_word")
            .withColumn(
                "filtered_word",
                F.regexp_replace(
                    "filtered_word", r'[!"«»#$%&\'()*+,-./:;<=>?@[\]^_`{|}~–—0-9]', ""
                ),
            )
            .filter(F.length(F.col("filtered_word")) > 0)
        )


sdf = Filter_words(sdf0)
Found it - there are at least 3 errors in your code. Run it in an IDE in a fresh session and you will see all of them.
tokenize

def tokenize(self):
    tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
    self.sdf = tokenizer.transform(sdf)

# should be
def tokenize(self):
    tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
    self.sdf = tokenizer.transform(self.sdf)  # self missing
stop_words

def stop_words(self):
    available_lang = {"eng": "en"}
    stopwords_iso = {}
    for lang in available_langs:
        stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
    stopwords = {k: list(v) for k, v in stopwords_iso.items()}
    self.sdf = reduce(
        lambda a, b: a.unionAll(b),
        (
            StopWordsRemover(
                inputCol="tokens", outputCol="filtered_words", stopWords=value
            ).transform(sdf.where(F.col("lang") == key))
            for key, value in stopwords.items()
        ),
    )

# should be
def stop_words(self):
    available_langs = {"eng": "en"}  # final -s missing
    stopwords_iso = {}
    for lang in available_langs:
        stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
    stopwords = {k: list(v) for k, v in stopwords_iso.items()}
    self.sdf = reduce(
        lambda a, b: a.unionAll(b),
        (
            StopWordsRemover(
                inputCol="tokens", outputCol="filtered_words", stopWords=value
            ).transform(self.sdf.where(F.col("lang") == key))  # self missing
            for key, value in stopwords.items()
        ),
    )
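For reference, here is a minimal sketch of the whole class with those fixes applied. The FilterWords name, the "return self" on each method (so the steps can be chained and every method works on the dataframe produced by the previous one), and the dictionary comprehension inside stop_words are my own choices, not part of the original code; it also explodes filtered_words rather than tokens, to match the procedural version above.

# minimal sketch, assuming the same libraries as in the question
from functools import reduce

import pyspark.sql.functions as F
import stopwordsiso
from pyspark.ml.feature import StopWordsRemover, Tokenizer


class FilterWords:
    def __init__(self, sdf):
        # keep the current dataframe on the instance so every step can see
        # the result of the previous one
        self.sdf = sdf

    def lower(self):
        self.sdf = self.sdf.withColumn("low_title", F.lower(F.col("title")))
        return self  # return self so the calls can be chained

    def tokenize(self):
        tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
        self.sdf = tokenizer.transform(self.sdf)
        return self

    def stop_words(self):
        available_langs = {"eng": "en"}
        # hypothetical restructuring of the stopword lookup as a comprehension
        stopwords = {
            lang: list(stopwordsiso.stopwords(iso))
            for lang, iso in available_langs.items()
        }
        self.sdf = reduce(
            lambda a, b: a.unionAll(b),
            (
                StopWordsRemover(
                    inputCol="tokens", outputCol="filtered_words", stopWords=value
                ).transform(self.sdf.where(F.col("lang") == key))
                for key, value in stopwords.items()
            ),
        )
        return self

    def explode_column(self):
        self.sdf = (
            self.sdf.withColumn("filtered_word", F.explode("filtered_words"))
            .select("lang", "filtered_word")
            .withColumn(
                "filtered_word",
                F.regexp_replace(
                    "filtered_word", r'[!"«»#$%&\'()*+,-./:;<=>?@[\]^_`{|}~–—0-9]', ""
                ),
            )
            .filter(F.length(F.col("filtered_word")) > 0)
        )
        return self


# usage: each method updates self.sdf, so the steps chain in one line
sdf_exp = FilterWords(sdf0).lower().tokenize().stop_words().explode_column().sdf
sdf_exp.show()

If a class still feels like too much ceremony, the same four steps could just as well live in a single plain function that takes sdf0 and returns the exploded dataframe.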