是否可以从 spark 作业中引用或更新 memcache/reddis/DB 数据?
Is it possible to refer or update memcache/reddis/DB data from spark jobs?
我们在 cache/DB 中有一些业务数据。我们每天处理大量日志数据并更新我们的数据 cache/DB。其中一些更新几乎是实时的,而另一些则是分批进行的。我们有 spark jobs 做很多转换。我们将 spark 作业的结果存储在文本文件中,然后 运行 另一个顺序作业将它们放入我们的 cache/DB.
我考虑过使用connectors(mongoDB-spark connector, redis-spark connector),把整个数据做成RDD做处理。但是,与我们所做的日志文件和每日更新相比,我们的业务数据量确实很大。所以,放弃了。
问题:
- 我们可以在 cache/DB 上从执行器直接更新以避免最后一步吗?
- 任何其他建议或替代方法以获得更好的性能?
- 你在这里看到任何反模式了吗?
如果您对数据库的写入很简单,您可以使用以下方式直接写入数据库:
myDF.write
.mode("overwrite") // Choose the mode you want from org.apache.spark.sql.SaveMode
.jdbc(url, "my_table", props)
如果查询不只是简单的插入(例如,我的查询中包含 on duplicate key update
部分的查询),您将需要自己完成。
您可以使用mapPartitions()
在分区之间分配写入。
myDF.mapPartitions(rows => {
val connection = DriverManager.getConnection(URL, properties)
rows.foreach(bulk => {
val statement = connection.prepareStatement(myQuery)
bulk.foreach(row => {
statement.setString(1, row.getString(0))
statement.setInt(2, row.getInt(1))
...
statement.addBatch()
})
statement.executeLargeBatch().iterator
})
rows
}).count //An action here is required, to trigger the mapPartitions()
我们在 cache/DB 中有一些业务数据。我们每天处理大量日志数据并更新我们的数据 cache/DB。其中一些更新几乎是实时的,而另一些则是分批进行的。我们有 spark jobs 做很多转换。我们将 spark 作业的结果存储在文本文件中,然后 运行 另一个顺序作业将它们放入我们的 cache/DB.
我考虑过使用connectors(mongoDB-spark connector, redis-spark connector),把整个数据做成RDD做处理。但是,与我们所做的日志文件和每日更新相比,我们的业务数据量确实很大。所以,放弃了。
问题:
- 我们可以在 cache/DB 上从执行器直接更新以避免最后一步吗?
- 任何其他建议或替代方法以获得更好的性能?
- 你在这里看到任何反模式了吗?
如果您对数据库的写入很简单,您可以使用以下方式直接写入数据库:
myDF.write
.mode("overwrite") // Choose the mode you want from org.apache.spark.sql.SaveMode
.jdbc(url, "my_table", props)
如果查询不只是简单的插入(例如,我的查询中包含 on duplicate key update
部分的查询),您将需要自己完成。
您可以使用mapPartitions()
在分区之间分配写入。
myDF.mapPartitions(rows => {
val connection = DriverManager.getConnection(URL, properties)
rows.foreach(bulk => {
val statement = connection.prepareStatement(myQuery)
bulk.foreach(row => {
statement.setString(1, row.getString(0))
statement.setInt(2, row.getInt(1))
...
statement.addBatch()
})
statement.executeLargeBatch().iterator
})
rows
}).count //An action here is required, to trigger the mapPartitions()