Assign values from a list of dictionaries to a dataframe, matched on a dataframe column value
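I want to assign values from a list of dictionaries to a dataframe, matching the dictionary field (localSymbol_aux) against the dataframe column (localSymbol), in Spark Structured Streaming (PySpark).
I have the following list of dictionaries: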
level_1_batch_list_of_dict = [
    {
        'localSymbol_aux': 'EUR.USD',
        'level_1_precio_max_sesion_aux': 4,
        'level_1_precio_min_sesion_aux': 0
    },
    {
        'localSymbol_aux': 'USD.JPY',
        'level_1_precio_max_sesion_aux': 6,
        'level_1_precio_min_sesion_aux': 0
    }
]
And I have the following dataframe:
+------------------------------------------+-----------+-----------------+
|ventana |localSymbol|precio_max_sesion|
+------------------------------------------+-----------+-----------------+
|{2021-05-10 11:33:30, 2021-05-10 11:34:00}|EUR.USD |0 |
|{2021-05-10 11:34:00, 2021-05-10 11:34:30}|EUR.USD |0 |
|{2021-05-10 11:34:30, 2021-05-10 11:35:00}|EUR.USD |0 |
|{2021-05-10 11:35:00, 2021-05-10 11:35:30}|EUR.USD |0 |
|{2021-05-10 11:35:30, 2021-05-10 11:36:00}|EUR.USD |0 |
|{2021-05-10 11:36:00, 2021-05-10 11:36:30}|EUR.USD |0 |
|{2021-05-10 11:33:30, 2021-05-10 11:34:00}|USD.JPY |0 |
|{2021-05-10 11:34:00, 2021-05-10 11:34:30}|USD.JPY |0 |
|{2021-05-10 11:34:30, 2021-05-10 11:35:00}|USD.JPY |0 |
|{2021-05-10 11:35:00, 2021-05-10 11:35:30}|USD.JPY |0 |
|{2021-05-10 11:35:30, 2021-05-10 11:36:00}|USD.JPY |0 |
|{2021-05-10 11:36:00, 2021-05-10 11:36:30}|USD.JPY |0 |
+------------------------------------------+-----------+-----------------+
But I want the following result:
+------------------------------------------+-----------+-----------------+
|ventana |localSymbol|precio_max_sesion|
+------------------------------------------+-----------+-----------------+
|{2021-05-10 11:33:30, 2021-05-10 11:34:00}|EUR.USD |4 |
|{2021-05-10 11:34:00, 2021-05-10 11:34:30}|EUR.USD |4 |
|{2021-05-10 11:34:30, 2021-05-10 11:35:00}|EUR.USD |4 |
|{2021-05-10 11:35:00, 2021-05-10 11:35:30}|EUR.USD |4 |
|{2021-05-10 11:35:30, 2021-05-10 11:36:00}|EUR.USD |4 |
|{2021-05-10 11:36:00, 2021-05-10 11:36:30}|EUR.USD |4 |
|{2021-05-10 11:33:30, 2021-05-10 11:34:00}|USD.JPY |6 |
|{2021-05-10 11:34:00, 2021-05-10 11:34:30}|USD.JPY |6 |
|{2021-05-10 11:34:30, 2021-05-10 11:35:00}|USD.JPY |6 |
|{2021-05-10 11:35:00, 2021-05-10 11:35:30}|USD.JPY |6 |
|{2021-05-10 11:35:30, 2021-05-10 11:36:00}|USD.JPY |6 |
|{2021-05-10 11:36:00, 2021-05-10 11:36:30}|USD.JPY |6 |
+------------------------------------------+-----------+-----------------+
Any ideas?
Thanks!!
I used a UDF:
# ... more code; this snippet is inside a function ...
# assumes: import pyspark.sql.functions as f
print(self.level_1_batch_list_of_dict)
"""
3º Assign the final value of the dictionary to the dataframe
"""
# Pass the list as a default argument so the UDF captures it
def assign_value_dictionary_to_df(localSymbol, level_1_batch_list_of_dict=self.level_1_batch_list_of_dict):
    # Return the max session price of the first dictionary whose symbol matches
    for level_1_batch_dict in level_1_batch_list_of_dict:
        if level_1_batch_dict['localSymbol_aux'] == localSymbol:
            return level_1_batch_dict['level_1_precio_max_sesion_aux']

udf_assign_value_dictionary_to_df = f.udf(assign_value_dictionary_to_df)

self.level_1_df_process_all_field_group_by_window \
    .select(
        f.col('*')
    ).withColumn(
        "precio_max_sesion", udf_assign_value_dictionary_to_df("localSymbol")
    ).show(30, False)
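The UDF above scans the whole list for every row. A minimal sketch of one possible variation is to build a symbol-to-price dictionary once and look the symbol up directly. The names `max_by_symbol` and `lookup_precio_max` are hypothetical and not part of the original code, and the plain-Python part below runs without Spark:

```python
# Same data as in the question.
level_1_batch_list_of_dict = [
    {'localSymbol_aux': 'EUR.USD',
     'level_1_precio_max_sesion_aux': 4,
     'level_1_precio_min_sesion_aux': 0},
    {'localSymbol_aux': 'USD.JPY',
     'level_1_precio_max_sesion_aux': 6,
     'level_1_precio_min_sesion_aux': 0},
]

# Build the lookup once: symbol -> max session price.
# (max_by_symbol is a hypothetical helper name.)
max_by_symbol = {
    d['localSymbol_aux']: d['level_1_precio_max_sesion_aux']
    for d in level_1_batch_list_of_dict
}


def lookup_precio_max(local_symbol, table=max_by_symbol):
    """Return the max session price for a symbol, or None if it is unknown."""
    return table.get(local_symbol)


# To use it on a dataframe, it could be wrapped as a UDF, for example:
#   from pyspark.sql.types import IntegerType
#   udf_lookup = f.udf(lookup_precio_max, IntegerType())
# f.udf defaults to a string return type when none is given.
```

Symbols missing from the list return None here, which is also what the original loop does implicitly when nothing matches.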
You can convert the list of dictionaries above into a PySpark DataFrame and then do a simple join:
# this is the reference dataframe
ref = spark.createDataFrame(level_1_batch_list_of_dict)
ref.show(10, False)
# +-----------------------------+-----------------------------+---------------+
# |level_1_precio_max_sesion_aux|level_1_precio_min_sesion_aux|localSymbol_aux|
# +-----------------------------+-----------------------------+---------------+
# |4 |0 |EUR.USD |
# |6 |0 |USD.JPY |
# +-----------------------------+-----------------------------+---------------+
# this is your main dataframe
df.show(2, False)
# +------------------------------------------+-----------+-----------------+
# |ventana |localSymbol|precio_max_sesion|
# +------------------------------------------+-----------+-----------------+
# |{2021-05-10 11:33:30, 2021-05-10 11:34:00}|EUR.USD |0 |
# |{2021-05-10 11:34:00, 2021-05-10 11:34:30}|EUR.USD |0 |
# +------------------------------------------+-----------+-----------------+
# join them together and get correct values
(df
.join(ref, on=[df.localSymbol == ref.localSymbol_aux])
.select(df.ventana, df.localSymbol, ref.level_1_precio_max_sesion_aux.alias('precio_max_sesion'))
.show(100, False)
)
# +------------------------------------------+-----------+-----------------+
# |ventana |localSymbol|precio_max_sesion|
# +------------------------------------------+-----------+-----------------+
# |{2021-05-10 11:33:30, 2021-05-10 11:34:00}|EUR.USD |4 |
# |{2021-05-10 11:34:00, 2021-05-10 11:34:30}|EUR.USD |4 |
# |{2021-05-10 11:34:30, 2021-05-10 11:35:00}|EUR.USD |4 |
# |{2021-05-10 11:35:00, 2021-05-10 11:35:30}|EUR.USD |4 |
# |{2021-05-10 11:35:30, 2021-05-10 11:36:00}|EUR.USD |4 |
# |{2021-05-10 11:36:00, 2021-05-10 11:36:30}|EUR.USD |4 |
# |{2021-05-10 11:33:30, 2021-05-10 11:34:00}|USD.JPY |6 |
# |{2021-05-10 11:34:00, 2021-05-10 11:34:30}|USD.JPY |6 |
# |{2021-05-10 11:34:30, 2021-05-10 11:35:00}|USD.JPY |6 |
# |{2021-05-10 11:35:00, 2021-05-10 11:35:30}|USD.JPY |6 |
# |{2021-05-10 11:35:30, 2021-05-10 11:36:00}|USD.JPY |6 |
# |{2021-05-10 11:36:00, 2021-05-10 11:36:30}|USD.JPY |6 |
# +------------------------------------------+-----------+-----------------+