How to rewrite your code in OOP using PySpark

I have a simple dataframe:

sdf0 = spark.createDataFrame(
    [
        ("eng", "BlackBerry sells legacy patents of mobile devices"),
        ("eng", "Amazon to shut down publishing house Westland Books"),
    ],
    ["lang", "title"],
)
+----+---------------------------------------------------+
|lang|title                                              |
+----+---------------------------------------------------+
|eng |BlackBerry sells legacy patents of mobile devices  |
|eng |Amazon to shut down publishing house Westland Books|
+----+---------------------------------------------------+

I also have code that extracts the filtered words from the text:

from functools import reduce

import pyspark.sql.functions as F
from pyspark.ml.feature import StopWordsRemover, Tokenizer

# to lower
sdf = sdf0.withColumn("low_title", F.lower(F.col("title")))

# tokenize
tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
sdf1 = tokenizer.transform(sdf)

# filter stopwords
import stopwordsiso

available_langs = {"eng": "en"}
stopwords_iso = {}
for lang in available_langs:
    stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
stopwords = {k: list(v) for k, v in stopwords_iso.items()}
sdf_filtered = reduce(
    lambda a, b: a.unionAll(b),
    (
        StopWordsRemover(
            inputCol="tokens", outputCol="filtered_words", stopWords=value
        ).transform(sdf1.where(F.col("lang") == key))
        for key, value in stopwords.items()
    ),
)

# explode
sdf_exp = (
    sdf_filtered.withColumn("filtered_word", F.explode("filtered_words"))
    .select("lang", "filtered_word")
    .withColumn(
        "filtered_word",
        F.regexp_replace(
            "filtered_word", r'[!"«»#$%&\'()*+,-./:;<=>?@[\]^_`{|}~–—0-9]', ""
        ),
    )
    .filter(F.length(F.col("filtered_word")) > 0)
)

Output:

+----+-------------+
|lang|filtered_word|
+----+-------------+
| eng|   blackberry|
| eng|        sells|
| eng|       legacy|
| eng|      patents|
| eng|       mobile|
| eng|      devices|
| eng|       amazon|
| eng|         shut|
| eng|   publishing|
| eng|        house|
| eng|     westland|
| eng|        books|
+----+-------------+

I tried to rewrite it as a class, but I keep getting errors that a column does not exist. How do I bind the intermediate dataframes together and feed them to the methods? Or is there a simpler way that avoids writing so many functions? Thanks.

from functools import reduce

import pyspark.sql.functions as F
import stopwordsiso
from pyspark.ml.feature import StopWordsRemover, Tokenizer


class Filter_words:
    def __init__(self, sdf):
        self.sdf = sdf

    def lower(self):
        self.sdf = self.sdf.withColumn("low_title", F.lower(F.col("title")))

    def tokenize(self):
        tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
        self.sdf = tokenizer.transform(sdf)

    def stop_words(self):
        available_lang = {"eng": "en"}
        stopwords_iso = {}
        for lang in available_langs:
            stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
        stopwords = {k: list(v) for k, v in stopwords_iso.items()}
        self.sdf = reduce(
            lambda a, b: a.unionAll(b),
            (
                StopWordsRemover(
                    inputCol="tokens", outputCol="filtered_words", stopWords=value
                ).transform(sdf.where(F.col("lang") == key))
                for key, value in stopwords.items()
            ),
        )

    def explode_column(self):
        self.sdf = (
            self.sdf.withColumn("filtered_word", F.explode("tokens"))
            .select("lang", "filtered_word")
            .withColumn(
                "filtered_word",
                F.regexp_replace(
                    "filtered_word", r'[!"«»#$%&\'()*+,-./:;<=>?@[\]^_`{|}~–—0-9]', ""
                ),
            )
            .filter(F.length(F.col("filtered_word")) > 0)
        )


sdf = Filter_words(sdf0)

Found it - there are at least 3 errors in your code. Run it in a fresh session in an IDE and you will see all of them:


  1. tokenize

    def tokenize(self):
        tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
        self.sdf = tokenizer.transform(sdf)

# should be 

    def tokenize(self):
        tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
        self.sdf = tokenizer.transform(self.sdf) # self missing

  2. stop_words

    def stop_words(self):
        available_lang = {"eng": "en"}
        stopwords_iso = {}
        for lang in available_langs:
            stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
        stopwords = {k: list(v) for k, v in stopwords_iso.items()}
        self.sdf = reduce(
            lambda a, b: a.unionAll(b),
            (
                StopWordsRemover(
                    inputCol="tokens", outputCol="filtered_words", stopWords=value
                ).transform(sdf.where(F.col("lang") == key))
                for key, value in stopwords.items()
            ),
        )

# should be 

    def stop_words(self):
        available_langs = {"eng": "en"} # final -s missing
        stopwords_iso = {}
        for lang in available_langs:
            stopwords_iso[lang] = stopwordsiso.stopwords(available_langs[lang])
        stopwords = {k: list(v) for k, v in stopwords_iso.items()}
        self.sdf = reduce(
            lambda a, b: a.unionAll(b),
            (
                StopWordsRemover(
                    inputCol="tokens", outputCol="filtered_words", stopWords=value
                ).transform(self.sdf.where(F.col("lang") == key))  # self missing
                for key, value in stopwords.items()
            ),
        )
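
For completeness, here is a sketch of the whole class with those fixes applied. Two things in it are my own choices rather than something from your post: explode_column reads filtered_words (your class version exploded tokens, which would skip the stop-word filtering entirely, and is likely another error behind the "at least 3"), and run() is just a hypothetical convenience wrapper so the caller does not have to invoke the four methods in order by hand.

from functools import reduce

import pyspark.sql.functions as F
import stopwordsiso
from pyspark.ml.feature import StopWordsRemover, Tokenizer


class Filter_words:
    def __init__(self, sdf):
        self.sdf = sdf

    def lower(self):
        self.sdf = self.sdf.withColumn("low_title", F.lower(F.col("title")))

    def tokenize(self):
        tokenizer = Tokenizer(inputCol="low_title", outputCol="tokens")
        self.sdf = tokenizer.transform(self.sdf)  # self.sdf, not sdf

    def stop_words(self):
        available_langs = {"eng": "en"}  # final -s
        stopwords = {
            lang: list(stopwordsiso.stopwords(code))
            for lang, code in available_langs.items()
        }
        self.sdf = reduce(
            lambda a, b: a.unionAll(b),
            (
                StopWordsRemover(
                    inputCol="tokens", outputCol="filtered_words", stopWords=value
                ).transform(self.sdf.where(F.col("lang") == key))  # self.sdf, not sdf
                for key, value in stopwords.items()
            ),
        )

    def explode_column(self):
        self.sdf = (
            # explode the filtered words, not the raw tokens
            self.sdf.withColumn("filtered_word", F.explode("filtered_words"))
            .select("lang", "filtered_word")
            .withColumn(
                "filtered_word",
                F.regexp_replace(
                    "filtered_word", r'[!"«»#$%&\'()*+,-./:;<=>?@[\]^_`{|}~–—0-9]', ""
                ),
            )
            .filter(F.length(F.col("filtered_word")) > 0)
        )

    def run(self):
        # hypothetical convenience wrapper: runs the pipeline steps in order
        self.lower()
        self.tokenize()
        self.stop_words()
        self.explode_column()
        return self.sdf


Filter_words(sdf0).run().show()

If you want even fewer moving parts, each method could also return self so the calls chain, but the run() wrapper above keeps the class closest to what you already wrote.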