Assign values from a list of dictionaries to a DataFrame column, matched on a DataFrame column value

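I want to assign values from a list of dictionaries to a DataFrame in Spark Structured Streaming (PySpark). Each row should take the value from the dictionary whose field (localSymbol_aux) matches the row's column value (localSymbol).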

I have the following list of dictionaries:

level_1_batch_list_of_dict = [
    {
        'localSymbol_aux': 'EUR.USD',
        'level_1_precio_max_sesion_aux': 4,
        'level_1_precio_min_sesion_aux': 0
    },
    {
        'localSymbol_aux': 'USD.JPY',
        'level_1_precio_max_sesion_aux': 6,
        'level_1_precio_min_sesion_aux': 0
    }
]

I have the following DataFrame:

+------------------------------------------+-----------+-----------------+
|ventana                                   |localSymbol|precio_max_sesion|
+------------------------------------------+-----------+-----------------+
|{2021-05-10 11:33:30, 2021-05-10 11:34:00}|EUR.USD    |0                |
|{2021-05-10 11:34:00, 2021-05-10 11:34:30}|EUR.USD    |0                |
|{2021-05-10 11:34:30, 2021-05-10 11:35:00}|EUR.USD    |0                |
|{2021-05-10 11:35:00, 2021-05-10 11:35:30}|EUR.USD    |0                |
|{2021-05-10 11:35:30, 2021-05-10 11:36:00}|EUR.USD    |0                |
|{2021-05-10 11:36:00, 2021-05-10 11:36:30}|EUR.USD    |0                |
|{2021-05-10 11:33:30, 2021-05-10 11:34:00}|USD.JPY    |0                |
|{2021-05-10 11:34:00, 2021-05-10 11:34:30}|USD.JPY    |0                |
|{2021-05-10 11:34:30, 2021-05-10 11:35:00}|USD.JPY    |0                |
|{2021-05-10 11:35:00, 2021-05-10 11:35:30}|USD.JPY    |0                |
|{2021-05-10 11:35:30, 2021-05-10 11:36:00}|USD.JPY    |0                |
|{2021-05-10 11:36:00, 2021-05-10 11:36:30}|USD.JPY    |0                |
+------------------------------------------+-----------+-----------------+

But I want the following:

+------------------------------------------+-----------+-----------------+
|ventana                                   |localSymbol|precio_max_sesion|
+------------------------------------------+-----------+-----------------+
|{2021-05-10 11:33:30, 2021-05-10 11:34:00}|EUR.USD    |4                |
|{2021-05-10 11:34:00, 2021-05-10 11:34:30}|EUR.USD    |4                |
|{2021-05-10 11:34:30, 2021-05-10 11:35:00}|EUR.USD    |4                |
|{2021-05-10 11:35:00, 2021-05-10 11:35:30}|EUR.USD    |4                |
|{2021-05-10 11:35:30, 2021-05-10 11:36:00}|EUR.USD    |4                |
|{2021-05-10 11:36:00, 2021-05-10 11:36:30}|EUR.USD    |4                |
|{2021-05-10 11:33:30, 2021-05-10 11:34:00}|USD.JPY    |6                |
|{2021-05-10 11:34:00, 2021-05-10 11:34:30}|USD.JPY    |6                |
|{2021-05-10 11:34:30, 2021-05-10 11:35:00}|USD.JPY    |6                |
|{2021-05-10 11:35:00, 2021-05-10 11:35:30}|USD.JPY    |6                |
|{2021-05-10 11:35:30, 2021-05-10 11:36:00}|USD.JPY    |6                |
|{2021-05-10 11:36:00, 2021-05-10 11:36:30}|USD.JPY    |6                |
+------------------------------------------+-----------+-----------------+

Any ideas?

Thanks!!

I used a UDF:

    # ... more code; this snippet sits inside a method ...

    from pyspark.sql import functions as f
    from pyspark.sql.types import IntegerType

    print(self.level_1_batch_list_of_dict)

    # Step 3: assign the final value from the dictionary to the DataFrame.
    # The list is bound as a default argument so the UDF captures it.
    def assign_value_dictionary_to_df(localSymbol, level_1_batch_list_of_dict=self.level_1_batch_list_of_dict):
        for level_1_batch_dict in level_1_batch_list_of_dict:
            if level_1_batch_dict['localSymbol_aux'] == localSymbol:
                return level_1_batch_dict['level_1_precio_max_sesion_aux']
        return None  # no dictionary matches this symbol

    # Declare the return type; without it f.udf defaults to StringType.
    udf_assign_value_dictionary_to_df = f.udf(assign_value_dictionary_to_df, IntegerType())

    (self.level_1_df_process_all_field_group_by_window
        .withColumn("precio_max_sesion", udf_assign_value_dictionary_to_df("localSymbol"))
        .show(30, False))
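
The function wrapped by the UDF above walks the whole list for every row. As a minimal sketch, assuming each localSymbol_aux appears at most once in the list, the list could be collapsed into a plain dict once and then queried by key. The helper names `build_symbol_lookup` and `lookup_max` are hypothetical and only illustrate the idea; this is plain Python and leaves the Spark wiring out.

```python
# Sample data copied from the question.
level_1_batch_list_of_dict = [
    {
        'localSymbol_aux': 'EUR.USD',
        'level_1_precio_max_sesion_aux': 4,
        'level_1_precio_min_sesion_aux': 0,
    },
    {
        'localSymbol_aux': 'USD.JPY',
        'level_1_precio_max_sesion_aux': 6,
        'level_1_precio_min_sesion_aux': 0,
    },
]


def build_symbol_lookup(rows,
                        key='localSymbol_aux',
                        value='level_1_precio_max_sesion_aux'):
    """Hypothetical helper: map each symbol to its session max.

    Assumes symbols are unique; if one repeats, the last entry wins.
    """
    return {row[key]: row[value] for row in rows}


# Built once, outside the per-row function.
max_by_symbol = build_symbol_lookup(level_1_batch_list_of_dict)


def lookup_max(local_symbol, lookup=max_by_symbol, default=None):
    """Hypothetical per-row function: one dict lookup instead of a list scan."""
    return lookup.get(local_symbol, default)
```

A function like `lookup_max` could be wrapped with `f.udf` in the same way as the original. Symbols missing from the list fall back to `default`, which matches the original function's implicit `None`.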

You can convert the list of dictionaries above into a PySpark DataFrame and then do a simple join:

# this is the reference dataframe
ref = spark.createDataFrame(level_1_batch_list_of_dict)
ref.show(10, False)
# +-----------------------------+-----------------------------+---------------+
# |level_1_precio_max_sesion_aux|level_1_precio_min_sesion_aux|localSymbol_aux|
# +-----------------------------+-----------------------------+---------------+
# |4                            |0                            |EUR.USD        |
# |6                            |0                            |USD.JPY        |
# +-----------------------------+-----------------------------+---------------+

# this is your main dataframe
df.show(2, False)
# +------------------------------------------+-----------+-----------------+
# |ventana                                   |localSymbol|precio_max_sesion|
# +------------------------------------------+-----------+-----------------+
# |{2021-05-10 11:33:30, 2021-05-10 11:34:00}|EUR.USD    |0                |
# |{2021-05-10 11:34:00, 2021-05-10 11:34:30}|EUR.USD    |0                |
# +------------------------------------------+-----------+-----------------+

# join them together and get correct values
(df
    .join(ref, on=[df.localSymbol == ref.localSymbol_aux])
    .select(df.ventana, df.localSymbol, ref.level_1_precio_max_sesion_aux.alias('precio_max_sesion'))
    .show(100, False)
)
# +------------------------------------------+-----------+-----------------+
# |ventana                                   |localSymbol|precio_max_sesion|
# +------------------------------------------+-----------+-----------------+
# |{2021-05-10 11:33:30, 2021-05-10 11:34:00}|EUR.USD    |4                |
# |{2021-05-10 11:34:00, 2021-05-10 11:34:30}|EUR.USD    |4                |
# |{2021-05-10 11:34:30, 2021-05-10 11:35:00}|EUR.USD    |4                |
# |{2021-05-10 11:35:00, 2021-05-10 11:35:30}|EUR.USD    |4                |
# |{2021-05-10 11:35:30, 2021-05-10 11:36:00}|EUR.USD    |4                |
# |{2021-05-10 11:36:00, 2021-05-10 11:36:30}|EUR.USD    |4                |
# |{2021-05-10 11:33:30, 2021-05-10 11:34:00}|USD.JPY    |6                |
# |{2021-05-10 11:34:00, 2021-05-10 11:34:30}|USD.JPY    |6                |
# |{2021-05-10 11:34:30, 2021-05-10 11:35:00}|USD.JPY    |6                |
# |{2021-05-10 11:35:00, 2021-05-10 11:35:30}|USD.JPY    |6                |
# |{2021-05-10 11:35:30, 2021-05-10 11:36:00}|USD.JPY    |6                |
# |{2021-05-10 11:36:00, 2021-05-10 11:36:30}|USD.JPY    |6                |
# +------------------------------------------+-----------+-----------------+