为 pyspark 数据框中的每个键生成查询
generate queries for each key in pyspark data frame
我在 pyspark
中有一个数据框,如下所示
df = spark.createDataFrame(
[
('2021-10-01','A',25),
('2021-10-02','B',24),
('2021-10-03','C',20),
('2021-10-04','D',21),
('2021-10-05','E',20),
('2021-10-06','F',22),
('2021-10-07','G',23),
('2021-10-08','H',24)],("RUN_DATE", "NAME", "VALUE"))
现在使用这个数据框我想在 MySql
中更新一个 table
# query to run should be similar to this
update_query = "UPDATE DB.TABLE SET DATE = '2021-10-01', VALUE = 25 WHERE NAME = 'A'"
# mysql_conn is a function which I use to connect to `MySql` from `pyspark` and run queries
# Invoking the function
mysql_conn(host, user_name, password, update_query)
现在,当我通过传递参数调用 mysql_conn 函数时,查询 运行 成功并且记录在 MySql
table.[=20= 中更新]
现在我想运行数据框中所有记录的更新语句。
对于每个 NAME
,它必须选择 RUN_DATE
和 VALUE
并替换为 update_query
并触发 mysql_conn。
我认为我们需要 for loop
但不确定如何进行。
与其使用 for 循环遍历数据帧,不如使用 foreachPartition
在每个分区之间分配工作负载会更好。此外,由于您正在编写自定义查询而不是为每个查询执行一个查询,因此执行批处理操作以减少往返、延迟和并发连接会更有效。例如
def update_db(rows):
temp_table_query=""
for row in rows:
if len(temp_table_query) > 0:
temp_table_query = temp_table_query + " UNION ALL "
temp_table_query = temp_table_query + " SELECT '%s' as RUNDATE, '%s' as NAME, %d as VALUE " % (row.RUN_DATE,row.NAME,row.VALUE)
update_query="""
UPDATE DBTABLE
INNER JOIN (
%s
) new_records ON DBTABLE.NAME = new_records.NAME
SET
DBTABLE.DATE = new_records.RUNDATE,
DBTABLE.VALUE = new_records.VALUE
""" % (temp_table_query)
mysql_conn(host, user_name, password, update_query)
df.foreachPartition(update_db)
View Demo on how the UPDATE query works
让我知道这是否适合你。
我在 pyspark
中有一个数据框,如下所示
df = spark.createDataFrame(
[
('2021-10-01','A',25),
('2021-10-02','B',24),
('2021-10-03','C',20),
('2021-10-04','D',21),
('2021-10-05','E',20),
('2021-10-06','F',22),
('2021-10-07','G',23),
('2021-10-08','H',24)],("RUN_DATE", "NAME", "VALUE"))
现在使用这个数据框我想在 MySql
# query to run should be similar to this
update_query = "UPDATE DB.TABLE SET DATE = '2021-10-01', VALUE = 25 WHERE NAME = 'A'"
# mysql_conn is a function which I use to connect to `MySql` from `pyspark` and run queries
# Invoking the function
mysql_conn(host, user_name, password, update_query)
现在,当我通过传递参数调用 mysql_conn 函数时,查询 运行 成功并且记录在 MySql
table.[=20= 中更新]
现在我想运行数据框中所有记录的更新语句。
对于每个 NAME
,它必须选择 RUN_DATE
和 VALUE
并替换为 update_query
并触发 mysql_conn。
我认为我们需要 for loop
但不确定如何进行。
与其使用 for 循环遍历数据帧,不如使用 foreachPartition
在每个分区之间分配工作负载会更好。此外,由于您正在编写自定义查询而不是为每个查询执行一个查询,因此执行批处理操作以减少往返、延迟和并发连接会更有效。例如
def update_db(rows):
temp_table_query=""
for row in rows:
if len(temp_table_query) > 0:
temp_table_query = temp_table_query + " UNION ALL "
temp_table_query = temp_table_query + " SELECT '%s' as RUNDATE, '%s' as NAME, %d as VALUE " % (row.RUN_DATE,row.NAME,row.VALUE)
update_query="""
UPDATE DBTABLE
INNER JOIN (
%s
) new_records ON DBTABLE.NAME = new_records.NAME
SET
DBTABLE.DATE = new_records.RUNDATE,
DBTABLE.VALUE = new_records.VALUE
""" % (temp_table_query)
mysql_conn(host, user_name, password, update_query)
df.foreachPartition(update_db)
View Demo on how the UPDATE query works
让我知道这是否适合你。