Pass parameters to Spark Insert script
This must be simple, but I've been stuck on it for a long time.
I'm trying to pass parameters to my insert script, but the script's output comes back NULL. What am I doing wrong here? I'm writing this in a Python notebook on Azure Databricks.
spark.sql("CREATE TABLE IF NOT EXISTS DB.RUN_LOG (RunId INT, CreatedDate timestamp, Status string, ErrorDetail string)")
dfMaxRunID = spark.sql("select COALESCE(MAX(RunId),0) MaxRunId from DB.RUN_LOG")
vMaxRunId = dfMaxRunID.first()['MaxRunId']
vInsertRunId = vMaxRunId + 1
vFinal_CurrentTimeStamp = '2019-07-24 12:02:41'
print(vMaxRunId)
print(vInsertRunId)
print(vFinal_CurrentTimeStamp)
spark.sql("INSERT INTO TABLE DB.RUN_LOG values('vInsertRunId','vFinal_CurrentTimeStamp',null,null)")
spark.sql("SELECT * FROM DB.RUN_LOG").show()
I think something is wrong in the step below.
vMaxRunId = dfMaxRunID.first()['MaxRunId']
I would suggest...
vMaxRunId = dfMaxRunID.select("MaxRunId").first()[0]
Then it prints an Id just fine.
Replace the insert statement with the one below:
>>> spark.sql("INSERT INTO TABLE DB.RUN_LOG values(%s,'%s','%s','%s')"%(vInsertRunId,vFinal_CurrentTimeStamp,'null','null'))
DataFrame[]
>>> spark.sql("SELECT * FROM DB.RUN_LOG").show()
+-----+-------------------+------+-----------+
|RunId| CreatedDate|Status|ErrorDetail|
+-----+-------------------+------+-----------+
| 1|2019-07-24 12:02:41| null| null|
+-----+-------------------+------+-----------+
hive> select * from test_dev_db.RUN_LOG;
OK
1 2019-07-24 12:02:41 null null
Time taken: 0.217 seconds, Fetched: 1 row(s)
Just checked - the last two columns need actual NULLs (not the string 'null'). So the correct statement is:
spark.sql("INSERT INTO TABLE db.RUN_LOG values(%s,'%s',null,null)"%(vInsertRunId,vFinal_CurrentTimeStamp))
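To see why this works, note that the %-substitution is plain Python string formatting that happens before Spark is involved; spark.sql() only ever receives the fully rendered statement. A minimal sketch of the rendering step (no Spark session needed, using the same sample values printed above):

```python
# Sample values matching the ones from the notebook above.
vInsertRunId = 1
vFinal_CurrentTimeStamp = '2019-07-24 12:02:41'

# %s drops the integer in unquoted, '%s' wraps the timestamp in quotes,
# and null is written literally so Hive stores a real NULL.
sql = "INSERT INTO TABLE db.RUN_LOG values(%s,'%s',null,null)" % (
    vInsertRunId, vFinal_CurrentTimeStamp)
print(sql)
# INSERT INTO TABLE db.RUN_LOG values(1,'2019-07-24 12:02:41',null,null)
```

One caveat: because this is raw string interpolation, a value containing a single quote would break (or alter) the statement, so it is only safe with trusted, controlled values like these internally generated ones.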
>>> spark.sql("SELECT * FROM db.RUN_LOG").show()
+-----+-------------------+------+-----------+
|RunId| CreatedDate|Status|ErrorDetail|
+-----+-------------------+------+-----------+
| 1|2019-07-24 12:02:41| null| null|
+-----+-------------------+------+-----------+
hive> select * from test_dev_db.RUN_LOG;
OK
1 2019-07-24 12:02:41 NULL NULL