通过 spark-notebook 填充 accumulo 1.6 突变对象时出现奇怪的错误

odd error when populating accumulo 1.6 mutation object via spark-notebook

使用 spark-notebook 更新 accumulo table。采用 accumulo documentation and the accumulo example code 中指定的方法。以下是我逐字记录在笔记本上的内容,以及回复:

val clientRqrdTble = new ClientOnRequiredTable
val bwConfig = new BatchWriterConfig
val batchWriter = connector.createBatchWriter("batchtestY", bwConfig);

clientRqrdTble: org.apache.accumulo.core.cli.ClientOnRequiredTable = org.apache.accumulo.core.cli.ClientOnRequiredTable@6c6a18ed bwConfig: org.apache.accumulo.core.client.BatchWriterConfig = [maxMemory=52428800, maxLatency=120000, maxWriteThreads=3, timeout=9223372036854775807] batchWriter: org.apache.accumulo.core.client.BatchWriter = org.apache.accumulo.core.client.impl.BatchWriterImpl@298aa736

val rowIdS = rddX2_first._1.split(" ")(0)

rowIdS: String = row_0736460000

val mutation = new Mutation(new Text(rowIdS))

mutation: org.apache.accumulo.core.data.Mutation = org.apache.accumulo.core.data.Mutation@0

mutation.put(
  new Text("foo"), 
  new Text("1"), 
  new ColumnVisibility("exampleVis"), 
  new Value(new String("CHEWBACCA!").getBytes) )

java.lang.IllegalStateException: Can not add to mutation after serializing it at org.apache.accumulo.core.data.Mutation.put(Mutation.java:168) at org.apache.accumulo.core.data.Mutation.put(Mutation.java:163) at org.apache.accumulo.core.data.Mutation.put(Mutation.java:211)

我深入研究 the code,发现罪魁祸首是一个 if-catch,它正在检查 UnsynchronizedBuffer.Writer 缓冲区是否为空。行号不会对齐,因为这是一个与 1.6 accumulo-core jar 中的版本略有不同的版本——我已经查看了两者,但在这种情况下,差异并不大。据我所知,该对象是在执行该方法之前创建的,并且没有被转储。

所以要么我在代码中遗漏了某些东西,要么出现了其他问题。你们中有人知道是什么导致了这种行为吗?

更新一个

我使用 Scala 控制台并直接通过 java 1.8 执行了以下代码。它在 scala 中失败,但在 Java 中没有。我认为此时这是一个 Accumulo 问题。因此,我将打开一个错误票并更深入地挖掘源代码。如果我想出一个解决方案,我会 post 在这里。

下面是Java形式的代码。那里有一些额外的东西,因为我想确保我可以连接到我使用 accumulo 批处理编写器示例创建的 table:

import java.util.Map.Entry;

import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.client.*;
import org.apache.accumulo.core.client.mapred.*;
import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.cli.ClientOnRequiredTable.*;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configured.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;

public class App {

    public static void main( String[] args ) throws 
                                            AccumuloException, 
                                            AccumuloSecurityException, 
                                            TableNotFoundException {
        // connect to accumulo using a scanner
        // print first ten rows of a given table
        String instanceNameS = "accumulo";
        String zooServersS = "localhost:2181";
        Instance instance = new ZooKeeperInstance(instanceNameS, zooServersS);
        Connector connector = 
                instance.getConnector( "root", new PasswordToken("password"));

        Authorizations auths = new Authorizations("exampleVis");
        Scanner scanner = connector.createScanner("batchtestY", auths);

        scanner.setRange(new Range("row_0000000001", "row_0000000010"));

        for(Entry<Key, Value> entry : scanner) {
          System.out.println(entry.getKey() + " is " + entry.getValue());
        }


        // stage up connection info objects for serialization
        ClientOnRequiredTable clientRqrdTble = new ClientOnRequiredTable();
        BatchWriterConfig bwConfig = new BatchWriterConfig();
        BatchWriter batchWriter = 
                connector.createBatchWriter("batchtestY", bwConfig);

        // create mutation object
        Mutation mutation = new Mutation(new Text("row_0000000001"));

        // populate mutation object
        // -->THIS IS WHAT'S FAILING IN SCALA<--
        mutation.put(
                  new Text("foo"), 
                  new Text("1"), 
                  new ColumnVisibility("exampleVis"), 
                  new Value(new String("CHEWBACCA!").getBytes()) );                                           
    }
}

更新两个

已针对此问题创建 Accumulo bug ticket。他们的目标是在 v1.7.0 中修复此问题。在那之前,我在下面提供的解决方案是一个功能性的解决方法。

看起来当执行新的 Mutation 单元格时 spark-notebook 中发生的任何事情都在序列化 Mutation。序列化后,您不能调用 Mutation 上的 put 。我会尝试将 mutation.put 调用添加到与新 Mutation 命令相同的笔记本单元格。看起来 clientRqrdTble/bwConfig/batchWriter 命令在一个多行单元格中,所以希望这也适用于 Mutation。

所以似乎与 java 完美配合的代码在 Scala 中表现不佳。解决方案(不一定是一个好的解决方案,但一个有效的解决方案)是在一个独立的 jar 中创建一个 java 方法来创建变异对象并 returns 它。通过这种方式,您可以将 jar 添加到 spark 的类路径并调用所需的方法 ass。使用 spark notebook 进行测试并成功更新现有的 accumulo table。我仍然会向 accumulo peeps 提交一张票,因为不应考虑这种解决方法 'best practice'。