Apache Phoenix for Spark 无法正常工作

Apache Phoenix for Spark not working

根据 Phoenix 网站上的 "Load as a DataFrame using the Data Source API" 示例,我无法通过 Spark (2.1.0) 连接到 Phoenix (4.10)。我正在使用 lastet (Phoenix 4.10) 和 Hbase 1.2.5。我可以通过 Phoenix(sqlline 客户端)在 Hbase 中创建一个 table。 Spark内部返回错误如下:

scala> val df = sqlContext.load("org.apache.phoenix.spark",Map("table" -> "test", "zkUrl" -> "localhost:2181"))

warning: there was one deprecation warning; re-run with -deprecation for details
java.sql.SQLException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hbase.TableExistsException): SYSTEM.MUTEX
at org.apache.phoenix.query.ConnectionQueryServicesImpl.call(ConnectionQueryServicesImpl.java:2465)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.call(ConnectionQueryServicesImpl.java:2382)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2382)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:255)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:149)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:98)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(ConnectionUtil.java:57)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getInputConnection(ConnectionUtil.java:45)
at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getSelectColumnMetadataList(PhoenixConfigurationUtil.java:292)
at org.apache.phoenix.spark.PhoenixRDD.toDataFrame(PhoenixRDD.scala:118)
at org.apache.phoenix.spark.PhoenixRelation.schema(PhoenixRelation.scala:60)
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40)
at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:389)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:965)
... 50 elided
Caused by: org.apache.hadoop.ipc.RemoteException: SYSTEM.MUTEX
at org.apache.hadoop.hbase.master.procedure.CreateTableProcedure.prepareCreate(CreateTableProcedure.java:285)
at org.apache.hadoop.hbase.master.procedure.CreateTableProcedure.executeFromState(CreateTableProcedure.java:106)
at org.apache.hadoop.hbase.master.procedure.CreateTableProcedure.executeFromState(CreateTableProcedure.java:58)
at org.apache.hadoop.hbase.procedure2.StateMachineProcedure.execute(StateMachineProcedure.java:119)
at org.apache.hadoop.hbase.procedure2.Procedure.doExecute(Procedure.java:498)
at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.execProcedure(ProcedureExecutor.java:1147)
at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.execLoop(ProcedureExecutor.java:942)
at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.execLoop(ProcedureExecutor.java:895)
at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.access0(ProcedureExecutor.java:77)
at org.apache.hadoop.hbase.procedure2.ProcedureExecutor.run(ProcedureExecutor.java:497)

更新 1:如果 SYSTEM.MUTEX table 通过 HBase 删除,它工作正常。

更新 2:删除 SYSTEM.MUTEX table 后,只要通过 sqlContext.load(),这意味着加载另一个 table 的那一刻,或者即使重新加载相同的 table,也会抛出相同的异常,因为它试图重新创建 SYSTEM.MUTEX table.

更新 3:似乎如果你开始时没有 SYSTEM.MUTEX table 在 Hbase 中,它适用于同一个 Spark 会话,即你可以连接到任意数量的 tables,但是,如果另一个 Spark 会话被初始化,则第二个 Spark 上下文会抛出相同的异常。

根据 https://issues.apache.org/jira/browse/PHOENIX-3814 的建议(在 Spark class 路径中包含 hbase-client jar),它仍然出现相同的异常。

更新 4:我最终对 Phoenix 项目进行了自定义构建。解决方法是更改​​行号。 class org.apache.phoenix.query.ConnectionQueryServicesImpl (phoenix-core) 中的 2427 到 if (!admin.tableExists(SYSTEM_MUTEX_NAME_BYTES)) createSysMutexTable(admin); 。此外,在 https://phoenix.apache.org/phoenix_spark.html 处给出的数据帧加载示例不正确,因为它基于数据帧 class 的 deprecated/removed 保存方法,而是需要使用写入方法。请参阅以下示例:

./bin/spark-shell --master local[4] --deploy-mode client --jars path_to_to/phoenix-4.10.1-HBase-1.2-SNAPSHOT-client.jar
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
import org.apache.spark.sql.SaveMode

val sqlContext = new SQLContext(sc)

val df = sqlContext.load("org.apache.phoenix.spark",Map("table" -> "name_of_input_table_in_phoenix", "zkUrl" -> "localhost:2181"))
df.write.format("org.apache.phoenix.spark").mode(SaveMode.Overwrite).options(Map("table" -> "name_of_output_table_in_phoenix","zkUrl" -> "localhost:2181")).save()

请注意,输出 table 应该已经存在于具有正确架构的 Phoenix 中。请注意,我使用的是自定义构建,因此在客户端 jar 名称中使用了 SNAPSHOT。

当前的 4.10 版本似乎有这个错误,因此在初始化时(当在 SQLContext 上调用加载时),Phoenix 客户端尝试创建 SYSTEM.MUTEX table([= 中的 createSysMutexTable 方法) 15=](凤凰核心)class)。但是,如果这个 table 已经存在,Hbase 会抛出 TableExistsException。虽然 createSysMutexTable 方法捕获了 TableAlreadyExists execption,但这与 Hbase 抛出的异常不同,Hbase 的异常被包装了。结果发生未处理的异常。解决方案是更新代码,并且仅在 Mutex table 不存在时才调用 createSysMutexTable 方法。有关完整的解决方案和示例代码,请参阅 UPDATE 4.