Adding a new row to an empty dataframe located in a data lake
I created an empty dataframe table pointing at a Delta location with the following code:
deltaResultPath = "/ml/streaming-analysis/delta/Result"
# Create Delta Lake table
sqlq = "CREATE TABLE stockDailyPrices_delta USING DELTA LOCATION '" + deltaResultPath + "'"
spark.sql(sqlq)
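For context, on the Delta Lake versions I have used, CREATE TABLE ... USING DELTA LOCATION expects Delta files to already exist at the path; if the path is still empty, one way to seed it is to write an empty DataFrame with the target schema first. This is only a sketch, assuming a (Time, cpu_temp, dsp_temp) schema with integer columns (the exact column types are my assumption):

from pyspark.sql.types import StructType, StructField, IntegerType

# Assumed schema; adjust the types to match the real table
schema = StructType([
    StructField("Time", IntegerType(), True),
    StructField("cpu_temp", IntegerType(), True),
    StructField("dsp_temp", IntegerType(), True),
])

# Write an empty Delta dataset to the path so the CREATE TABLE statement has a schema to pick up
spark.createDataFrame([], schema).write.format("delta").mode("overwrite").save(deltaResultPath)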
I am new to Spark and not very familiar with Spark SQL. Instead of inserting values from another dataframe, I want to add values that are generated in my Python script. For example, changing this code:
insert_sql = "insert into stockDailyPrices_delta select f.* from stockDailyPrices f where f.price_date >= '" + price_date_min.strftime('%Y-%m-%d') + "' and f.price_date <= '" + price_date_max.strftime('%Y-%m-%d') + "'"
spark.sql(insert_sql)
to:
Time = 10
cpu_temp = 3
dsp_temp = 5
insert_sql = "insert into df (Time, cpu_temp, dsp_temp) values (%s, %s, %s)"
spark.sql(insert_sql)
However, I see the following error:
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'Time' expecting {'(', 'SELECT', 'FROM', 'DESC', 'VALUES', 'TABLE', 'INSERT', 'DESCRIBE', 'MAP', 'MERGE', 'UPDATE', 'REDUCE'}(line 1, pos 16)

== SQL ==
insert into df (Time, cpu_temp, dsp_temp) values (%s, %s, %s)
----------------^^^
How can I fix this code?
I was able to get it working with something like this:
spark.sql("insert into Result_delta select {} as Time, {} as cpu_temp, {} as dsp_temp".format(Time, cpu_temp, dsp_temp))