如何在 spark-notebook 中从 Accumulo 1.6 创建 Spark RDD?

How do I create a Spark RDD from Accumulo 1.6 in spark-notebook?

我有一张包含 Spark Notebook、Spark、Accumulo 1.6 和 Hadoop 的 Vagrant 映像 运行。从笔记本中,我可以手动创建一个扫描仪并从我使用 Accumulo 示例之一创建的 table 中提取测试数据:

val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector( "root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)

scanner.setRange(new Range("row_0000000000", "row_0000000010"))

for(entry: Entry[Key, Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}

将给出前十行 table 数据。

当我尝试这样创建 RDD 时:

val rdd2 = 
  sparkContext.newAPIHadoopRDD (
    new Configuration(), 
    classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value]
  )

我收到一个 RDD,由于以下错误,我无法做太多事情:

java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at org.apache.spark.rdd.RDD.count(RDD.scala:927)

考虑到我没有指定任何参数来连接table、授权是什么等,这完全是有道理的。

所以我的问题是:从这里我需要做什么才能将 table 数据的前十行放入我的 RDD?

更新一个 仍然不起作用,但我确实发现了一些事情。原来有两个几乎相同的包,

org.apache.accumulo.core.client.mapreduce

&

org.apache.accumulo.core.client.mapred

除了一些方法签名不同之外,两者都有几乎相同的成员。不确定为什么两者都存在,因为我看不到弃用通知。我试图不高兴地实施 Sietse 的回答。以下是我所做的以及回复:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)

import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.conf.Configuration jobConf: org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml

Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml

AbstractInputFormat.setConnectorInfo(jobConf, 
                                     "root", 
                                     new PasswordToken("password")

AbstractInputFormat.setScanAuthorizations(jobConf, auths)

AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)

val rdd2 = 
  sparkContext.hadoopRDD (
    jobConf, 
    classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value], 
    1
  )

rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at :62

rdd2.first

java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308) at org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.RDD.take(RDD.scala:1077) at org.apache.spark.rdd.RDD.first(RDD.scala:1110) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69) at...

* 编辑 2 *

回复:霍尔顿的回答 - 仍然不开心:

    AbstractInputFormat.setConnectorInfo(jobConf, 
                                         "root", 
                                         new PasswordToken("password")
    AbstractInputFormat.setScanAuthorizations(jobConf, auths)
    AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
    InputFormatBase.setInputTableName(jobConf, "batchtest1")
    val rddX = sparkContext.newAPIHadoopRDD(
      jobConf, 
      classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
      classOf[org.apache.accumulo.core.data.Key], 
      classOf[org.apache.accumulo.core.data.Value]
      )

rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at newAPIHadoopRDD at :58

Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at :58

rddX.first

java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.RDD.take(RDD.scala:1077) at org.apache.spark.rdd.RDD.first(RDD.scala:1110) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61) at

编辑 3 -- 进步!

我能够弄清楚 'input INFO not set' 错误发生的原因。眼尖的人无疑会看到以下代码缺少结束符 '('

AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password") 

当我在 spark-notebook 中执行此操作时,我一直在单击执行按钮并继续,因为我没有看到错误。我忘记的是笔记本将执行 spark-shell 在您停止关闭 ')' 时将执行的操作 -- 它将永远等待您添加它 .所以错误是 'setConnectorInfo' 方法从未被执行的结果。

不幸的是,我仍然无法将累积 table 数据推送到对我可用的 RDD 中。当我执行

rddX.count

我回来了

res15: Long = 10000

这是正确的回答 - 我指向的 table 中有 10,000 行数据。但是,当我尝试这样获取数据的第一个元素时:

rddX.first

我收到以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key

关于从这里去哪里有什么想法吗?

编辑 4 -- 成功!

已接受的答案 + 评论已完成 90% - 除了需要将累积 key/value 转换为可序列化的内容这一事实。我通过在两者上调用 .toString() 方法来完成这项工作。我会尽快尝试 post 一些完整的工作代码,以防其他人遇到同样的问题。

看起来这些参数必须通过静态方法设置:http://accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.html。因此,请尝试再次设置非可选参数和 运行。它应该工作。

通常使用自定义 Hadoop InputFormats,信息是使用 JobConf 指定的。正如@Sietse 所指出的,AccumuloInputFormat 上有一些静态方法可用于配置 JobConf。在这种情况下,我想你想要做的是:

val jobConf = new JobConf() // Create a job conf
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf, principal, token)
AccumuloInputFormat.setScanAuthorizations(jobConf, authorizations)
val clientConfig =  new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf, clientConfig)
AccumuloInputFormat.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(jobConf, 
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
classOf[org.apache.accumulo.core.data.Key], 
classOf[org.apache.accumulo.core.data.Value]
)

注意:在深入研究代码后,似乎 属性 的配置部分是基于调用的 class 设置的(避免与其他包潜在冲突是有意义的) ,所以当我们去把它取回具体 class 之后它找不到 is configured 标志。解决方案是不使用 Abstract classes。有关实施细节,请参阅 https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127)。如果您不能在 spark-notebook 的具体实现上调用此方法,可能使用 spark-shell 或定期构建的应用程序是最简单的解决方案。