Spark Streaming + Accumulo - Serialize BatchWriterImpl
I am looking for a Spark Streaming + Accumulo connector and a complete usage example.
Currently I am trying to write Spark Streaming results to an Accumulo table, but I get a NotSerializableException for the BatchWriter. Can someone point me to an example of how to serialize the BatchWriter? The code below is based on the Accumulo documentation.
Current code:
val accumuloInstanceName = "accumulo"
val zooKeepers = "localhost:2181"
val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers)
val accumuloUser = programOptions.accumuloUser()
val accumuloPassword = programOptions.accumuloPassword()
val passwordToken = new PasswordToken(accumuloPassword)
val connector = instance.getConnector(accumuloUser, passwordToken)
val accumuloBatchWriterConfig = new BatchWriterConfig
val accumuloBatchWriterMaxMemory = 32 * 1024 * 1024
accumuloBatchWriterConfig.setMaxMemory(accumuloBatchWriterMaxMemory)
val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig)
fullMergeResultFlatten.foreachRDD(recordRDD =>
  recordRDD.foreach(record => {
    val mutation = new Mutation(Longs.toByteArray(record.timestamp))
    mutation.put("value", "", new Value(Longs.toByteArray(record.value)))
    mutation.put("length", "", new Value(Longs.toByteArray(record.length)))
    accumuloBatchWriter.addMutation(mutation)
  })
)
The following error occurs at runtime:
17/05/05 16:55:25 ERROR util.Utils: Exception encountered
java.io.NotSerializableException: org.apache.accumulo.core.client.impl.BatchWriterImpl
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
I suppose this is a fairly common case, but I can't find any simple Spark Streaming + Accumulo example.
You cannot serialize the BatchWriter class. I don't have a suggestion on how to fix your code, but I can say that trying to serialize that class is not the right way forward.
As elserj pointed out, serializing connection objects is generally not the right pattern. The pattern I have seen is to open the connection from the Spark workers directly, using RDD.foreachPartition(). This works well because it lets you create one connection per batch of work, rather than a new connection for every single record, which is almost never efficient.
Example:
fullMergeResultFlatten.foreachRDD(recordRDD => {
  recordRDD.foreachPartition(partitionRecords => {
    // this connection logic is executed in the Spark workers
    val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig)
    partitionRecords.foreach { record =>
      // save operation: build a Mutation from the record and addMutation it here
    }
    accumuloBatchWriter.close()
  })
})
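For completeness, here is a minimal end-to-end sketch of how the two pieces might be combined. It assumes that accumuloUser and accumuloPassword are plain Strings available on the driver, and that record has the timestamp, value and length fields from the question. Note that the Accumulo client objects (ZooKeeperInstance, Connector, BatchWriter) are all created inside foreachPartition, so only serializable values are captured by the closure; this is an illustration of the pattern, not a drop-in replacement.

```scala
import org.apache.accumulo.core.client.{BatchWriterConfig, ZooKeeperInstance}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Mutation, Value}
import com.google.common.primitives.Longs

fullMergeResultFlatten.foreachRDD { recordRDD =>
  recordRDD.foreachPartition { partitionRecords =>
    // Runs on the worker: only strings/numbers are captured from the driver,
    // so nothing non-serializable is shipped with the closure.
    val instance = new ZooKeeperInstance("accumulo", "localhost:2181")
    val connector = instance.getConnector(accumuloUser, new PasswordToken(accumuloPassword))
    val config = new BatchWriterConfig
    config.setMaxMemory(32 * 1024 * 1024)
    val writer = connector.createBatchWriter("Data", config)
    try {
      partitionRecords.foreach { record =>
        val mutation = new Mutation(Longs.toByteArray(record.timestamp))
        mutation.put("value", "", new Value(Longs.toByteArray(record.value)))
        mutation.put("length", "", new Value(Longs.toByteArray(record.length)))
        writer.addMutation(mutation)
      }
    } finally {
      writer.close() // flushes any buffered mutations to the tablet servers
    }
  }
}
```

The trade-off is one BatchWriter per partition per micro-batch, which is still far cheaper than one per record; closing the writer in a finally block makes sure buffered mutations are flushed even if a record fails.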