Spark SQL: Cross Join with sub-queries

I am trying to run the following SQL query in pyspark (on Spark 1.5.0):

SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2

The pyspark commands look like this:

from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

a = sqlCtx.parquetFile("hdfs/path/to/table/a.parquet")
a.registerTempTable("a")

b = sqlCtx.parquetFile("hdfs/path/to/table/b.parquet")
b.registerTempTable("b")

result = sqlCtx.sql("SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2").collect()

But it produces this error:

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 552, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o19.sql.
: java.lang.RuntimeException: [1.67] failure: ``union'' expected but identifier CROSS found

SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2
                                                                  ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:175)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:175)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others.apply(SparkSQLParser.scala:115)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others.apply(SparkSQLParser.scala:114)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$$anonfun$apply.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$$anonfun$apply.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$$anonfun$apply.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$$anonfun$apply.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:172)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:172)
at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:42)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

As far as I know, Spark SQL should support both CROSS JOIN and sub-queries (in the FROM clause). Do you have any suggestions on how to fix this?

Thanks,
Timo

Using HiveContext instead of SQLContext seems to solve the problem. The following works fine:

from pyspark.sql import HiveContext
sqlCtx = HiveContext(sc)

a = sqlCtx.parquetFile("hdfs/path/to/table/a.parquet")
a.registerTempTable("a")

b = sqlCtx.parquetFile("hdfs/path/to/table/b.parquet")
b.registerTempTable("b")

result = sqlCtx.sql("SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2").collect()

But as far as I understand, as long as sub-queries and cross joins are supported, this should also work with Spark SQL's plain SQLContext...
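For reference, the query itself is ordinary SQL, so the parse error appears to come from the parser behind SQLContext rather than from the statement. Here is a minimal sketch that runs the same statement outside Spark, using Python's built-in sqlite3 module. The in-memory tables and their contents are made up for illustration and merely stand in for the Parquet-backed tables a and b. It only shows the result shape the query should produce, not how Spark executes it.

```python
import sqlite3

# Hypothetical stand-ins for tables a and b (the real ones are Parquet files).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE a (obj TEXT)")
conn.execute("CREATE TABLE b (obj TEXT)")
conn.executemany("INSERT INTO a VALUES (?)", [("a%d" % i,) for i in range(25)])
conn.executemany("INSERT INTO b VALUES (?)", [("b%d" % i,) for i in range(25)])

# Same statement as in the question: two limited sub-queries, cross joined.
query = (
    "SELECT * FROM (SELECT obj AS origProperty1 FROM a LIMIT 10) tab1 "
    "CROSS JOIN (SELECT obj AS origProperty2 FROM b LIMIT 10) tab2"
)
rows = conn.execute(query).fetchall()

# Each of the 10 rows from tab1 is paired with each of the 10 rows from tab2.
print(len(rows))  # 10 x 10 = 100 rows
```

So a correct result from Spark should also contain 100 rows with two columns each, which is what the HiveContext variant returns.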