Broadcasting external library object in Apache Spark
I am trying to do NLP in Apache Spark from spark-notebook. For this particular example, I am using the library https://opennlp.apache.org to create a chunker that extracts noun phrases. Because of the growing volume of data, I need to move to distributed computing.
The problem is that I cannot broadcast my chunker object. From reading the documentation (which only shows broadcasting simple objects like arrays), I tried the following:
import opennlp.tools.tokenize.WhitespaceTokenizer
import opennlp.tools.cmdline.postag.POSModelLoader
import opennlp.tools.postag.POSTaggerME
import opennlp.tools.chunker.ChunkerModel
import opennlp.tools.chunker.ChunkerME
import java.io.FileInputStream
import java.io.File
// Instantiate the ChunkerME class
val inputStream = new FileInputStream("fr-chunk.bin")
val chunkerModel = new ChunkerModel(inputStream)
val chunkerME = new ChunkerME(chunkerModel)
val broadCastedChunkerME = sc.broadcast(chunkerME)
But this throws the following error:
java.io.NotSerializableException: opennlp.tools.chunker.ChunkerME
Serialization stack:
- object not serializable (class: opennlp.tools.chunker.ChunkerME, value: opennlp.tools.chunker.ChunkerME@35a5c281)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject.apply(TorrentBroadcast.scala:268)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject.apply(TorrentBroadcast.scala:268)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:269)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411)
... 63 elided
What does work is wrapping the initialization of the chunker inside a function and then calling that function inside the map method, like this:
def getNPChunks(sentence: String): Array[Span] = {
  import opennlp.tools.chunker.ChunkerModel
  import opennlp.tools.chunker.ChunkerME
  import opennlp.tools.util.Span
  import java.io.FileInputStream
  val inputStream = new FileInputStream("fr-chunk.bin")
  val chunkerModel = new ChunkerModel(inputStream)
  // Instantiate the ChunkerME class
  val chunkerME = new ChunkerME(chunkerModel)
  chunkerME.chunkAsSpans(sentence)
}
// call the chunker
line.map(getNPChunks)
But the problem here is that this code is very inefficient: it initializes a chunker object for every entry in the RDD. Since map calls getNPChunks once per record, every single record builds a brand-new chunker and re-reads the model file.
Because of this inefficient design, my Spark script runs 20 times slower than the sequential script.
What am I doing wrong?
The way to solve the problem is to use mapPartitions. That way you create one chunker per partition instead of one per line:
def getChunker(): ChunkerME = {
  val inputStream = new FileInputStream("fr-chunk.bin")
  val chunkerModel = new ChunkerModel(inputStream)
  // Instantiate the ChunkerME class
  new ChunkerME(chunkerModel)
}

line.mapPartitions { it =>
  val chunker = getChunker() // one chunker per partition, not per record
  it.map(line => chunker.chunkAsSpans(line))
}
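Note that this still loads the model file once per partition. If a job has many small partitions per executor, a lazily initialized singleton, as in the next answer, brings that down to once per executor JVM.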
For more details on mapPartitions, see this answer.
初始化"chunkerME"内部scala对象然后广播it.The Scala对象默认是序列化的。编译器对 scala 对象进行序列化。
或者如果在 scala class 中初始化,scala class 需要通过扩展 Serializable 特征来显式序列化。
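A minimal sketch of that singleton-object idea, assuming fr-chunk.bin is readable at the same local path on every worker. Two caveats: ChunkerME.chunkAsSpans actually takes parallel arrays of tokens and POS tags, so the hypothetical taggedSentences RDD below holds (tokens, tags) pairs; and because the task closure only captures a static reference to the object, the chunker itself is never serialized: each executor JVM loads the model once on first use, so no broadcast call is needed at all:

import java.io.FileInputStream
import opennlp.tools.chunker.{ChunkerME, ChunkerModel}

object ChunkerHolder {
  // Initialized lazily, once per executor JVM, the first time a task uses it.
  lazy val chunker: ChunkerME = {
    val in = new FileInputStream("fr-chunk.bin")
    try new ChunkerME(new ChunkerModel(in))
    finally in.close()
  }
}

// taggedSentences: RDD[(Array[String], Array[String])] of tokens and POS tags
val nounPhraseSpans = taggedSentences.map { case (tokens, tags) =>
  ChunkerHolder.chunker.chunkAsSpans(tokens, tags)
}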