用于创建要从 py-spark 中的业务规则执行的可执行文件的框架
Framework for creating executable to be executed from the business rules in py-spark
我有一个具有以下样本值的数据框 df。
from pyspark.sql.types import DateType, LongType, StringType, StructType, StructField,BooleanType
import os
import pyspark.sql.functions as F
import datetime
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType,IntegerType,ArrayType
from pyspark.sql import Row
l = [('test',1,0,1,0),('prod',0,1,0,1),('local',1,0,1,0)]
rdd = sc.parallelize(l)
sdf = rdd.map(lambda x: Row(col1=x[0], col2=int(x[1]),col3=int(x[2]),col4=int(x[3]),col5=int(x[4])))
df = sqlContext.createDataFrame(sdf)
-----+----+----+----+----+
| col1|col2|col3|col4|col5|
+-----+----+----+----+----+
| test| 1| 0| 1| 0|
| prod| 0| 1| 0| 1|
|local| 1| 0| 1| 0|
+-----+----+----+----+----+
还有一些业务规则如下。截至目前,这已作为元数据保存在字典中。(但是规则元数据也可以保存为:agg_level、agg_function、transformation、source、source_column)
features = {
"col6": F.when(F.col('col2') > 0,F.lit(1)).otherwise(F.lit(0)),
"col7": F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0)),
"col8": F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0)),
"col9": F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0))
}
我想创建一个函数 df_extract() ,它动态生成 executable 代码,如下所示。这应该 return 下面的查询被执行(不是作为数据框)
df1 = df_extract(df,col6,col7,col8,col9)
df1 = **df.filter('col1 = "test"') \
.withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col9', F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0)))**
使用三个特征进行调用时,returned 查询等中应仅存在三个特征。
df1 = df_extract(df,col6,col7,col8)
df1 = **df.filter('col1 = "test"') \
.withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0)))**
终于没有特征了,所有特征都应该在 expression/query.
df1 = df_extract(df)
df1 = **df.filter('col1 = "test"') \
.withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col9', F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0)))**
无论如何这可能吗,至少通过在 pyspark 中创建 sql table。 N 个这样的转换规则将与每个数据帧相关联,并且该函数应该能够 return 动态定义。
想个办法都烦了。
我想到的一种解决方案是使用规则作为案例条件。
journey_features = {
"Rules":{
"col6": "case when col2 > 0 then 1 else 0 end as col6",
"col7": "case when col3 > 0 then 1 else 0 end as col7",
"col8": "case when col4 > 0 then 1 else 0 end as col8",
"col9": "case when col5 > 0 then 1 else 0 end as col9"
},
"filter":"col1 == 'test'"
}
extract_feature() 函数创建如下,以使用规则作为表达式。
def extract_feature(df : DataFrame,*featurenames):
retrieved_features = ""
for featurename in featurenames:
if featurename in journey_features.get('Rules'):
retrieved_features += "'" + str(journey_features.get('Rules')[featurename]) +"'" + ","
retrieved_features = retrieved_features.rstrip(',')
if journey_features['filter']:
filter_feature = ".filter({df}.".format(df=df) + str(journey_features['filter']) + ")"
else:
filte_feature = ""
return "{0}{1}.selectExpr({2})".format(df,filter_feature,retrieved_features)
并传递 df 并将 , 特征传递给函数。
extract_feature('df','col6','col7')
结果是
Out[139]: "df.filter(df.measurement_group == 'test').selectExpr('case when col2 > 0 then 1 else 0 end as col6','case when col3 > 0 then 1 else 0 end as col7')"
可以使用 eval 函数分配给数据框
df1 = eval(extract_feature('df','col6','col7'))
我有一个具有以下样本值的数据框 df。
from pyspark.sql.types import DateType, LongType, StringType, StructType, StructField,BooleanType
import os
import pyspark.sql.functions as F
import datetime
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType,IntegerType,ArrayType
from pyspark.sql import Row
l = [('test',1,0,1,0),('prod',0,1,0,1),('local',1,0,1,0)]
rdd = sc.parallelize(l)
sdf = rdd.map(lambda x: Row(col1=x[0], col2=int(x[1]),col3=int(x[2]),col4=int(x[3]),col5=int(x[4])))
df = sqlContext.createDataFrame(sdf)
-----+----+----+----+----+
| col1|col2|col3|col4|col5|
+-----+----+----+----+----+
| test| 1| 0| 1| 0|
| prod| 0| 1| 0| 1|
|local| 1| 0| 1| 0|
+-----+----+----+----+----+
还有一些业务规则如下。截至目前,这已作为元数据保存在字典中。(但是规则元数据也可以保存为:agg_level、agg_function、transformation、source、source_column)
features = {
"col6": F.when(F.col('col2') > 0,F.lit(1)).otherwise(F.lit(0)),
"col7": F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0)),
"col8": F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0)),
"col9": F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0))
}
我想创建一个函数 df_extract() ,它动态生成 executable 代码,如下所示。这应该 return 下面的查询被执行(不是作为数据框)
df1 = df_extract(df,col6,col7,col8,col9)
df1 = **df.filter('col1 = "test"') \
.withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col9', F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0)))**
使用三个特征进行调用时,returned 查询等中应仅存在三个特征。
df1 = df_extract(df,col6,col7,col8)
df1 = **df.filter('col1 = "test"') \
.withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0)))**
终于没有特征了,所有特征都应该在 expression/query.
df1 = df_extract(df)
df1 = **df.filter('col1 = "test"') \
.withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('col9', F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0)))**
无论如何这可能吗,至少通过在 pyspark 中创建 sql table。 N 个这样的转换规则将与每个数据帧相关联,并且该函数应该能够 return 动态定义。
想个办法都烦了。
我想到的一种解决方案是使用规则作为案例条件。
journey_features = {
"Rules":{
"col6": "case when col2 > 0 then 1 else 0 end as col6",
"col7": "case when col3 > 0 then 1 else 0 end as col7",
"col8": "case when col4 > 0 then 1 else 0 end as col8",
"col9": "case when col5 > 0 then 1 else 0 end as col9"
},
"filter":"col1 == 'test'"
}
extract_feature() 函数创建如下,以使用规则作为表达式。
def extract_feature(df : DataFrame,*featurenames):
retrieved_features = ""
for featurename in featurenames:
if featurename in journey_features.get('Rules'):
retrieved_features += "'" + str(journey_features.get('Rules')[featurename]) +"'" + ","
retrieved_features = retrieved_features.rstrip(',')
if journey_features['filter']:
filter_feature = ".filter({df}.".format(df=df) + str(journey_features['filter']) + ")"
else:
filte_feature = ""
return "{0}{1}.selectExpr({2})".format(df,filter_feature,retrieved_features)
并传递 df 并将 , 特征传递给函数。
extract_feature('df','col6','col7')
结果是
Out[139]: "df.filter(df.measurement_group == 'test').selectExpr('case when col2 > 0 then 1 else 0 end as col6','case when col3 > 0 then 1 else 0 end as col7')"
可以使用 eval 函数分配给数据框
df1 = eval(extract_feature('df','col6','col7'))