How to create a table as select in pyspark.sql
Is it possible to create a table on Spark using a SELECT statement?
I do the following:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sqlCtx = SQLContext(sc)

# load the CSV as a DataFrame and register it as a temporary table
spark_df = sqlCtx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data/documents_topics.csv")
spark_df.registerTempTable("my_table")

# create a new table from a SELECT over the temporary table
sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")
but I get the error:
/Users/user/anaconda/bin/python /Users/user/workspace/Outbrain-Click-Prediction/test.py
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel).
17/01/21 17:19:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o19.sql.
: org.apache.spark.sql.AnalysisException: unresolved operator 'CreateHiveTableAsSelectLogicalPlan CatalogTable(
    Table: my_table_2
    Created: Sat Jan 21 17:19:53 EST 2017
    Last Access: Wed Dec 31 18:59:59 EST 1969
    Type: MANAGED
    Storage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false;;
'CreateHiveTableAsSelectLogicalPlan CatalogTable(
    Table: my_table_2
    Created: Sat Jan 21 17:19:53 EST 2017
    Last Access: Wed Dec 31 18:59:59 EST 1969
    Type: MANAGED
    Storage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false
:  +- Project [document_id#0, topic_id#1, confidence_level#2]
:  +- SubqueryAlias my_table
:     +- Relation[document_id#0,topic_id#1,confidence_level#2] csv
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:374)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:67)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/user/workspace/Outbrain-Click-Prediction/test.py", line 16, in <module>
    sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/context.py", line 360, in sql
    return self.sparkSession.sql(sqlQuery)
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/session.py", line 543, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "unresolved operator 'CreateHiveTableAsSelectLogicalPlan CatalogTable(\n\tTable: my_table_2\n\tCreated: Sat Jan 21 17:19:53 EST 2017\n\tLast Access: Wed Dec 31 18:59:59 EST 1969\n\tType: MANAGED\n\tStorage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false;;\n'CreateHiveTableAsSelectLogicalPlan CatalogTable(\n\tTable: my_table_2\n\tCreated: Sat Jan 21 17:19:53 EST 2017\n\tLast Access: Wed Dec 31 18:59:59 EST 1969\n\tType: MANAGED\n\tStorage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false\n:  +- Project [document_id#0, topic_id#1, confidence_level#2]\n:  +- SubqueryAlias my_table\n:     +- Relation[document_id#0,topic_id#1,confidence_level#2] csv\n"
You should first do the select and assign it to a dataframe variable, and then register it with registerTempTable, the same way you did with the dataframe created from the CSV file.
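A minimal sketch of what that answer describes, reusing the sqlCtx and my_table from the question (my_table_2 here is a temporary table rather than the managed table the question tried to create):

result_df = sqlCtx.sql("SELECT * FROM my_table")  # run the select first, keep it as a DataFrame
result_df.registerTempTable("my_table_2")         # then register that DataFrame, as with the CSV one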
I have corrected this issue by using HiveContext instead of SQLContext, as below:
import findspark
findspark.init()
import pyspark
from pyspark.sql import HiveContext

sc = pyspark.SparkContext()
# HiveContext (unlike plain SQLContext) supports CREATE TABLE ... AS SELECT
sqlCtx = HiveContext(sc)

spark_df = sqlCtx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data/documents_topics.csv")
spark_df.registerTempTable("my_table")
sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")