如何在 spark-notebook 中从 Accumulo 1.6 创建 Spark RDD?
How do I create a Spark RDD from Accumulo 1.6 in spark-notebook?
我有一张包含 Spark Notebook、Spark、Accumulo 1.6 和 Hadoop 的 Vagrant 映像 运行。从笔记本中,我可以手动创建一个扫描仪并从我使用 Accumulo 示例之一创建的 table 中提取测试数据:
val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector( "root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)
scanner.setRange(new Range("row_0000000000", "row_0000000010"))
for(entry: Entry[Key, Value] <- scanner) {
println(entry.getKey + " is " + entry.getValue)
}
将给出前十行 table 数据。
当我尝试这样创建 RDD 时:
val rdd2 =
sparkContext.newAPIHadoopRDD (
new Configuration(),
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
我收到一个 RDD,由于以下错误,我无法做太多事情:
java.io.IOException: Input info has not been set. at
org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
at
org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:222)
at
org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at
org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at
org.apache.spark.rdd.RDD.count(RDD.scala:927)
考虑到我没有指定任何参数来连接table、授权是什么等,这完全是有道理的。
所以我的问题是:从这里我需要做什么才能将 table 数据的前十行放入我的 RDD?
更新一个
仍然不起作用,但我确实发现了一些事情。原来有两个几乎相同的包,
org.apache.accumulo.core.client.mapreduce
&
org.apache.accumulo.core.client.mapred
除了一些方法签名不同之外,两者都有几乎相同的成员。不确定为什么两者都存在,因为我看不到弃用通知。我试图不高兴地实施 Sietse 的回答。以下是我所做的以及回复:
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)
import org.apache.hadoop.mapred.JobConf import
org.apache.hadoop.conf.Configuration jobConf:
org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml,
core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml,
yarn-site.xml
Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
val rdd2 =
sparkContext.hadoopRDD (
jobConf,
classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value],
1
)
rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key,
org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at
:62
rdd2.first
java.io.IOException: Input info has not been set. at
org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
at
org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308)
at
org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at
org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:222)
at
org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at
org.apache.spark.rdd.RDD.take(RDD.scala:1077) at
org.apache.spark.rdd.RDD.first(RDD.scala:1110) at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69)
at...
* 编辑 2 *
回复:霍尔顿的回答 - 仍然不开心:
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
InputFormatBase.setInputTableName(jobConf, "batchtest1")
val rddX = sparkContext.newAPIHadoopRDD(
jobConf,
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key,
org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at
newAPIHadoopRDD at :58
Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at :58
rddX.first
java.io.IOException: Input info has not been set. at
org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
at
org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
at
org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:222)
at
org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at
org.apache.spark.rdd.RDD.take(RDD.scala:1077) at
org.apache.spark.rdd.RDD.first(RDD.scala:1110) at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61)
at
编辑 3 -- 进步!
我能够弄清楚 'input INFO not set' 错误发生的原因。眼尖的人无疑会看到以下代码缺少结束符 '('
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")
当我在 spark-notebook 中执行此操作时,我一直在单击执行按钮并继续,因为我没有看到错误。我忘记的是笔记本将执行 spark-shell 在您停止关闭 ')' 时将执行的操作 -- 它将永远等待您添加它 .所以错误是 'setConnectorInfo' 方法从未被执行的结果。
不幸的是,我仍然无法将累积 table 数据推送到对我可用的 RDD 中。当我执行
rddX.count
我回来了
res15: Long = 10000
这是正确的回答 - 我指向的 table 中有 10,000 行数据。但是,当我尝试这样获取数据的第一个元素时:
rddX.first
我收到以下错误:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
org.apache.accumulo.core.data.Key
关于从这里去哪里有什么想法吗?
编辑 4 -- 成功!
已接受的答案 + 评论已完成 90% - 除了需要将累积 key/value 转换为可序列化的内容这一事实。我通过在两者上调用 .toString() 方法来完成这项工作。我会尽快尝试 post 一些完整的工作代码,以防其他人遇到同样的问题。
看起来这些参数必须通过静态方法设置:http://accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.html。因此,请尝试再次设置非可选参数和 运行。它应该工作。
通常使用自定义 Hadoop InputFormats,信息是使用 JobConf 指定的。正如@Sietse 所指出的,AccumuloInputFormat 上有一些静态方法可用于配置 JobConf。在这种情况下,我想你想要做的是:
val jobConf = new JobConf() // Create a job conf
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf, principal, token)
AccumuloInputFormat.setScanAuthorizations(jobConf, authorizations)
val clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf, clientConfig)
AccumuloInputFormat.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(jobConf,
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
注意:在深入研究代码后,似乎 属性 的配置部分是基于调用的 class 设置的(避免与其他包潜在冲突是有意义的) ,所以当我们去把它取回具体 class 之后它找不到 is configured 标志。解决方案是不使用 Abstract classes。有关实施细节,请参阅 https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127)。如果您不能在 spark-notebook 的具体实现上调用此方法,可能使用 spark-shell 或定期构建的应用程序是最简单的解决方案。
我有一张包含 Spark Notebook、Spark、Accumulo 1.6 和 Hadoop 的 Vagrant 映像 运行。从笔记本中,我可以手动创建一个扫描仪并从我使用 Accumulo 示例之一创建的 table 中提取测试数据:
val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector( "root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)
scanner.setRange(new Range("row_0000000000", "row_0000000010"))
for(entry: Entry[Key, Value] <- scanner) {
println(entry.getKey + " is " + entry.getValue)
}
将给出前十行 table 数据。
当我尝试这样创建 RDD 时:
val rdd2 =
sparkContext.newAPIHadoopRDD (
new Configuration(),
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
我收到一个 RDD,由于以下错误,我无法做太多事情:
java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at org.apache.spark.rdd.RDD.count(RDD.scala:927)
考虑到我没有指定任何参数来连接table、授权是什么等,这完全是有道理的。
所以我的问题是:从这里我需要做什么才能将 table 数据的前十行放入我的 RDD?
更新一个 仍然不起作用,但我确实发现了一些事情。原来有两个几乎相同的包,
org.apache.accumulo.core.client.mapreduce
&
org.apache.accumulo.core.client.mapred
除了一些方法签名不同之外,两者都有几乎相同的成员。不确定为什么两者都存在,因为我看不到弃用通知。我试图不高兴地实施 Sietse 的回答。以下是我所做的以及回复:
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)
import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.conf.Configuration jobConf: org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
val rdd2 =
sparkContext.hadoopRDD (
jobConf,
classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value],
1
)
rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at :62
rdd2.first
java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308) at org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.RDD.take(RDD.scala:1077) at org.apache.spark.rdd.RDD.first(RDD.scala:1110) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69) at...
* 编辑 2 *
回复:霍尔顿的回答 - 仍然不开心:
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
InputFormatBase.setInputTableName(jobConf, "batchtest1")
val rddX = sparkContext.newAPIHadoopRDD(
jobConf,
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at newAPIHadoopRDD at :58
Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at :58
rddX.first
java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.RDD.take(RDD.scala:1077) at org.apache.spark.rdd.RDD.first(RDD.scala:1110) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61) at
编辑 3 -- 进步!
我能够弄清楚 'input INFO not set' 错误发生的原因。眼尖的人无疑会看到以下代码缺少结束符 '('
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")
当我在 spark-notebook 中执行此操作时,我一直在单击执行按钮并继续,因为我没有看到错误。我忘记的是笔记本将执行 spark-shell 在您停止关闭 ')' 时将执行的操作 -- 它将永远等待您添加它 .所以错误是 'setConnectorInfo' 方法从未被执行的结果。
不幸的是,我仍然无法将累积 table 数据推送到对我可用的 RDD 中。当我执行
rddX.count
我回来了
res15: Long = 10000
这是正确的回答 - 我指向的 table 中有 10,000 行数据。但是,当我尝试这样获取数据的第一个元素时:
rddX.first
我收到以下错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key
关于从这里去哪里有什么想法吗?
编辑 4 -- 成功!
已接受的答案 + 评论已完成 90% - 除了需要将累积 key/value 转换为可序列化的内容这一事实。我通过在两者上调用 .toString() 方法来完成这项工作。我会尽快尝试 post 一些完整的工作代码,以防其他人遇到同样的问题。
看起来这些参数必须通过静态方法设置:http://accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.html。因此,请尝试再次设置非可选参数和 运行。它应该工作。
通常使用自定义 Hadoop InputFormats,信息是使用 JobConf 指定的。正如@Sietse 所指出的,AccumuloInputFormat 上有一些静态方法可用于配置 JobConf。在这种情况下,我想你想要做的是:
val jobConf = new JobConf() // Create a job conf
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf, principal, token)
AccumuloInputFormat.setScanAuthorizations(jobConf, authorizations)
val clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf, clientConfig)
AccumuloInputFormat.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(jobConf,
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
注意:在深入研究代码后,似乎 属性 的配置部分是基于调用的 class 设置的(避免与其他包潜在冲突是有意义的) ,所以当我们去把它取回具体 class 之后它找不到 is configured 标志。解决方案是不使用 Abstract classes。有关实施细节,请参阅 https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127)。如果您不能在 spark-notebook 的具体实现上调用此方法,可能使用 spark-shell 或定期构建的应用程序是最简单的解决方案。