python/databricks 中的特征存储区功能
featurestore functionality in python/databricks
我在 pyspark 中有一个函数,如下所示,其中每个新列都是一个新功能。例如 journey_email_been_sent_flag
、journey_opened_flag
、journey_clicked_flag
和 journey_transaction_flag
是新功能。我想创建一个函数,如果用户想要获得上述功能之一,我应该能够检索结果 user.The 这背后的基本思想是中央存储库中功能的可重用性。有没有办法为每个功能动态实现此目的。
journey_level_revenue_email_open_click = spark.read.parquet(journey_level_revenue_path)
analysis_start_date = "2019-05-06"
def df_ptf_overall(df : DataFrame,startdate : StringType):
ptf_overall1 = df \
.filter(F.col('journey_start_date') >= f"{startdate}") \
.select('bpid',
'journeyinstanceid',
'journeyid',
'journey_start_date',
'measurement_group',
'country',
'email_14days',
'opened_14days',
'clicked_14days',
'testfnemail_14days',
'testfnopened_14days',
'testfnclicked_14days',
'revenue_14days',
'num_trx_14days',
'num_items_bought_14days'
)
return ptf_overall1
#display(df_ptf_overall(journey_level_revenue_email_open_click,analysis_start_date))
def df_ptf_overall2(df : DataFrame,startdate : StringType):
ptf_overall2 = df_ptf_overall(df,startdate).filter('measurement_group = "test"') \
.withColumn('journey_email_been_sent_flag', F.when(F.col('email_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('journey_opened_flag', F.when(F.col('opened_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('journey_clicked_flag', F.when(F.col('clicked_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('journey_transaction_flag', F.when(F.col('revenue_14days') > 0, F.lit(1)).otherwise(F.lit(0)))
return ptf_overall2
#display(df_ptf_overall2(journey_level_revenue_email_open_click,analysis_start_date))
MLflow 将很快推出正式的特征存储功能,因此请留意。与此同时,为什么不只添加一个具有所需功能的参数(数组或字符串列表),然后在创建 ptf_overall2
之后只添加 select 个参数?你可以做类似 DF.select(df_ptf_overall(df,startdate).columns + output_cols)
feature_columns = ['journey_email_been_sent_flag','journey_opened_flag']
df = df_ptf_overall(journey_level_revenue_email_open_click,analysis_start_date)
def feature_result(df:DataFrame, features : ArrayType):
input_columns = df.columns
selected_columns = input_columns + feature_columns
return df_ptf_overall2(journey_level_revenue_email_open_click,analysis_start_date).select(*selected_columns).show()
feature_result(df,feature_columns)
我可以实现这样的东西来实现它
我在 pyspark 中有一个函数,如下所示,其中每个新列都是一个新功能。例如 journey_email_been_sent_flag
、journey_opened_flag
、journey_clicked_flag
和 journey_transaction_flag
是新功能。我想创建一个函数,如果用户想要获得上述功能之一,我应该能够检索结果 user.The 这背后的基本思想是中央存储库中功能的可重用性。有没有办法为每个功能动态实现此目的。
journey_level_revenue_email_open_click = spark.read.parquet(journey_level_revenue_path)
analysis_start_date = "2019-05-06"
def df_ptf_overall(df : DataFrame,startdate : StringType):
ptf_overall1 = df \
.filter(F.col('journey_start_date') >= f"{startdate}") \
.select('bpid',
'journeyinstanceid',
'journeyid',
'journey_start_date',
'measurement_group',
'country',
'email_14days',
'opened_14days',
'clicked_14days',
'testfnemail_14days',
'testfnopened_14days',
'testfnclicked_14days',
'revenue_14days',
'num_trx_14days',
'num_items_bought_14days'
)
return ptf_overall1
#display(df_ptf_overall(journey_level_revenue_email_open_click,analysis_start_date))
def df_ptf_overall2(df : DataFrame,startdate : StringType):
ptf_overall2 = df_ptf_overall(df,startdate).filter('measurement_group = "test"') \
.withColumn('journey_email_been_sent_flag', F.when(F.col('email_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('journey_opened_flag', F.when(F.col('opened_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('journey_clicked_flag', F.when(F.col('clicked_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('journey_transaction_flag', F.when(F.col('revenue_14days') > 0, F.lit(1)).otherwise(F.lit(0)))
return ptf_overall2
#display(df_ptf_overall2(journey_level_revenue_email_open_click,analysis_start_date))
MLflow 将很快推出正式的特征存储功能,因此请留意。与此同时,为什么不只添加一个具有所需功能的参数(数组或字符串列表),然后在创建 ptf_overall2
之后只添加 select 个参数?你可以做类似 DF.select(df_ptf_overall(df,startdate).columns + output_cols)
feature_columns = ['journey_email_been_sent_flag','journey_opened_flag'] df = df_ptf_overall(journey_level_revenue_email_open_click,analysis_start_date)
def feature_result(df:DataFrame, features : ArrayType): input_columns = df.columns selected_columns = input_columns + feature_columns return df_ptf_overall2(journey_level_revenue_email_open_click,analysis_start_date).select(*selected_columns).show()
feature_result(df,feature_columns)
我可以实现这样的东西来实现它