用于创建要从 py-spark 中的业务规则执行的可执行文件的框架

Framework for creating executable to be executed from the business rules in py-spark

我有一个具有以下样本值的数据框 df。

from pyspark.sql.types import DateType, LongType, StringType, StructType, StructField,BooleanType
import os
import pyspark.sql.functions as F
import datetime
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType,IntegerType,ArrayType
from pyspark.sql import Row

l = [('test',1,0,1,0),('prod',0,1,0,1),('local',1,0,1,0)]
rdd = sc.parallelize(l)
sdf = rdd.map(lambda x: Row(col1=x[0], col2=int(x[1]),col3=int(x[2]),col4=int(x[3]),col5=int(x[4])))
df = sqlContext.createDataFrame(sdf)

-----+----+----+----+----+
| col1|col2|col3|col4|col5|
+-----+----+----+----+----+
| test|   1|   0|   1|   0|
| prod|   0|   1|   0|   1|
|local|   1|   0|   1|   0|
+-----+----+----+----+----+

还有一些业务规则如下。截至目前,这已作为元数据保存在字典中。(但是规则元数据也可以保存为:agg_level、agg_function、transformation、source、source_column)

 features = {
  "col6": F.when(F.col('col2') > 0,F.lit(1)).otherwise(F.lit(0)),
  "col7": F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0)),
  "col8": F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0)),
  "col9": F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0))
}

我想创建一个函数 df_extract() ,它动态生成 executable 代码,如下所示。这应该 return 下面的查询被执行(不是作为数据框)

df1 = df_extract(df,col6,col7,col8,col9)
df1 = **df.filter('col1 = "test"') \
    .withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col9', F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0)))** 

使用三个特征进行调用时,returned 查询等中应仅存在三个特征。

df1 = df_extract(df,col6,col7,col8) 
df1 = **df.filter('col1 = "test"') \
    .withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0)))**

终于没有特征了,所有特征都应该在 expression/query.

df1 = df_extract(df)
df1 = **df.filter('col1 = "test"') \
    .withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col9', F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0)))**

无论如何这可能吗,至少通过在 pyspark 中创建 sql table。 N 个这样的转换规则将与每个数据帧相关联,并且该函数应该能够 return 动态定义。

想个办法都烦了。

我想到的一种解决方案是使用规则作为案例条件。

   journey_features = {
      "Rules":{
      "col6": "case when col2 > 0 then 1 else 0 end as col6",
      "col7": "case when col3 > 0 then 1 else 0 end as col7",
      "col8": "case when col4 > 0 then 1 else 0 end as col8",
      "col9": "case when col5 > 0 then 1 else 0 end as col9"
      },
      "filter":"col1 == 'test'"
      }

extract_feature() 函数创建如下,以使用规则作为表达式。

    def extract_feature(df : DataFrame,*featurenames):
  retrieved_features = ""
  for featurename in featurenames:
    if featurename  in journey_features.get('Rules'):
      retrieved_features += "'" + str(journey_features.get('Rules')[featurename]) +"'" + ","
  retrieved_features = retrieved_features.rstrip(',')
  if journey_features['filter']:
    filter_feature = ".filter({df}.".format(df=df) + str(journey_features['filter']) + ")"
  else:
    filte_feature = ""
  return "{0}{1}.selectExpr({2})".format(df,filter_feature,retrieved_features)

并传递 df 并将 , 特征传递给函数。

extract_feature('df','col6','col7')

结果是

Out[139]: "df.filter(df.measurement_group == 'test').selectExpr('case when col2 > 0 then 1 else 0 end as col6','case when col3 > 0 then 1 else 0 end as col7')"

可以使用 eval 函数分配给数据框

df1 = eval(extract_feature('df','col6','col7'))