通过 spark-notebook 填充 accumulo 1.6 突变对象时出现奇怪的错误
odd error when populating accumulo 1.6 mutation object via spark-notebook
使用 spark-notebook 更新 accumulo table。采用 accumulo documentation and the accumulo example code 中指定的方法。以下是我逐字记录在笔记本上的内容,以及回复:
val clientRqrdTble = new ClientOnRequiredTable
val bwConfig = new BatchWriterConfig
val batchWriter = connector.createBatchWriter("batchtestY", bwConfig);
clientRqrdTble: org.apache.accumulo.core.cli.ClientOnRequiredTable =
org.apache.accumulo.core.cli.ClientOnRequiredTable@6c6a18ed bwConfig:
org.apache.accumulo.core.client.BatchWriterConfig =
[maxMemory=52428800, maxLatency=120000, maxWriteThreads=3,
timeout=9223372036854775807] batchWriter:
org.apache.accumulo.core.client.BatchWriter =
org.apache.accumulo.core.client.impl.BatchWriterImpl@298aa736
val rowIdS = rddX2_first._1.split(" ")(0)
rowIdS: String = row_0736460000
val mutation = new Mutation(new Text(rowIdS))
mutation: org.apache.accumulo.core.data.Mutation =
org.apache.accumulo.core.data.Mutation@0
mutation.put(
new Text("foo"),
new Text("1"),
new ColumnVisibility("exampleVis"),
new Value(new String("CHEWBACCA!").getBytes) )
java.lang.IllegalStateException: Can not add to mutation after
serializing it at
org.apache.accumulo.core.data.Mutation.put(Mutation.java:168) at
org.apache.accumulo.core.data.Mutation.put(Mutation.java:163) at
org.apache.accumulo.core.data.Mutation.put(Mutation.java:211)
我深入研究 the code,发现罪魁祸首是一个 if-catch,它正在检查 UnsynchronizedBuffer.Writer 缓冲区是否为空。行号不会对齐,因为这是一个与 1.6 accumulo-core jar 中的版本略有不同的版本——我已经查看了两者,但在这种情况下,差异并不大。据我所知,该对象是在执行该方法之前创建的,并且没有被转储。
所以要么我在代码中遗漏了某些东西,要么出现了其他问题。你们中有人知道是什么导致了这种行为吗?
更新一个
我使用 Scala 控制台并直接通过 java 1.8 执行了以下代码。它在 scala 中失败,但在 Java 中没有。我认为此时这是一个 Accumulo 问题。因此,我将打开一个错误票并更深入地挖掘源代码。如果我想出一个解决方案,我会 post 在这里。
下面是Java形式的代码。那里有一些额外的东西,因为我想确保我可以连接到我使用 accumulo 批处理编写器示例创建的 table:
import java.util.Map.Entry;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.client.*;
import org.apache.accumulo.core.client.mapred.*;
import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.cli.ClientOnRequiredTable.*;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configured.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
public class App {
public static void main( String[] args ) throws
AccumuloException,
AccumuloSecurityException,
TableNotFoundException {
// connect to accumulo using a scanner
// print first ten rows of a given table
String instanceNameS = "accumulo";
String zooServersS = "localhost:2181";
Instance instance = new ZooKeeperInstance(instanceNameS, zooServersS);
Connector connector =
instance.getConnector( "root", new PasswordToken("password"));
Authorizations auths = new Authorizations("exampleVis");
Scanner scanner = connector.createScanner("batchtestY", auths);
scanner.setRange(new Range("row_0000000001", "row_0000000010"));
for(Entry<Key, Value> entry : scanner) {
System.out.println(entry.getKey() + " is " + entry.getValue());
}
// stage up connection info objects for serialization
ClientOnRequiredTable clientRqrdTble = new ClientOnRequiredTable();
BatchWriterConfig bwConfig = new BatchWriterConfig();
BatchWriter batchWriter =
connector.createBatchWriter("batchtestY", bwConfig);
// create mutation object
Mutation mutation = new Mutation(new Text("row_0000000001"));
// populate mutation object
// -->THIS IS WHAT'S FAILING IN SCALA<--
mutation.put(
new Text("foo"),
new Text("1"),
new ColumnVisibility("exampleVis"),
new Value(new String("CHEWBACCA!").getBytes()) );
}
}
更新两个
已针对此问题创建 Accumulo bug ticket。他们的目标是在 v1.7.0 中修复此问题。在那之前,我在下面提供的解决方案是一个功能性的解决方法。
看起来当执行新的 Mutation 单元格时 spark-notebook 中发生的任何事情都在序列化 Mutation。序列化后,您不能调用 Mutation 上的 put 。我会尝试将 mutation.put 调用添加到与新 Mutation 命令相同的笔记本单元格。看起来 clientRqrdTble/bwConfig/batchWriter 命令在一个多行单元格中,所以希望这也适用于 Mutation。
所以似乎与 java 完美配合的代码在 Scala 中表现不佳。解决方案(不一定是一个好的解决方案,但一个有效的解决方案)是在一个独立的 jar 中创建一个 java 方法来创建变异对象并 returns 它。通过这种方式,您可以将 jar 添加到 spark 的类路径并调用所需的方法 ass。使用 spark notebook 进行测试并成功更新现有的 accumulo table。我仍然会向 accumulo peeps 提交一张票,因为不应考虑这种解决方法 'best practice'。
使用 spark-notebook 更新 accumulo table。采用 accumulo documentation and the accumulo example code 中指定的方法。以下是我逐字记录在笔记本上的内容,以及回复:
val clientRqrdTble = new ClientOnRequiredTable
val bwConfig = new BatchWriterConfig
val batchWriter = connector.createBatchWriter("batchtestY", bwConfig);
clientRqrdTble: org.apache.accumulo.core.cli.ClientOnRequiredTable = org.apache.accumulo.core.cli.ClientOnRequiredTable@6c6a18ed bwConfig: org.apache.accumulo.core.client.BatchWriterConfig = [maxMemory=52428800, maxLatency=120000, maxWriteThreads=3, timeout=9223372036854775807] batchWriter: org.apache.accumulo.core.client.BatchWriter = org.apache.accumulo.core.client.impl.BatchWriterImpl@298aa736
val rowIdS = rddX2_first._1.split(" ")(0)
rowIdS: String = row_0736460000
val mutation = new Mutation(new Text(rowIdS))
mutation: org.apache.accumulo.core.data.Mutation = org.apache.accumulo.core.data.Mutation@0
mutation.put(
new Text("foo"),
new Text("1"),
new ColumnVisibility("exampleVis"),
new Value(new String("CHEWBACCA!").getBytes) )
java.lang.IllegalStateException: Can not add to mutation after serializing it at org.apache.accumulo.core.data.Mutation.put(Mutation.java:168) at org.apache.accumulo.core.data.Mutation.put(Mutation.java:163) at org.apache.accumulo.core.data.Mutation.put(Mutation.java:211)
我深入研究 the code,发现罪魁祸首是一个 if-catch,它正在检查 UnsynchronizedBuffer.Writer 缓冲区是否为空。行号不会对齐,因为这是一个与 1.6 accumulo-core jar 中的版本略有不同的版本——我已经查看了两者,但在这种情况下,差异并不大。据我所知,该对象是在执行该方法之前创建的,并且没有被转储。
所以要么我在代码中遗漏了某些东西,要么出现了其他问题。你们中有人知道是什么导致了这种行为吗?
更新一个
我使用 Scala 控制台并直接通过 java 1.8 执行了以下代码。它在 scala 中失败,但在 Java 中没有。我认为此时这是一个 Accumulo 问题。因此,我将打开一个错误票并更深入地挖掘源代码。如果我想出一个解决方案,我会 post 在这里。
下面是Java形式的代码。那里有一些额外的东西,因为我想确保我可以连接到我使用 accumulo 批处理编写器示例创建的 table:
import java.util.Map.Entry;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.client.*;
import org.apache.accumulo.core.client.mapred.*;
import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.cli.ClientOnRequiredTable.*;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configured.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
public class App {
public static void main( String[] args ) throws
AccumuloException,
AccumuloSecurityException,
TableNotFoundException {
// connect to accumulo using a scanner
// print first ten rows of a given table
String instanceNameS = "accumulo";
String zooServersS = "localhost:2181";
Instance instance = new ZooKeeperInstance(instanceNameS, zooServersS);
Connector connector =
instance.getConnector( "root", new PasswordToken("password"));
Authorizations auths = new Authorizations("exampleVis");
Scanner scanner = connector.createScanner("batchtestY", auths);
scanner.setRange(new Range("row_0000000001", "row_0000000010"));
for(Entry<Key, Value> entry : scanner) {
System.out.println(entry.getKey() + " is " + entry.getValue());
}
// stage up connection info objects for serialization
ClientOnRequiredTable clientRqrdTble = new ClientOnRequiredTable();
BatchWriterConfig bwConfig = new BatchWriterConfig();
BatchWriter batchWriter =
connector.createBatchWriter("batchtestY", bwConfig);
// create mutation object
Mutation mutation = new Mutation(new Text("row_0000000001"));
// populate mutation object
// -->THIS IS WHAT'S FAILING IN SCALA<--
mutation.put(
new Text("foo"),
new Text("1"),
new ColumnVisibility("exampleVis"),
new Value(new String("CHEWBACCA!").getBytes()) );
}
}
更新两个
已针对此问题创建 Accumulo bug ticket。他们的目标是在 v1.7.0 中修复此问题。在那之前,我在下面提供的解决方案是一个功能性的解决方法。
看起来当执行新的 Mutation 单元格时 spark-notebook 中发生的任何事情都在序列化 Mutation。序列化后,您不能调用 Mutation 上的 put 。我会尝试将 mutation.put 调用添加到与新 Mutation 命令相同的笔记本单元格。看起来 clientRqrdTble/bwConfig/batchWriter 命令在一个多行单元格中,所以希望这也适用于 Mutation。
所以似乎与 java 完美配合的代码在 Scala 中表现不佳。解决方案(不一定是一个好的解决方案,但一个有效的解决方案)是在一个独立的 jar 中创建一个 java 方法来创建变异对象并 returns 它。通过这种方式,您可以将 jar 添加到 spark 的类路径并调用所需的方法 ass。使用 spark notebook 进行测试并成功更新现有的 accumulo table。我仍然会向 accumulo peeps 提交一张票,因为不应考虑这种解决方法 'best practice'。