将 Spark Streaming RDD 推送到 Neo4j -Scala
Pushing Spark Streaming RDDs to Neo4j -Scala
我需要建立从 Spark Streaming 到 Neo4j 图形的连接 database.The RDD 的类型为 ((is,I),(am,Hello)(sam,happy....)。我需要在 Neo4j 中的每对单词之间建立一条边。
我在 Spark Streaming 文档中找到
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
to the push to the data to an external database.
我在 Scala 中这样做。我对如何去做有点困惑?我找到了 AnormCypher 和 Neo4jScala 包装器。我可以使用这些来完成工作吗?如果是这样,我该怎么做?如果没有,还有更好的选择吗?
谢谢大家....
看看 MazeRunner (http://www.kennybastani.com/2014/11/using-apache-spark-and-neo4j-for-big.html),它会给你一些想法。
我用 AnormCypher
做了一个实验
像这样:
implicit val connection = Neo4jREST.setServer("localhost", 7474, "/db/data/")
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(FILE, 4).cache()
val count = logData
.flatMap( _.split(" "))
.map( w =>
Cypher("CREATE(:Word {text:{text}})")
.on( "text" -> w ).execute()
).filter( _ ).count()
Neo4j 2.2.x 具有出色的并发写入性能,您可以使用 Spark。因此,您必须写入 Neo4j 的并发线程越多越好。如果每个请求可以批量处理 100 到 1000 个语句,那就更好了。
我需要建立从 Spark Streaming 到 Neo4j 图形的连接 database.The RDD 的类型为 ((is,I),(am,Hello)(sam,happy....)。我需要在 Neo4j 中的每对单词之间建立一条边。
我在 Spark Streaming 文档中找到
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
to the push to the data to an external database.
我在 Scala 中这样做。我对如何去做有点困惑?我找到了 AnormCypher 和 Neo4jScala 包装器。我可以使用这些来完成工作吗?如果是这样,我该怎么做?如果没有,还有更好的选择吗?
谢谢大家....
看看 MazeRunner (http://www.kennybastani.com/2014/11/using-apache-spark-and-neo4j-for-big.html),它会给你一些想法。
我用 AnormCypher
做了一个实验像这样:
implicit val connection = Neo4jREST.setServer("localhost", 7474, "/db/data/")
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(FILE, 4).cache()
val count = logData
.flatMap( _.split(" "))
.map( w =>
Cypher("CREATE(:Word {text:{text}})")
.on( "text" -> w ).execute()
).filter( _ ).count()
Neo4j 2.2.x 具有出色的并发写入性能,您可以使用 Spark。因此,您必须写入 Neo4j 的并发线程越多越好。如果每个请求可以批量处理 100 到 1000 个语句,那就更好了。