为 pyspark 数据框中的每个键生成查询

generate queries for each key in pyspark data frame

我在 pyspark 中有一个数据框,如下所示

df = spark.createDataFrame(
[
('2021-10-01','A',25),
('2021-10-02','B',24),
('2021-10-03','C',20),
('2021-10-04','D',21),
('2021-10-05','E',20),
('2021-10-06','F',22),
('2021-10-07','G',23),
('2021-10-08','H',24)],("RUN_DATE", "NAME", "VALUE"))

现在使用这个数据框我想在 MySql

中更新一个 table
# query to run should be similar to this
update_query = "UPDATE DB.TABLE SET DATE = '2021-10-01', VALUE = 25 WHERE NAME = 'A'"

# mysql_conn is a function which I use to connect to `MySql`  from `pyspark` and run queries
# Invoking the function 
mysql_conn(host, user_name, password, update_query)

现在,当我通过传递参数调用 mysql_conn 函数时,查询 运行 成功并且记录在 MySql table.[=20= 中更新]

现在我想运行数据框中所有记录的更新语句。

对于每个 NAME,它必须选择 RUN_DATEVALUE 并替换为 update_query 并触发 mysql_conn。

我认为我们需要 for loop 但不确定如何进行。

与其使用 for 循环遍历数据帧,不如使用 foreachPartition 在每个分区之间分配工作负载会更好。此外,由于您正在编写自定义查询而不是为每个查询执行一个查询,因此执行批处理操作以减少往返、延迟和并发连接会更有效。例如

def update_db(rows):
    temp_table_query=""
    for row in rows:
        if len(temp_table_query) > 0:
            temp_table_query = temp_table_query + " UNION ALL "
        temp_table_query = temp_table_query + " SELECT '%s' as RUNDATE, '%s' as NAME, %d as VALUE " % (row.RUN_DATE,row.NAME,row.VALUE)
  
    update_query="""
        UPDATE DBTABLE 
        INNER JOIN (
            %s
        ) new_records ON DBTABLE.NAME = new_records.NAME
        SET 
            DBTABLE.DATE = new_records.RUNDATE, 
            DBTABLE.VALUE = new_records.VALUE 
    """ % (temp_table_query)
    mysql_conn(host, user_name, password, update_query)
    

df.foreachPartition(update_db)

View Demo on how the UPDATE query works

让我知道这是否适合你。