是否可以从 spark 作业中引用或更新 memcache/reddis/DB 数据?

Is it possible to refer or update memcache/reddis/DB data from spark jobs?

我们在 cache/DB 中有一些业务数据。我们每天处理大量日志数据并更新我们的数据 cache/DB。其中一些更新几乎是实时的,而另一些则是分批进行的。我们有 spark jobs 做很多转换。我们将 spark 作业的结果存储在文本文件中,然后 运行 另一个顺序作业将它们放入我们的 cache/DB.

我考虑过使用connectors(mongoDB-spark connector, redis-spark connector),把整个数据做成RDD做处理。但是,与我们所做的日志文件和每日更新相比,我们的业务数据量确实很大。所以,放弃了。

问题:

  1. 我们可以在 cache/DB 上从执行器直接更新以避免最后一步吗?
  2. 任何其他建议或替代方法以获得更好的性能?
  3. 你在这里看到任何反模式了吗?

如果您对数据库的写入很简单,您可以使用以下方式直接写入数据库:

myDF.write
    .mode("overwrite") // Choose the mode you want from org.apache.spark.sql.SaveMode
    .jdbc(url, "my_table", props)

如果查询不只是简单的插入(例如,我的查询中包含 on duplicate key update 部分的查询),您将需要自己完成。

您可以使用mapPartitions()在分区之间分配写入。

myDF.mapPartitions(rows => {
  val connection = DriverManager.getConnection(URL, properties)

  rows.foreach(bulk => {
      val statement = connection.prepareStatement(myQuery)
      bulk.foreach(row => {
        statement.setString(1, row.getString(0))
        statement.setInt(2, row.getInt(1))
        ...
        statement.addBatch()
      })

      statement.executeLargeBatch().iterator
    })
  rows
}).count //An action here is required, to trigger the mapPartitions()