在 Apache Spark 中广播外部库对象

Broadcasting external library object in Apache Spark

我正在尝试在 spark-notebook 的 Apache Spark 中进行 NLP。对于这个特定示例,我使用库 https://opennlp.apache.org 创建一个词块提取器来提取名词短语。 由于数据量的增加,我需要转向分布式计算。

问题是我无法广播我的分块器对象。通过阅读文档(它只投射像数组这样的简单对象)我尝试了以下内容:

import opennlp.tools.tokenize.WhitespaceTokenizer
import opennlp.tools.cmdline.postag.POSModelLoader
import opennlp.tools.postag.POSTaggerME
import opennlp.tools.chunker.ChunkerModel
import opennlp.tools.chunker.ChunkerME
import java.io.FileInputStream
import java.io.File

//Instantiate the ChunkerME class 
val inputStream = new FileInputStream("fr-chunk.bin"); 
val chunkerModel = new ChunkerModel(inputStream);
val chunkerME = new ChunkerME(chunkerModel); 

val broadCastedChunkerME = sc.broadcast(chunkerME)

但这会抛出以下错误:

java.io.NotSerializableException: opennlp.tools.chunker.ChunkerME
Serialization stack:
    - object not serializable (class: opennlp.tools.chunker.ChunkerME, value: opennlp.tools.chunker.ChunkerME@35a5c281)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject.apply(TorrentBroadcast.scala:268)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject.apply(TorrentBroadcast.scala:268)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
  at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:269)
  at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
  at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
  at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411)
  ... 63 elided

如果我将分块器的初始化包装在函数中,然后像下面这样在 map 方法中调用该函数,那么起作用的是:

def getNPChunks(sentence: String): Array[Chunk] = {
  import opennlp.tools.chunker.ChunkerModel
  import opennlp.tools.chunker.ChunkerME
  import java.io.FileInputStream

  val inputStream = new FileInputStream("fr-chunk.bin"); 
  val chunkerModel = new ChunkerModel(inputStream);

  //Instantiate the ChunkerME class 
  val chunkerME = new ChunkerME(chunkerModel); 

  chunkerME.chunkAsSpans(sentence); 
}

// call the chunker 
line.map(getNPChunks)

但这里的问题是这段代码非常低效,因为它正在为 rdd 中的每个条目初始化一个 chunker 对象。因为 map 函数正在为 rdd 的每个条目调用 getNPChunks 函数,并且对于每个条目,我最终创建了一个新的块对象。

由于这种低效的设计,我的 spark 脚本 运行 比顺序脚本慢 20 倍。

我做错了什么?

解决问题的方法是使用mapPartitions.
这样你就可以为每个分区创建一个分块器而不是每行一个:

def getChunker():
  val inputStream = new FileInputStream("fr-chunk.bin"); 
  val chunkerModel = new ChunkerModel(inputStream);

  //Instantiate the ChunkerME class 
  val chunkerME = new ChunkerME(chunkerModel); 

line.mapPartitions(it =>
   val chunker = getChunker()
   it.map(line => chunker.chunkAsSpans(line))
)

有关 mapPartitions 的更多详细信息,请参阅此答案:

初始化"chunkerME"内部scala对象然后广播it.The Scala对象默认是序列化的。编译器对 scala 对象进行序列化。

或者如果在 scala class 中初始化,scala class 需要通过扩展 Serializable 特征来显式序列化。