图形数据库 (Neo4j) 插入的 Spark UDF 优化
Spark UDF optimization for Graph Database (Neo4j) inserts
这是我发布的第一期,如果我错过了一些信息和平庸的格式,我们深表歉意。如果需要,我可以更新。
我会尝试添加尽可能多的细节。我有一个不太优化的 Spark 作业,它将 RDBMS 数据转换为 Neo4j 中的图形节点和关系。
为此。这是我遵循的步骤:
- 使用 spark sql 和连接创建一个非规范化数据帧 'data'。
在 'data' 运行 中的每个行执行以下操作的 graphInsert 函数:
一个。读取行的内容
b。制定一个 neo4j 密码查询(我们使用 Merge 命令,这样我们只有一个城市,例如芝加哥在 Neo4j 中创建,而芝加哥将在 RDBMS 中以多行显示 table)
c。连接到 neo4j
d。执行查询
e.断开与 neo4j 的连接
这是我面临的问题列表。
- 插入很慢。
I know Merge query is slower than create but is there another way to do this instead of connecting and disconnecting for every record? This was my first draft code and maybe i am struggling how i will use one connection to insert from multiple threads on different spark worker nodes. Hence connecting and disconnecting for every record.
- 作业不可扩展。只有 运行 1 个核心没问题。一旦我 运行 使用 2 个 spark 核心的工作,我突然得到 2 个同名的城市,即使我正在 运行ning 合并查询。例如芝加哥有 2 个城市违反了 Merge 的使用规定。我假设 Merge 函数类似于 "Create if not exist".
I dont know if my implementation is wrong in neo4j part or spark. If anyone can direct me to any documentation which helps me implement this on a better scale it will be helpful as i have a big spark cluster which i need to utilize at full potential for this job.
如果您有兴趣查看代码而不是算法。这是 scala 中的 graphInsert 实现:
class GraphInsert extends Serializable{
var case_attributes = new Array[String](4)
var city_attributes = new Array[String](2)
var location_attributes = new Array[String](20)
var incident_attributes = new Array[String](20)
val prop = new Properties()
prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
// properties Neo4j
val url_neo4j = prop.getProperty("url_neo4j")
val neo4j_user = prop.getProperty("neo4j_user")
val neo4j_password = prop.getProperty("neo4j_password")
def graphInsert(data : Row){
val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0) + ":'" +data(11) + "'," +case_attributes(1) + ":'" +data(13) + "'," +case_attributes(2) + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0) + ":" +data(0) + "," +incident_attributes(1) + ":" +data(2) + "," +incident_attributes(2) + ":'" +data(3) + "'," +incident_attributes(3) + ":'" +data(8)+ "'," +incident_attributes(4) + ":" +data(5) + "," +incident_attributes(5) + ":'" +data(4) + "'," +incident_attributes(6) + ":'" +data(6) + "'," +incident_attributes(7) + ":'" +data(1) + "'," +incident_attributes(8) + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0) + ":" +data(9) + "," +location_attributes(1) + ":" +data(10) + "," +location_attributes(2) + ":'" +data(19) + "'," +location_attributes(3) + ":'" +data(20)+ "'," +location_attributes(4) + ":" +data(18) + "," +location_attributes(5) + ":" +data(21) + "," +location_attributes(6) + ":'" +data(17) + "'," +location_attributes(7) + ":" +data(22) + "," +location_attributes(8) + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
println(query)
try{
var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
var stmt = con.createStatement()
var rs = stmt.executeQuery(query)
con.close()
}catch{
case ex: SQLException =>{
println(ex.getMessage)
}
}
}
def operations(sqlContext: SQLContext){
....
#Get 'data' before this step
city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()
data.foreach(graphInsert)
}
object GraphObject {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("GraphNeo4j")
.setMaster("xyz")
.set("spark.cores.max","2")
.set("spark.executor.memory","10g")
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val graph = new GraphInsert()
graph.operations(sqlContext)
}
}
无论你在闭包中写什么,即它需要在 Worker 上执行,都会被分发。
您可以在这里阅读更多相关信息:http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka
当你增加核心数时,我认为它一定不会影响应用程序,因为如果你不指定它!然后它采取贪婪的方法!希望这份文件对您有所帮助。
我已经改进了这个过程,但没有什么能像 Cypher 中的 LOAD 命令那样快。
希望这对某人有所帮助:
使用 foreachPartition
而不是 foreach
可以在执行此类过程时获得显着收益。还使用密码添加定期提交。
这是我发布的第一期,如果我错过了一些信息和平庸的格式,我们深表歉意。如果需要,我可以更新。
我会尝试添加尽可能多的细节。我有一个不太优化的 Spark 作业,它将 RDBMS 数据转换为 Neo4j 中的图形节点和关系。
为此。这是我遵循的步骤:
- 使用 spark sql 和连接创建一个非规范化数据帧 'data'。
在 'data' 运行 中的每个行执行以下操作的 graphInsert 函数:
一个。读取行的内容
b。制定一个 neo4j 密码查询(我们使用 Merge 命令,这样我们只有一个城市,例如芝加哥在 Neo4j 中创建,而芝加哥将在 RDBMS 中以多行显示 table)
c。连接到 neo4j
d。执行查询
e.断开与 neo4j 的连接
这是我面临的问题列表。
- 插入很慢。
I know Merge query is slower than create but is there another way to do this instead of connecting and disconnecting for every record? This was my first draft code and maybe i am struggling how i will use one connection to insert from multiple threads on different spark worker nodes. Hence connecting and disconnecting for every record.
- 作业不可扩展。只有 运行 1 个核心没问题。一旦我 运行 使用 2 个 spark 核心的工作,我突然得到 2 个同名的城市,即使我正在 运行ning 合并查询。例如芝加哥有 2 个城市违反了 Merge 的使用规定。我假设 Merge 函数类似于 "Create if not exist".
I dont know if my implementation is wrong in neo4j part or spark. If anyone can direct me to any documentation which helps me implement this on a better scale it will be helpful as i have a big spark cluster which i need to utilize at full potential for this job.
如果您有兴趣查看代码而不是算法。这是 scala 中的 graphInsert 实现:
class GraphInsert extends Serializable{
var case_attributes = new Array[String](4)
var city_attributes = new Array[String](2)
var location_attributes = new Array[String](20)
var incident_attributes = new Array[String](20)
val prop = new Properties()
prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
// properties Neo4j
val url_neo4j = prop.getProperty("url_neo4j")
val neo4j_user = prop.getProperty("neo4j_user")
val neo4j_password = prop.getProperty("neo4j_password")
def graphInsert(data : Row){
val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0) + ":'" +data(11) + "'," +case_attributes(1) + ":'" +data(13) + "'," +case_attributes(2) + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0) + ":" +data(0) + "," +incident_attributes(1) + ":" +data(2) + "," +incident_attributes(2) + ":'" +data(3) + "'," +incident_attributes(3) + ":'" +data(8)+ "'," +incident_attributes(4) + ":" +data(5) + "," +incident_attributes(5) + ":'" +data(4) + "'," +incident_attributes(6) + ":'" +data(6) + "'," +incident_attributes(7) + ":'" +data(1) + "'," +incident_attributes(8) + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0) + ":" +data(9) + "," +location_attributes(1) + ":" +data(10) + "," +location_attributes(2) + ":'" +data(19) + "'," +location_attributes(3) + ":'" +data(20)+ "'," +location_attributes(4) + ":" +data(18) + "," +location_attributes(5) + ":" +data(21) + "," +location_attributes(6) + ":'" +data(17) + "'," +location_attributes(7) + ":" +data(22) + "," +location_attributes(8) + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
println(query)
try{
var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
var stmt = con.createStatement()
var rs = stmt.executeQuery(query)
con.close()
}catch{
case ex: SQLException =>{
println(ex.getMessage)
}
}
}
def operations(sqlContext: SQLContext){
....
#Get 'data' before this step
city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()
data.foreach(graphInsert)
}
object GraphObject {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("GraphNeo4j")
.setMaster("xyz")
.set("spark.cores.max","2")
.set("spark.executor.memory","10g")
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val graph = new GraphInsert()
graph.operations(sqlContext)
}
}
无论你在闭包中写什么,即它需要在 Worker 上执行,都会被分发。 您可以在这里阅读更多相关信息:http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka
当你增加核心数时,我认为它一定不会影响应用程序,因为如果你不指定它!然后它采取贪婪的方法!希望这份文件对您有所帮助。
我已经改进了这个过程,但没有什么能像 Cypher 中的 LOAD 命令那样快。
希望这对某人有所帮助:
使用 foreachPartition
而不是 foreach
可以在执行此类过程时获得显着收益。还使用密码添加定期提交。