基于在 DataBrick 笔记本顶部提取小部件值动态 retrieve/filter Spark 框架的最佳 PySpark 实践是什么?

What is the best PySpark practice to dynamically retrieve/filter the Spark frame based on extracting widget values on top of notebook in DataBrick?

假设我有以下 Spark 数据框所谓的 df:

                        <------Time-resolution-------->
+------------+----------+---------+---------+---------+
|    Name    |   date   |  00-24  |  00-12  |  12-24  |
+------------+----------+---------+---------+---------+
|     X1     |2020-10-20|   137   |   68    |   69    |
|     X2     |2020-10-22|   132   |   66    |   66    |
|     X3     |2020-10-24|   132   |   64    |   68    |
|     X4     |2020-10-25|   587   |   292   |   295   |
|     X5     |2020-10-29|   134   |   67    |   67    |
+------------+----------+---------+---------+---------+

我想使用 PySpark 在 DataBricks 介质中的笔记本顶部创建 4 个小部件,形式为 dbutils.widgets.dropdown(),可用数据如下:

我基于此 & that 尝试了以下内容:

我可以针对第一项和第二项做到这一点,如下所示:

dbutils.widgets.removeAll()

# compute the list of all dates from maximum date available till today
max_date = df.select(F.max('date')).first()['max(date)']
min_date = df.select(F.min('date')).first()['min(date)']
print(min_date)
print(max_date)

dbutils.widgets.dropdown(name = "DATE_FROM", defaultValue = min_date , choices = ['date'])
dbutils.widgets.dropdown(name = "DATE_TO", defaultValue = max_date, choices = ['date'])
#dbutils.widgets.text(name = "DATE_FROM", defaultValue = min_date")
#dbutils.widgets.text(name = "DATE_TO",   defaultValue = max_date)

对于第 3 项,我只是有一个愚蠢的想法:

channel = ['00-24', '00-12', '12-24']
dbutils.widgets.dropdown(name = "Time_Resolution_Of_Interest", defaultValue = "00-24" , choices = [str(x) for x in channel] + ["None"])

对于最后一项,我想列出感兴趣的名称,但我无法映射 String 并像 Scala 版本一样传递它

#Get interested Time resolution from widget
dropdownColumn = dbutils.widgets.get("Time_Resolution_Of_Interest")
# compute the list 5 top names in interested time resolution 
max_Top_Name = df.select(F.max(dropdownColumn)).first()[dropdownColumn]

NUM_OF_NAMES_FOR_DROPDOWN = 5

#Scala version works
#val Name_list = df.select("Name").take(NUM_OF_NAMES_FOR_DROPDOWN).map(i=>i.getAs[String]("Name"))
#dbutils.widgets.dropdown("Name", "X1", Name_list.toSeq , "Username Of Interes")

#PySpark version doesn't work
Name_list = df.select("Name").take(NUM_OF_NAMES_FOR_DROPDOWN).rdd.flatMap(lambda x: x).collect()
dbutils.widgets.dropdown(name = "Name", defaultValue = max_Top_Name , choices = [str(x) for x in Name_list] + ["None"])

最后我想过滤特定名称的记录和随时间选择的时间分辨率并更新框架并根据此 如下:

selected_widgets = ['DATE_FROM', 'DATE_TO', 'Time_Resolution_Of_Interest', 'Name_Of_Interest']
myList = getArgument(selected_widgets).split(",")
display(df.filter(df.isin(myList)))

我希望通过小部件值名称:X1 和时间分辨率:00-24 在特定时间 date 从 [=25] 达到以下 table =] 直到 2020-11-20:

+------------+----------+---------+
|    Name    |   date   |  00-24  | 
+------------+----------+---------+
|     X1     |2020-10-20|   137   |  
|     X1     |2020-10-21|   111   | 
|     X1     |2020-10-22|   99    | 
|     X1     |2020-10-23|   123   | 
|     X1     |2020-10-24|   101   |  
|    ...     |    ...   |   ...   |  
+------------+----------+---------+

您可以做的是首先像您正在做的那样构建小部件,然后从小部件中获取各个值并过滤它们以获得最终结果。请参阅下面的示例代码,这可能与您的要求不匹配 1-1,但应该会指导您获得所需的内容。

创建日期小部件:

from pyspark.sql.functions import min, max

dbutils.widgets.removeAll()

# compute the list of all dates from maximum date available till today
date = [date[0] for date in data.select("date").collect()]
max_min_date = data.select(max('date'),min('date')).first()
min_date = max_min_date['min(date)']
max_date = max_min_date['max(date)']
print(date)
print(min_date)
print(max_date)

dbutils.widgets.dropdown(name = "DATE_FROM", defaultValue = min_date , choices = date)
dbutils.widgets.dropdown(name = "DATE_TO", defaultValue = max_date, choices = date)

使用模式创建时间分辨率小部件,这将允许您构建时间列的动态列表:

channel = [f.name for f in data.schema.fields if f.name not in ['name', 'date']]
print(channel)
dbutils.widgets.dropdown(name = "Time_Resolution_Of_Interest", defaultValue = "00-24" , choices = [str(x) for x in channel] + ["None"])

创建名称小部件:

from pyspark.sql.functions import col
dropdownColumn = dbutils.widgets.get("Time_Resolution_Of_Interest")
NUM_OF_NAMES_FOR_DROPDOWN = 5

#sort by selected time column desc and take 5 rows
name_limit = [name[0] for name in 
data.select("Name").orderBy(col(dropdownColumn), ascending=False).take(NUM_OF_NAMES_FOR_DROPDOWN)]
dbutils.widgets.dropdown(name = "Name", defaultValue = 'X1' , choices = [str(x) for x in name_limit] + ["None"])

最后,根据小部件值过滤数据:

date_from_val = dbutils.widgets.get("DATE_FROM")
date_to_val = dbutils.widgets.get("DATE_TO")
time_val = dbutils.widgets.get("Time_Resolution_Of_Interest")
name_val = dbutils.widgets.get("Name")

result = data.select("name", time_val).where(f"name = '{name_val}' and date between '{date_from_val}' and  '{date_to_val}'")

display(result)