Insert data on TitanDB using Spark (or Spark Streaming)
I am trying to add elements to TitanDB using Spark Streaming (collecting messages from a Kafka queue), but it is turning out to be harder than expected. Here is the definition of the Titan connection:
val confPath: String = "titan-cassandra-es-spark.properties"
val conn: TitanModule = new TitanModule(confPath)
TitanModule is a Serializable class that configures the TitanDB connection:
...
val configurationFilePath: String = confFilePath
val configuration = new PropertiesConfiguration(configurationFilePath)
val gConn: TitanGraph = TitanFactory.open(configuration)
...
When I run the Spark Streaming job that collects the messages (json) from the Kafka queue, it receives a message, tries to add it to TitanDB, and blows up with the stack trace below.
Do you know whether it is actually feasible to add data to TitanDB from Spark Streaming? Do you know how this could be solved?
18:03:50,596 ERROR JobScheduler:95 - Error running job streaming job 1464624230000 ms.0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:911)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:910)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
at salvob.SparkConsumer$$anonfun$main.apply(SparkConsumer.scala:200)
at salvob.SparkConsumer$$anonfun$main.apply(SparkConsumer.scala:132)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.commons.configuration.PropertiesConfiguration
Serialization stack:
- object not serializable (class: org.apache.commons.configuration.PropertiesConfiguration, value: org.apache.commons.configuration.PropertiesConfiguration@2cef9ce8)
- field (class: salvob.TitanModule, name: configuration, type: class org.apache.commons.configuration.PropertiesConfiguration)
- object (class salvob.TitanModule, salvob.TitanModule@20d984db)
- field (class: salvob.SparkConsumer$$anonfun$main$$anonfun$apply, name: conn, type: class salvob.TitanModule)
- object (class salvob.SparkConsumer$$anonfun$main$$anonfun$apply, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 28 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:911)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:910)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
at salvob.SparkConsumer$$anonfun$main.apply(SparkConsumer.scala:200)
at salvob.SparkConsumer$$anonfun$main.apply(SparkConsumer.scala:132)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.commons.configuration.PropertiesConfiguration
Serialization stack:
- object not serializable (class: org.apache.commons.configuration.PropertiesConfiguration, value: org.apache.commons.configuration.PropertiesConfiguration@2cef9ce8)
- field (class: salvob.TitanModule, name: configuration, type: class org.apache.commons.configuration.PropertiesConfiguration)
- object (class salvob.TitanModule, salvob.TitanModule@20d984db)
- field (class: salvob.SparkConsumer$$anonfun$main$$anonfun$apply, name: conn, type: class salvob.TitanModule)
- object (class salvob.SparkConsumer$$anonfun$main$$anonfun$apply, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 28 more
Make sure all the classes that may be shipped to the other worker (slave) machines are Serializable. This is crucial. Do not initialize any variables outside of those shipped classes.
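One way to follow that advice (a minimal sketch, not the asker's actual code; it assumes the Kafka messages arrive in a DStream called kafkaStream) is to capture only the properties file path in the closure, since a plain String serializes fine, and open the TitanGraph on the executor so the non-serializable PropertiesConfiguration is never shipped:
import com.thinkaurelius.titan.core.{TitanFactory, TitanGraph}

// Only a String is captured by the closure; the graph itself is created on the worker.
val confPath: String = "titan-cassandra-es-spark.properties"

kafkaStream.foreachRDD { rdd =>
  rdd.foreachPartition { messages =>
    val graph: TitanGraph = TitanFactory.open(confPath) // opened on the executor
    try {
      messages.foreach { json =>
        // ... parse the json and add vertices / edges here ...
      }
      graph.tx().commit()
    } catch {
      case e: Exception =>
        graph.tx().rollback()
        throw e
    } finally {
      graph.close()
    }
  }
}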
I have used Apache Spark (not Streaming) for this and it worked well. It is not trivial to get right, because Titan itself pulls in a version of Spark, so there will be some dependency conflicts. This is the only version that worked for me:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.2</version>
</dependency>
This is how I start the cluster:
SparkConf conf = new SparkConf()
        .setAppName(AbstractSparkImporter.class.getCanonicalName())
        .setMaster("spark_cluster_name");
this.sc = new JavaSparkContext(conf);
this.numPartitions = new Integer(num);
Then I parse the data:
JavaRDD<T> javaRDD = initRetriever(); // init JavaRDD
javaRDD.foreachPartition(iter -> {
    Graph graph = initGraph();
    Parser<T> parser = initParser(graph);
    while (iter.hasNext()) {
        try {
            parser.parse(iter); // extends Serializable!
        } catch (Exception e) {
            logger.error("Failed in importing all vertices ", e);
            graph.tx().rollback();
        }
    }
    graph.tx().commit();
});
I might publish this module on Github if necessary.
Spark Streaming produces RDDs, and the processing of the data inside an RDD happens on the worker nodes. The code you write inside rdd.map(), together with every object referenced in that block, is serialized and sent to the worker nodes for processing.
So the ideal way to use a graph instance through Spark is the following:
streamRdd.map(kafkaTuple => {
  // create a graph instance
  // use the graph instance to add / modify the graph
  // close the graph instance
})
But this would create a new graph instance for every single row. As an optimization, you can create the graph instance once per partition instead:
rdd.foreachPartition((rddRows: Iterator[kafkaTuple]) => {
  val graph: TitanGraph = // create a Titan instance
  val trans: TitanTransaction = graph.newTransaction()
  rddRows.foreach(graphVertex => {
    // do the graph insertion inside the above transaction
  })
  trans.commit()
  graph.close()
})
graph.newTransaction() helps here with multi-threaded graph updates; otherwise you will get lock exceptions.
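If a lock conflict does slip through, a hedged variation of the snippet above (reusing its trans and rddRows names) would roll the transaction back instead of leaving it open; the concrete exception class depends on the storage backend:
try {
  rddRows.foreach(graphVertex => {
    // mutate the graph through `trans` here
  })
  trans.commit()
} catch {
  case e: Exception =>
    // e.g. a locking / timeout conflict reported by the storage backend
    trans.rollback()
    throw e
}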
The only problem is that, from what I have read so far, there is no direct support for multi-node updates. From what I have seen, whenever a Titan transaction tries to modify a vertex it takes a lock in HBase, so the other partitions fail when they attempt any update. You will have to build an external synchronization mechanism, or repartition your rdd into a single partition and then use the code above to do the update.
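A minimal sketch of that single-partition fallback (same Titan properties file as in the question, everything else hypothetical):
// Force every row into one partition so a single graph instance and transaction
// perform all updates, avoiding lock contention between partitions.
rdd.repartition(1).foreachPartition { rows =>
  val graph: TitanGraph = TitanFactory.open("titan-cassandra-es-spark.properties")
  val trans = graph.newTransaction()
  rows.foreach { row =>
    // insert vertices / edges through `trans` here
  }
  trans.commit()
  graph.close()
}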