python/databricks 中的特征存储区功能

featurestore functionality in python/databricks

我在 pyspark 中有一个函数,如下所示,其中每个新列都是一个新功能。例如 journey_email_been_sent_flagjourney_opened_flagjourney_clicked_flagjourney_transaction_flag 是新功能。我想创建一个函数,如果用户想要获得上述功能之一,我应该能够检索结果 user.The 这背后的基本思想是中央存储库中功能的可重用性。有没有办法为每个功能动态实现此目的。

journey_level_revenue_email_open_click = spark.read.parquet(journey_level_revenue_path)
analysis_start_date = "2019-05-06"
def df_ptf_overall(df : DataFrame,startdate : StringType):
  ptf_overall1 = df \
        .filter(F.col('journey_start_date') >= f"{startdate}") \
        .select('bpid',
                'journeyinstanceid',
                'journeyid',
                'journey_start_date',
                'measurement_group',
                'country',
                'email_14days',
                'opened_14days',
                'clicked_14days',
                'testfnemail_14days',
                'testfnopened_14days',
                'testfnclicked_14days',
                'revenue_14days',
                'num_trx_14days',
                'num_items_bought_14days'
                )
  return ptf_overall1

#display(df_ptf_overall(journey_level_revenue_email_open_click,analysis_start_date))

def df_ptf_overall2(df : DataFrame,startdate : StringType):
  ptf_overall2 = df_ptf_overall(df,startdate).filter('measurement_group = "test"') \
    .withColumn('journey_email_been_sent_flag', F.when(F.col('email_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('journey_opened_flag', F.when(F.col('opened_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('journey_clicked_flag', F.when(F.col('clicked_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('journey_transaction_flag', F.when(F.col('revenue_14days') > 0, F.lit(1)).otherwise(F.lit(0)))
  return ptf_overall2

#display(df_ptf_overall2(journey_level_revenue_email_open_click,analysis_start_date))

MLflow 将很快推出正式的特征存储功能,因此请留意。与此同时,为什么不只添加一个具有所需功能的参数(数组或字符串列表),然后在创建 ptf_overall2 之后只添加 select 个参数?你可以做类似 DF.select(df_ptf_overall(df,startdate).columns + output_cols)

feature_columns = ['journey_email_been_sent_flag','journey_opened_flag'] df = df_ptf_overall(journey_level_revenue_email_open_click,analysis_start_date)

def feature_result(df:DataFrame, features : ArrayType): input_columns = df.columns selected_columns = input_columns + feature_columns return df_ptf_overall2(journey_level_revenue_email_open_click,analysis_start_date).select(*selected_columns).show()

feature_result(df,feature_columns)

我可以实现这样的东西来实现它