How can I extract length-sensitive features in PySpark without using the .toPandas() hack?

I'm new to PySpark and I want to translate the feature extraction (FE) part of my Python script into PySpark. To start, I have a Spark dataframe, call it sdf, that includes two columns, A and B:

Here is an example:

data                                                  A             B
https://example1.org/path/to/file?param=42#fragment   path/to/file  param=42#fragment
https://example2.org/path/to/file                     path/to/file  NaN
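
For reference, a minimal sketch of how this sample sdf could be created (not part of the original post; it assumes an active SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample dataframe matching the table above; None stands in for the missing B value
sdf = spark.createDataFrame(
    [
        ("https://example1.org/path/to/file?param=42#fragment", "path/to/file", "param=42#fragment"),
        ("https://example2.org/path/to/file", "path/to/file", None),
    ],
    ["data", "A", "B"],
)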

Now I want to apply some feature engineering to extract features from column B and join the results back to sdf. So far I can do it with the following Python script:

import re
import pandas as pd

#================================> Type <==========================================
def getType(input_value):
  if pd.isna(input_value):
    return "-"
    
  type_ = "-"

  if input_value.isdigit():                                                 # Only numeric
    type_ = "Int"
  elif bool(re.match(r"^[a-zA-Z0-9_]+$", input_value)):                     # Consists of one or more of a-zA-Z, 0-9, underscore , and Chinese
    type_ = "String"
  elif bool(re.match(r"^[\d+,\s]+$", input_value)):                         # Only comma exists as separator "^[\d+,\s]+$"
    type_ = "Array"

  else:  
    existing_separators = re.findall(r"([\+\;\,\:\=\|\/\#\'\"\t\r\n\s])+", input_value)
    # There is more than one separator,
    # or exactly one separator and it is not a comma
    if len(existing_separators) > 1 or (len(existing_separators) == 1 and existing_separators[0] != ","):
      type_ = "Sentence"                                                

  return type_
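
A quick sanity check of getType on values like the ones in column B (this test is not in the original script; the expected results follow from the rules above):

#quick test
print(getType("param=42#fragment"))   # 'Sentence' ('=' and '#' are separators)
print(getType("42"))                  # 'Int'
print(getType("path_to_file"))        # 'String'
print(getType(float("nan")))          # '-'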


#================================> Length <==========================================
#Number of characters in the parameter value
getLength = lambda input_text: 0 if pd.isna(input_text) else len(input_text)

#================================> Token number <==========================================

double_separators_regex = re.compile(r"[\<\[\(\{]+[0-9a-zA-Z_\.\-]+[\}\)\]\>]+")
single_separators_regex = re.compile(r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\/\#\'‘’\"“”\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?")

token_number = lambda input_text: 0 if pd.isna(input_text) else len(double_separators_regex.findall(input_text) + [element for pair in single_separators_regex.findall(input_text) for element in pair if element != ""])

#quick test 
param_example = "url=http://news.csuyst.edu.cn/sem/resource/code/rss/rssfeed.jsp?type=list"
out = double_separators_regex.findall(param_example) + [element for pair in single_separators_regex.findall(param_example) for element in pair if element != ""] 

print(out)        #['url','http','news.csuyst.edu.cn','sem','resource','code','rss','rssfeed.jsp','type','list']
print(len(out))   #10

#===================================> Encoding type <============================================

import base64

def isBase64(input_value):
  try:
    # b64encode returns bytes, so decode back to str before comparing with the input
    return base64.b64encode(base64.b64decode(input_value)).decode() == input_value
  except Exception:
    return False
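
A quick check of isBase64 (not in the original script; note that any string whose length and characters happen to form valid base64, such as 'param123', round-trips successfully and is reported as True, which is the same caveat pointed out in the answer below):

#quick test
print(isBase64("cGFyYW0="))   # True  ('param' encoded as base64)
print(isBase64("param123"))   # True, even though it is plain text (it happens to be decodable)
print(isBase64(None))         # False (b64decode raises, caught by the except)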

#================================> Character feature <==========================================
N = 2

n_grams = lambda input_text: 0 if pd.isna(input_text) else len(set([input_text[character_index:character_index+N] for character_index in range(len(input_text)-N+1)]))


#quick test 
n_grams_example = 'zhang1997'  #output = ['zh', 'ha', 'an', 'ng', 'g1', '19', '99', '97']
n_grams(n_grams_example)       # 8



#frame the features
features_df = pd.DataFrame()

features_df["Type"] = df.fragment.apply(getType)
features_df["Length"] = df.fragment.apply(getLength)
features_df["Token_number"] = df.fragment.apply(token_number)
features_df["Encoding_type"] = df.fragment.apply(isBase64)
features_df["Character_feature"] = df.fragment.apply(n_grams)

features_df.columns  #Index(['Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature'], dtype='object')
features_df
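
To attach the extracted features back onto the original pandas frame (the join step mentioned above), a column-wise concat is one option; a minimal sketch, assuming df is the pandas frame with the fragment column used above:

#join the features back onto the original frame, column-wise
df_with_features = pd.concat([df, features_df], axis=1)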

Question: What is the best way to translate this FE logic without converting the Spark dataframe to a pandas dataframe via toPandas(), so that the pipeline stays optimized and everything is handled 100% in Spark?

I have also provided a Colab notebook for quick debugging and comments.

The expected output, as a Spark dataframe, would look like this:

+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+
|data                |A           |B                |Type    |Length|Token_number |Encoding_type |Character_feature|
+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+
|https://example1....|path/to/file|param=42#fragment|Sentence|17.0  |3.0          |False         |15.0             |
|https://example2....|path/to/file|Null             |-       |0.0   |0.0          |False         |0.0              |
+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+

I've put together some sample code for you here. It's not perfect, but it at least follows your source code and should point you in the direction of the next steps. I've also added a few comments on each Spark transformation. Hope you find it useful.

from pyspark.sql import functions as F
from pyspark.sql import types as T

def count_token(input_text):
    import re
    if input_text is None:
        return 0
    double_separators_regex = re.compile(r"[\<\[\(\{]+[0-9a-zA-Z_\.\-]+[\}\)\]\>]+")
    single_separators_regex = re.compile(r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\/\#\'‘’\"“”\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?")
    return len(double_separators_regex.findall(input_text) + [element for pair in single_separators_regex.findall(input_text) for element in pair if element != ""])

def n_grams(input_text):
    if input_text is None:
        return 0
    N = 2
    return len(set([input_text[character_index:character_index+N] for character_index in range(len(input_text)-N+1)]))


(df
    .withColumn('test', F.base64(F.unbase64('fragment')))
    .withColumn('Type', F
        .when(F.isnull('fragment'), '-')
        .when(~F.isnull(F.col('fragment').cast('int')), 'Int')
        .when(F.regexp_extract('fragment', '^[a-zA-Z0-9_]+$', 0) == F.col('fragment'), 'String')
        .when(F.regexp_extract('fragment', r'^[\d+,\s]+$', 0) == F.col('fragment'), 'Array') # not sure about this regex?
        .otherwise('Sentence') # not sure about this condition either, but you can utilize
                               # `regexp_extract` like above and do any kind of comparison
    )
    .withColumn('Length', F
        .when(F.isnull('fragment'), 0)
        .otherwise(F.length('fragment'))
    )
    .withColumn('Token_number', F.udf(count_token, T.IntegerType())('fragment')) # Spark doesn't provide a `findall` alternative,
                                                                                 # so we have to use a UDF here; see the docs:
                                                                                 # http://spark.apache.org/docs/3.0.1/api/python/pyspark.sql.html#pyspark.sql.functions.udf
    .withColumn('Encoding_type', F
        .when(F.isnull('fragment'), False)
        .otherwise(F.base64(F.unbase64(F.col('fragment'))) == F.col('fragment')) # FYI, this is not always correct,
                                                                                 # for example `assert(isBase64('param123') == False)`
    )
    .withColumn('Character_feature', F.udf(n_grams, T.IntegerType())('fragment')) # or you can use more advanced feature from SparkML
                                                                                  # https://spark.apache.org/docs/latest/ml-features.html#n-gram
    .show()
)

# Output
# +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
# |                data|        path|         fragment|test|    Type|Length|Token_number|Encoding_type|Character_feature|
# +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
# |https://example1....|path/to/file|param=42#fragment|para|Sentence|    17|           3|        false|               15|
# |https://example2....|path/to/file|             null|null|       -|     0|           0|        false|                0|
# +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
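
As a side note (not part of the original answer), the Type checks can also be written with Column.rlike instead of comparing regexp_extract output against the column itself; a minimal sketch, assuming the same df with a fragment column:

from pyspark.sql import functions as F

type_col = (
    F.when(F.isnull('fragment'), '-')
     .when(F.col('fragment').rlike(r'^[0-9]+$'), 'Int')            # digits only, like str.isdigit()
     .when(F.col('fragment').rlike(r'^[a-zA-Z0-9_]+$'), 'String')
     .when(F.col('fragment').rlike(r'^[\d+,\s]+$'), 'Array')       # same regex (and same caveat) as above
     .otherwise('Sentence')
)

result_df = df.withColumn('Type', type_col)
result_df.show()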