Visibility of temporary tables and database tables in Spark SQL: is it possible to make a nested query to a temporary table from a usual JDBC query?
I have a DataFrame registered as a temporary table:
val dailySummariesDfVisualize = dailySummariesDf.orderBy("event_time")
dailySummariesDfVisualize.registerTempTable("raw")
I can extract some data from it with Spark SQL:
val df = sqlContext.sql("SELECT * FROM raw")
df.show()
And the output works. Then I want to make a nested query to the temp table from within a JDBC database query, like this:
val dailySensorData =
    getDFFromJdbcSource(SparkSession.builder().appName("test").master("local").getOrCreate(),
        s"SELECT * FROM values WHERE time in (SELECT event_time FROM raw) limit 1000000")
    .persist(StorageLevel.MEMORY_ONLY_SER)
dailySensorData.show(400, false)
And here I get the exception:
org.postgresql.util.PSQLException: ERROR: relation "raw" does not exist
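The error itself comes from Postgres, not from Spark: the SQL string passed to getDFFromJdbcSource ends up being executed on the database side. A minimal sketch of what such a helper presumably does (the real implementation is not shown here, so the URL and options are assumptions):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical reconstruction of the helper. With the "query" option,
// Spark wraps the SQL as "(query) SPARK_GEN_SUBQ_nn" (visible in the plans
// below) and sends it to Postgres verbatim -- the database knows nothing
// about Spark temp views, hence relation "raw" does not exist there.
def getDFFromJdbcSource(spark: SparkSession, query: String): DataFrame =
    spark.read
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/postgres") // assumed URL
        .option("query", query)
        .load()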
If I try to execute the same thing inside sqlContext.sql():
val df = sqlContext.sql("SELECT * FROM values WHERE time in (SELECT event_time FROM raw)")
df.show()
I get:
org.apache.spark.sql.AnalysisException: Table or view not found: values; line 1 pos 14;
'Project [*]
+- 'Filter 'time IN (list#4967 [])
: +- 'Project ['event_time]
: +- 'UnresolvedRelation [raw]
+- 'UnresolvedRelation [values]
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis(CheckAnalysis.scala:106)
It looks as if both values (the real JDBC table) and raw (the temp table) are invisible from inside the query. How can I use the temp table in a nested query?
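As a side note, the catalog API is a quick way to check which relations a SparkSession can actually see:

val spark = SparkSession.builder().appName("test").master("local").getOrCreate()

// Lists the tables and views visible to this session: the temp view "raw"
// shows up here, while the Postgres table "values" does not, because it
// was never registered with Spark.
spark.catalog.listTables().show(false)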
UPD
Following mazaneicha's comment, I have tried the following (all values are retrieved here, since there is no way to restrict them with the nested query):
val dailySummariesDfVisualize = dailySummariesDf.orderBy("event_time")
dailySummariesDfVisualize.createOrReplaceTempView("raw")

getDFFromJdbcSource(SparkSession.builder().appName("test").master("local").getOrCreate(),
    s"SELECT * FROM values").createOrReplaceTempView("values")
val df = sqlContext.sql("SELECT * FROM values WHERE time in (SELECT event_time FROM raw)")
df.explain(true)
Here is the resulting plan:
== Parsed Logical Plan ==
'Project [*]
+- 'Filter 'time IN (list#5475 [])
: +- 'Project ['event_time]
: +- 'UnresolvedRelation [raw]
+- 'UnresolvedRelation [values]
== Analyzed Logical Plan ==
devicename: string, value: double, time: timestamp, coffee_machine_id: string, digital_twin_id: string, write_time: timestamp
Project [devicename#5457, value#5458, time#5459, coffee_machine_id#5460, digital_twin_id#5461, write_time#5462]
+- Filter time#5459 IN (list#5475 [])
: +- Project [event_time#4836]
: +- SubqueryAlias raw
: +- Sort [event_time#4836 ASC NULLS FIRST], true
: +- Relation[event_type#4835,event_time#4836,event_payload#4837,coffee_machine_id#4838,digital_twin_id#4839] JDBCRelation((SELECT * FROM events WHERE (event_time > '2021-03-31' or event_time < '2021-03-30') and event_type != 'Coffee_Capsule_RFID_Event' and event_type!='Coffee_Cup_RFID_Event' limit 2000000) SPARK_GEN_SUBQ_48) [numPartitions=1]
+- SubqueryAlias values
+- Relation[devicename#5457,value#5458,time#5459,coffee_machine_id#5460,digital_twin_id#5461,write_time#5462] JDBCRelation((SELECT * FROM values) SPARK_GEN_SUBQ_65) [numPartitions=1]
== Optimized Logical Plan ==
Join LeftSemi, (time#5459 = event_time#4836)
:- Relation[devicename#5457,value#5458,time#5459,coffee_machine_id#5460,digital_twin_id#5461,write_time#5462] JDBCRelation((SELECT * FROM values) SPARK_GEN_SUBQ_65) [numPartitions=1]
+- Project [event_time#4836]
+- Relation[event_type#4835,event_time#4836,event_payload#4837,coffee_machine_id#4838,digital_twin_id#4839] JDBCRelation((SELECT * FROM events WHERE (event_time > '2021-03-31' or event_time < '2021-03-30') and event_type != 'Coffee_Capsule_RFID_Event' and event_type!='Coffee_Cup_RFID_Event' limit 2000000) SPARK_GEN_SUBQ_48) [numPartitions=1]
== Physical Plan ==
SortMergeJoin [time#5459], [event_time#4836], LeftSemi
:- *(2) Sort [time#5459 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(time#5459, 200), true, [id=#1219]
: +- *(1) Scan JDBCRelation((SELECT * FROM values) SPARK_GEN_SUBQ_65) [numPartitions=1] [devicename#5457,value#5458,time#5459,coffee_machine_id#5460,digital_twin_id#5461,write_time#5462] PushedFilters: [], ReadSchema: struct<devicename:string,value:double,time:timestamp,coffee_machine_id:string,digital_twin_id:str...
+- *(4) Sort [event_time#4836 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(event_time#4836, 200), true, [id=#1224]
+- *(3) Scan JDBCRelation((SELECT * FROM events WHERE (event_time > '2021-03-31' or event_time < '2021-03-30') and event_type != 'Coffee_Capsule_RFID_Event' and event_type!='Coffee_Cup_RFID_Event' limit 2000000) SPARK_GEN_SUBQ_48) [numPartitions=1] [event_time#4836] PushedFilters: [], ReadSchema: struct<event_time:timestamp>
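For what it's worth, the same semi-join can be expressed in the DataFrame API with a broadcast hint (a sketch, assuming the raw side is small enough to broadcast). This removes the shuffle, but not the real cost, which is the unfiltered JDBC scan of values: note PushedFilters: [] in both scans above.

import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("test").master("local").getOrCreate()
val valuesDf = spark.table("values")
val rawDf = spark.table("raw")

// Same LEFT SEMI join, with the small "raw" side broadcast so the
// Exchange + SortMergeJoin become a BroadcastHashJoin. The full "values"
// table is still fetched over JDBC either way.
val semiJoined = valuesDf.join(
    broadcast(rawDf.select("event_time")),
    valuesDf("time") === rawDf("event_time"),
    "left_semi")
semiJoined.explain(true)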
Following mazaneicha's suggestion, I was able to solve it by generating the WHERE clause in Scala from the DataFrame's rows, of which there are not that many compared to the data they are queried against:
var collectedString = scala.collection.mutable.MutableList[String]()
for (row <- dailySummariesDfVisualize.collectAsList()) {
    println(row(1))
    val start = row(1)
    val end = row(5)
    val timeSelection = s" time > ' ${start}' and time < '${end}'"
    collectedString += timeSelection
}
val whereClause = collectedString.mkString(" or ")
println(whereClause)

val dailySensorData =
    getDFFromJdbcSource(SparkSession.builder().appName("test").master("local").getOrCreate(),
        s"SELECT * FROM values WHERE " + whereClause + " limit 1000000")
    .persist(StorageLevel.MEMORY_ONLY_SER)
dailySensorData.show(400, false)
It outputs what I actually need, with acceptable performance.
The formatted whereClause looks like:
time > ' 2021-03-24 07:06:34.0' and time < '2021-03-24 07:08:34.0' or time > ' 2021-03-24 07:07:41.0' and time < '2021-03-24 07:09:41.0' or time > ' 2021-03-24 07:07:43.0' and time < '2021-03-24 07:09:43.0'
and so on.
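As an aside, the same clause can be built without the mutable list. A sketch, assuming as in the loop above that columns 1 and 5 of each row hold the interval bounds:

// Functional equivalent of the loop above; parenthesizing each interval
// also makes the and/or precedence explicit.
val whereClause = dailySummariesDfVisualize.collect()
    .map(row => s"(time > '${row(1)}' and time < '${row(5)}')")
    .mkString(" or ")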