Spark 1.3.1 ClassNotFoundException 上的 Apache Phoenix(4.3.1 和 4.4.0-HBase-0.98)
Apache Phoenix (4.3.1 and 4.4.0-HBase-0.98) on Spark 1.3.1 ClassNotFoundException
我正在尝试通过 Spark 连接到 Phoenix,但在通过 JDBC 驱动程序打开连接时,我不断收到以下异常(为简洁起见,下面是完整的堆栈跟踪):
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
有问题的 class 由名为 phoenix-core-4.3 的 jar 提供。1.jar(尽管它位于 HBase 包命名空间中,但我猜他们需要它来与 HBase 集成) .
在 SO 上有很多关于 ClassNotFoundExceptions on Spark 的问题,我尝试了 fat-jar 方法(使用 Maven 的程序集和阴影插件;我检查了罐子,它们 do 包含 ClientRpcControllerFactory),我在命令行上指定 jar 时尝试了一个精简的 jar。对于后者,我使用的命令如下:
/opt/mapr/spark/spark-1.3.1/bin/spark-submit --jars lib/spark-streaming-kafka_10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/metrics-core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true
我还从代码中完成了 class 路径转储,层次结构中的第一个 classloader 已经知道 Phoenix jar:
2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO nl.work.kafkastreamconsumer.phoenix.LinePersister - [file:/home/work/projects/customer/KafkaStreamConsumer.jar, file:/home/work/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar, file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar, file:/home/work/projects/customer/lib/zkclient-0.3.jar, file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar, file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar, file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]
所以问题是:我在这里缺少什么?为什么 Spark 不能加载正确的 class?应该只有一个版本的 class 飞来飞去(即来自 phoenix-core 的那个),所以我怀疑这是版本冲突。
[Executor task launch worker-3] ERROR nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while processing line
java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
at nl.work.kafkastreamconsumer.phoenix.LinePersister.call(LinePersister.java:40)
at nl.work.kafkastreamconsumer.phoenix.LinePersister.call(LinePersister.java:32)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction.apply(JavaPairRDD.scala:999)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at org.apache.phoenix.exception.SQLExceptionCode$Factory.newException(SQLExceptionCode.java:362)
at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.access0(ConnectionQueryServicesImpl.java:166)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.call(ConnectionQueryServicesImpl.java:1831)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.call(ConnectionQueryServicesImpl.java:1810)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
at java.sql.DriverManager.getConnection(DriverManager.java:571)
at java.sql.DriverManager.getConnection(DriverManager.java:233)
at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
... 25 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
... 36 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
... 39 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
... 43 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
... 46 more
/编辑
不幸的是,问题仍然存在于 4.4.0-HBase-0.98 中。以下是有问题的 classes。由于 saveToPhoenix() 方法尚不可用于 Java API,并且这只是一个 POC,我的想法是简单地为每个小批量使用 JDBC 驱动程序。
public class PhoenixConnection implements AutoCloseable, Serializable {
private static final long serialVersionUID = -4491057264383873689L;
private static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
static {
try {
Class.forName(PHOENIX_DRIVER);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
private Connection connection;
public PhoenixConnection(final String jdbcUri) {
try {
connection = DriverManager.getConnection(jdbcUri);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public List<Map<String, Object>> executeQuery(final String sql) throws SQLException {
ArrayList<Map<String, Object>> resultList = new ArrayList<>();
try (PreparedStatement statement = connection.prepareStatement(sql); ResultSet resultSet = statement.executeQuery() ) {
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
Map<String, Object> row = new HashMap<>(metaData.getColumnCount());
for (int column = 0; column < metaData.getColumnCount(); ++column) {
final String columnLabel = metaData.getColumnLabel(column);
row.put(columnLabel, resultSet.getObject(columnLabel));
}
}
}
resultList.trimToSize();
return resultList;
}
@Override
public void close() {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
public class LinePersister implements Function<JavaRDD<String>, Void> {
private static final long serialVersionUID = -2529724617108874989L;
private static final Logger LOGGER = Logger.getLogger(LinePersister.class);
private static final String TABLE_NAME = "mail_events";
private final String jdbcUrl;
public LinePersister(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
}
@Override
public Void call(JavaRDD<String> dataSet) throws Exception {
LOGGER.info(String.format(
"Starting conversion on rdd with %d elements", dataSet.count()));
List<Void> collectResult = dataSet.map(new Function<String, Void>() {
private static final long serialVersionUID = -6651313541439109868L;
@Override
public Void call(String line) throws Exception {
LOGGER.info("Writing line " + line);
Event event = EventParser.parseLine(line);
try (PhoenixConnection connection = new PhoenixConnection(
jdbcUrl)) {
connection.executeQuery(event
.createUpsertStatement(TABLE_NAME));
} catch (Exception e) {
LOGGER.error("Error while processing line", e);
dumpClasspath(this.getClass().getClassLoader());
}
return null;
}
}).collect();
LOGGER.info(String.format("Got %d results: ", collectResult.size()));
return null;
}
public static void dumpClasspath(ClassLoader loader)
{
LOGGER.info("Classloader " + loader + ":");
if (loader instanceof URLClassLoader)
{
URLClassLoader ucl = (URLClassLoader)loader;
LOGGER.info(Arrays.toString(ucl.getURLs()));
}
else
LOGGER.error("cannot display components as not a URLClassLoader)");
if (loader.getParent() != null)
dumpClasspath(loader.getParent());
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>nl.work</groupId>
<artifactId>KafkaStreamConsumer</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<spark.version>1.3.1</spark.version>
<hibernate.version>4.3.10.Final</hibernate.version>
<phoenix.version>4.4.0-HBase-0.98</phoenix.version>
<hbase.version>0.98.9-hadoop2</hbase.version>
<spark-hbase.version>0.0.2-clabs-spark-1.3.1</spark-hbase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>${phoenix.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>${phoenix.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.cloudera</groupId>
<artifactId>spark-hbase</artifactId>
<version>${spark-hbase.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId>
<version>2.3</version> <executions> <execution> <phase>package</phase> <goals>
<goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact>
<excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration>
</execution> </executions> </plugin> -->
</plugins>
</build>
<repositories>
<repository>
<id>unknown-jars-temp-repo</id>
<name>A temporary repository created by NetBeans for libraries and jars it could not identify. Please replace the dependencies in this repository with correct ones and delete this repository.</name>
<url>file:${project.basedir}/lib</url>
</repository>
</repositories>
</project>
/edit2
我已经尝试过 saveAsHadoopApiFile 方法 (https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-java) 但它会产生相同的错误,只是堆栈跟踪不同:
java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:995)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:979)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at org.apache.phoenix.exception.SQLExceptionCode$Factory.newException(SQLExceptionCode.java:386)
at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.access0(ConnectionQueryServicesImpl.java:171)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.call(ConnectionQueryServicesImpl.java:1881)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.call(ConnectionQueryServicesImpl.java:1860)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
at java.sql.DriverManager.getConnection(DriverManager.java:571)
at java.sql.DriverManager.getConnection(DriverManager.java:187)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
at org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWriter.java:49)
at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
... 8 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
... 23 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
... 26 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
... 31 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
... 34 more
您不能在 4.3.1 版本的 Apache Phoenix 上使用 Spark。仅从 4.4.0 版开始支持 Spark。 Jars 现已添加到 Maven 存储库(源代码于 5 月 29 日发布。
所以如果你更新到 4.4.0 版本的 Phoenix 你应该可以访问。如果您使用的是 CDH,则客户端 jar 尚未发布。
谢谢
PS:我为此提出了 JIRA,响应来自 Phoenix
的一位提交者
Phoenix 邮件列表的好心人给了我答案:
“除了将 Phoenix 客户端 JAR 与您的应用程序捆绑在一起之外,您是否能够
将其包含在 SPARK_CLASSPATH 中的静态位置,或设置
下面的 conf 值(我自己使用 SPARK_CLASSPATH,尽管它已被弃用):
spark.driver.extraClassPath
spark.executor.extraClassPath
"
https://www.mail-archive.com/user@spark.apache.org/msg29978.html
我遇到了同样的问题。问题的原因,Phoenix 使用自定义 rpc 控制器工厂,这是 Phoenix 特定的工厂来配置集群端索引和系统目录 table 的优先级。它称为 ClientRpcControllerFactory。
有时从纯 HBase 客户端应用程序使用支持 Phoenix 的集群,导致应用程序代码或 MapReduce 作业中出现 ClassNotFoundExceptions。由于 hbase 配置在 Phoenix 客户端和 HBase 客户端之间共享,因此很难在客户端进行不同的配置。这就是为什么你得到这个例外。此问题已由 HBASE-14960 修复。如果您的 hbase 版本早于 2.0.0、1.2.0、1.3.0、0.98.17 您可以使用 hbase-site.xml:
中的此设置定义客户端 rpc 控制器
<property>
<name>hbase.rpc.controllerfactory.class</name>
<value>org.apache.hadoop.hbase.ipc.RpcControllerFactory</value>
</property>
我正在尝试通过 Spark 连接到 Phoenix,但在通过 JDBC 驱动程序打开连接时,我不断收到以下异常(为简洁起见,下面是完整的堆栈跟踪):
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
有问题的 class 由名为 phoenix-core-4.3 的 jar 提供。1.jar(尽管它位于 HBase 包命名空间中,但我猜他们需要它来与 HBase 集成) .
在 SO 上有很多关于 ClassNotFoundExceptions on Spark 的问题,我尝试了 fat-jar 方法(使用 Maven 的程序集和阴影插件;我检查了罐子,它们 do 包含 ClientRpcControllerFactory),我在命令行上指定 jar 时尝试了一个精简的 jar。对于后者,我使用的命令如下:
/opt/mapr/spark/spark-1.3.1/bin/spark-submit --jars lib/spark-streaming-kafka_10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/metrics-core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true
我还从代码中完成了 class 路径转储,层次结构中的第一个 classloader 已经知道 Phoenix jar:
2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO nl.work.kafkastreamconsumer.phoenix.LinePersister - [file:/home/work/projects/customer/KafkaStreamConsumer.jar, file:/home/work/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar, file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar, file:/home/work/projects/customer/lib/zkclient-0.3.jar, file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar, file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar, file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]
所以问题是:我在这里缺少什么?为什么 Spark 不能加载正确的 class?应该只有一个版本的 class 飞来飞去(即来自 phoenix-core 的那个),所以我怀疑这是版本冲突。
[Executor task launch worker-3] ERROR nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while processing line
java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
at nl.work.kafkastreamconsumer.phoenix.LinePersister.call(LinePersister.java:40)
at nl.work.kafkastreamconsumer.phoenix.LinePersister.call(LinePersister.java:32)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction.apply(JavaPairRDD.scala:999)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at org.apache.phoenix.exception.SQLExceptionCode$Factory.newException(SQLExceptionCode.java:362)
at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.access0(ConnectionQueryServicesImpl.java:166)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.call(ConnectionQueryServicesImpl.java:1831)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.call(ConnectionQueryServicesImpl.java:1810)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
at java.sql.DriverManager.getConnection(DriverManager.java:571)
at java.sql.DriverManager.getConnection(DriverManager.java:233)
at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
... 25 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
... 36 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
... 39 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
... 43 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
... 46 more
/编辑
不幸的是,问题仍然存在于 4.4.0-HBase-0.98 中。以下是有问题的 classes。由于 saveToPhoenix() 方法尚不可用于 Java API,并且这只是一个 POC,我的想法是简单地为每个小批量使用 JDBC 驱动程序。
public class PhoenixConnection implements AutoCloseable, Serializable {
private static final long serialVersionUID = -4491057264383873689L;
private static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
static {
try {
Class.forName(PHOENIX_DRIVER);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
private Connection connection;
public PhoenixConnection(final String jdbcUri) {
try {
connection = DriverManager.getConnection(jdbcUri);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public List<Map<String, Object>> executeQuery(final String sql) throws SQLException {
ArrayList<Map<String, Object>> resultList = new ArrayList<>();
try (PreparedStatement statement = connection.prepareStatement(sql); ResultSet resultSet = statement.executeQuery() ) {
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
Map<String, Object> row = new HashMap<>(metaData.getColumnCount());
for (int column = 0; column < metaData.getColumnCount(); ++column) {
final String columnLabel = metaData.getColumnLabel(column);
row.put(columnLabel, resultSet.getObject(columnLabel));
}
}
}
resultList.trimToSize();
return resultList;
}
@Override
public void close() {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
public class LinePersister implements Function<JavaRDD<String>, Void> {
private static final long serialVersionUID = -2529724617108874989L;
private static final Logger LOGGER = Logger.getLogger(LinePersister.class);
private static final String TABLE_NAME = "mail_events";
private final String jdbcUrl;
public LinePersister(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
}
@Override
public Void call(JavaRDD<String> dataSet) throws Exception {
LOGGER.info(String.format(
"Starting conversion on rdd with %d elements", dataSet.count()));
List<Void> collectResult = dataSet.map(new Function<String, Void>() {
private static final long serialVersionUID = -6651313541439109868L;
@Override
public Void call(String line) throws Exception {
LOGGER.info("Writing line " + line);
Event event = EventParser.parseLine(line);
try (PhoenixConnection connection = new PhoenixConnection(
jdbcUrl)) {
connection.executeQuery(event
.createUpsertStatement(TABLE_NAME));
} catch (Exception e) {
LOGGER.error("Error while processing line", e);
dumpClasspath(this.getClass().getClassLoader());
}
return null;
}
}).collect();
LOGGER.info(String.format("Got %d results: ", collectResult.size()));
return null;
}
public static void dumpClasspath(ClassLoader loader)
{
LOGGER.info("Classloader " + loader + ":");
if (loader instanceof URLClassLoader)
{
URLClassLoader ucl = (URLClassLoader)loader;
LOGGER.info(Arrays.toString(ucl.getURLs()));
}
else
LOGGER.error("cannot display components as not a URLClassLoader)");
if (loader.getParent() != null)
dumpClasspath(loader.getParent());
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>nl.work</groupId>
<artifactId>KafkaStreamConsumer</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<spark.version>1.3.1</spark.version>
<hibernate.version>4.3.10.Final</hibernate.version>
<phoenix.version>4.4.0-HBase-0.98</phoenix.version>
<hbase.version>0.98.9-hadoop2</hbase.version>
<spark-hbase.version>0.0.2-clabs-spark-1.3.1</spark-hbase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>${phoenix.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>${phoenix.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.cloudera</groupId>
<artifactId>spark-hbase</artifactId>
<version>${spark-hbase.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
<!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId>
<version>2.3</version> <executions> <execution> <phase>package</phase> <goals>
<goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact>
<excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration>
</execution> </executions> </plugin> -->
</plugins>
</build>
<repositories>
<repository>
<id>unknown-jars-temp-repo</id>
<name>A temporary repository created by NetBeans for libraries and jars it could not identify. Please replace the dependencies in this repository with correct ones and delete this repository.</name>
<url>file:${project.basedir}/lib</url>
</repository>
</repositories>
</project>
/edit2 我已经尝试过 saveAsHadoopApiFile 方法 (https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-java) 但它会产生相同的错误,只是堆栈跟踪不同:
java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:995)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun.apply(PairRDDFunctions.scala:979)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at org.apache.phoenix.exception.SQLExceptionCode$Factory.newException(SQLExceptionCode.java:386)
at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.access0(ConnectionQueryServicesImpl.java:171)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.call(ConnectionQueryServicesImpl.java:1881)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.call(ConnectionQueryServicesImpl.java:1860)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
at java.sql.DriverManager.getConnection(DriverManager.java:571)
at java.sql.DriverManager.getConnection(DriverManager.java:187)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
at org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWriter.java:49)
at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
... 8 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
... 23 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
... 26 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
... 31 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
... 34 more
您不能在 4.3.1 版本的 Apache Phoenix 上使用 Spark。仅从 4.4.0 版开始支持 Spark。 Jars 现已添加到 Maven 存储库(源代码于 5 月 29 日发布。
所以如果你更新到 4.4.0 版本的 Phoenix 你应该可以访问。如果您使用的是 CDH,则客户端 jar 尚未发布。
谢谢
PS:我为此提出了 JIRA,响应来自 Phoenix
的一位提交者Phoenix 邮件列表的好心人给了我答案:
“除了将 Phoenix 客户端 JAR 与您的应用程序捆绑在一起之外,您是否能够 将其包含在 SPARK_CLASSPATH 中的静态位置,或设置 下面的 conf 值(我自己使用 SPARK_CLASSPATH,尽管它已被弃用): spark.driver.extraClassPath spark.executor.extraClassPath "
https://www.mail-archive.com/user@spark.apache.org/msg29978.html
我遇到了同样的问题。问题的原因,Phoenix 使用自定义 rpc 控制器工厂,这是 Phoenix 特定的工厂来配置集群端索引和系统目录 table 的优先级。它称为 ClientRpcControllerFactory。
有时从纯 HBase 客户端应用程序使用支持 Phoenix 的集群,导致应用程序代码或 MapReduce 作业中出现 ClassNotFoundExceptions。由于 hbase 配置在 Phoenix 客户端和 HBase 客户端之间共享,因此很难在客户端进行不同的配置。这就是为什么你得到这个例外。此问题已由 HBASE-14960 修复。如果您的 hbase 版本早于 2.0.0、1.2.0、1.3.0、0.98.17 您可以使用 hbase-site.xml:
中的此设置定义客户端 rpc 控制器 <property>
<name>hbase.rpc.controllerfactory.class</name>
<value>org.apache.hadoop.hbase.ipc.RpcControllerFactory</value>
</property>