OutOfMemoryError when using PySpark to read files in local mode

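I have about a dozen gpg-encrypted files containing data I'd like to analyze using PySpark. My strategy is to apply a decryption function to each file as a flatMap and then carry on processing at the record level: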

def read_fun_generator(filename):
    with gpg_open(filename[0].split(':')[-1], 'r') as f:
        for line in f:
            yield line.strip()

gpg_files = sc.wholeTextFiles("/path/to/files/*.gpg")
rdd_from_gpg = gpg_files.flatMap(read_fun_generator).map(lambda x: x.split('|'))
rdd_from_gpg.count()  # <-- For example...

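This approach works very well in local mode with a single thread, i.e. with the master set to local[1]. However, using more than one thread causes an OutOfMemoryError to be thrown. I've tried increasing spark.executor.memory and spark.driver.memory to 30g, but this doesn't seem to help. I can confirm in the UI that these settings have stuck. (My machine has more than 200 GB available.) However, I notice in the logs that the block manager seems to start with only 265.4 MB of memory. Is that related?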

Here is the full configuration I start with:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("pyspark_local")
        .set("spark.executor.memory", "30g")
        .set("spark.driver.memory", "30g")
        .set("spark.python.worker.memory", "5g"))
sc = SparkContext(conf=conf)

Here is the stack trace from my log:

15/06/10 11:03:30 INFO SparkContext: Running Spark version 1.3.1
15/06/10 11:03:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/10 11:03:31 INFO SecurityManager: Changing view acls to: santon
15/06/10 11:03:31 INFO SecurityManager: Changing modify acls to: santon
15/06/10 11:03:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(santon); users with modify permissions: Set(santon)
15/06/10 11:03:31 INFO Slf4jLogger: Slf4jLogger started
15/06/10 11:03:31 INFO Remoting: Starting remoting
15/06/10 11:03:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:44347]
15/06/10 11:03:32 INFO Utils: Successfully started service 'sparkDriver' on port 44347.
15/06/10 11:03:32 INFO SparkEnv: Registering MapOutputTracker
15/06/10 11:03:32 INFO SparkEnv: Registering BlockManagerMaster
15/06/10 11:03:32 INFO DiskBlockManager: Created local directory at /tmp/spark-24dc8f0a-a89a-44f8-bb95-cd5514e5bf0c/blockmgr-85b6f082-ff5a-4a0e-b48a-1ec62715dda0
15/06/10 11:03:32 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/06/10 11:03:32 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7b2172ed-d658-4e11-bbc1-600697f3255e/httpd-5423f8bc-ec43-48c5-9367-87214dad54f4
15/06/10 11:03:32 INFO HttpServer: Starting HTTP Server
15/06/10 11:03:32 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/10 11:03:32 INFO AbstractConnector: Started SocketConnector@0.0.0.0:50366
15/06/10 11:03:32 INFO Utils: Successfully started service 'HTTP file server' on port 50366.
15/06/10 11:03:32 INFO SparkEnv: Registering OutputCommitCoordinator
15/06/10 11:03:32 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/10 11:03:32 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/06/10 11:03:32 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/06/10 11:03:32 INFO SparkUI: Started SparkUI at localhost:4040
15/06/10 11:03:32 INFO Executor: Starting executor ID <driver> on host localhost
15/06/10 11:03:32 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@localhost:44347/user/HeartbeatReceiver
15/06/10 11:03:33 INFO NettyBlockTransferService: Server created on 46730
15/06/10 11:03:33 INFO BlockManagerMaster: Trying to register BlockManager
15/06/10 11:03:33 INFO BlockManagerMasterActor: Registering block manager localhost:46730 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 46730)
15/06/10 11:03:33 INFO BlockManagerMaster: Registered BlockManager
15/06/10 11:05:19 INFO MemoryStore: ensureFreeSpace(215726) called with curMem=0, maxMem=278302556
15/06/10 11:05:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 210.7 KB, free 265.2 MB)
15/06/10 11:05:19 INFO MemoryStore: ensureFreeSpace(31533) called with curMem=215726, maxMem=278302556
15/06/10 11:05:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 30.8 KB, free 265.2 MB)
15/06/10 11:05:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:46730 (size: 30.8 KB, free: 265.4 MB)
15/06/10 11:05:19 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/06/10 11:05:19 INFO SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
15/06/10 11:05:22 INFO FileInputFormat: Total input paths to process : 16
15/06/10 11:05:22 INFO FileInputFormat: Total input paths to process : 16
15/06/10 11:05:22 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 71665121
15/06/10 11:05:22 INFO SparkContext: Starting job: count at <timed exec>:2
15/06/10 11:05:22 INFO DAGScheduler: Got job 0 (count at <timed exec>:2) with 2 output partitions (allowLocal=false)
15/06/10 11:05:22 INFO DAGScheduler: Final stage: Stage 0(count at <timed exec>:2)
15/06/10 11:05:22 INFO DAGScheduler: Parents of final stage: List()
15/06/10 11:05:22 INFO DAGScheduler: Missing parents: List()
15/06/10 11:05:22 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[1] at count at <timed exec>:2), which has no missing parents
15/06/10 11:05:23 INFO MemoryStore: ensureFreeSpace(6264) called with curMem=247259, maxMem=278302556
15/06/10 11:05:23 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.1 KB, free 265.2 MB)
15/06/10 11:05:23 INFO MemoryStore: ensureFreeSpace(4589) called with curMem=253523, maxMem=278302556
15/06/10 11:05:23 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.5 KB, free 265.2 MB)
15/06/10 11:05:23 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:46730 (size: 4.5 KB, free: 265.4 MB)
15/06/10 11:05:23 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/06/10 11:05:23 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/06/10 11:05:23 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[1] at count at <timed exec>:2)
15/06/10 11:05:23 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/06/10 11:05:23 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1903 bytes)
15/06/10 11:05:23 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 3085 bytes)
15/06/10 11:05:23 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/10 11:05:23 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/06/10 11:05:26 INFO WholeTextFileRDD: Input split: Paths:[gpg_files]
15/06/10 11:05:40 ERROR Utils: Uncaught exception in thread stdout writer for /anaconda/python/bin/python
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
    at org.apache.hadoop.io.Text.decode(Text.java:405)
    at org.apache.hadoop.io.Text.decode(Text.java:382)
    at org.apache.hadoop.io.Text.toString(Text.java:280)
    at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:86)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
    at org.apache.spark.rdd.NewHadoopRDD$$anon.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:421)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run.apply(PythonRDD.scala:243)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
Exception in thread "stdout writer for /anaconda/python/bin/python" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
    at org.apache.hadoop.io.Text.decode(Text.java:405)
    at org.apache.hadoop.io.Text.decode(Text.java:382)
    at org.apache.hadoop.io.Text.toString(Text.java:280)
    at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:86)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
    at org.apache.spark.rdd.NewHadoopRDD$$anon.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:421)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run.apply(PythonRDD.scala:243)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
15/06/10 11:05:47 INFO PythonRDD: Times: total = 24140, boot = 2860, init = 664, finish = 20616
15/06/10 11:05:47 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1873 bytes result sent to driver
15/06/10 11:05:47 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 24251 ms on localhost (1/2)

Has anyone run into this problem? Is there a setting I should be modifying that I'm not aware of? It seems like this should be possible...

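The thing with sc.wholeTextFiles("/path/to/files/*.gpg") is that it returns a PairRDD in which the key is the file name and the value is the file content.

It looks like you aren't using the file content part, yet you are still telling Spark to read the files from disk and send them to the workers.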
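To make the record shape concrete, here is a Spark-free sketch of what one element of that RDD looks like and what the question's generator does with it. The sample path and the placeholder content are made up for illustration; in a real job the second element would be the whole (still encrypted) file held as a single string.

```python
# Spark-free illustration: wholeTextFiles yields (path, content) pairs.
# The record below is a made-up stand-in, not real Spark output.
record = ("file:/path/to/files/part1.gpg", "<entire encrypted file as one string>")

def path_from_record(rec):
    # Mirrors filename[0].split(':')[-1] from the question: take the key,
    # drop the URI scheme, and never look at rec[1].
    return rec[0].split(':')[-1]

local_path = path_from_record(record)
unused_chars = len(record[1])  # content that was loaded but is never read
```

The value is built on the JVM side before Python ever sees the record, which matches the `Text.decode` frames in the stack trace.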

If your goal is just to process a list of file names and read their contents with gpg_open, you can do this instead:

import glob

def read_fun_generator(filename):
    # filename is now a plain path string rather than a (path, content) pair
    with gpg_open(filename.split(':')[-1], 'r') as f:
        for line in f:
            yield line.strip()

gpg_filelist = glob.glob("/path/to/files/*.gpg")
# build an RDD with one file name per record
gpg_files = sc.parallelize(gpg_filelist)

rdd_from_gpg = gpg_files.flatMap(read_fun_generator).map(lambda x: x.split('|'))
rdd_from_gpg.count()  # <-- For example...
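The same pattern can be tried without Spark or gpg. The sketch below rests on two assumptions: `fake_gpg_open` is a hypothetical stand-in for `gpg_open` that simply opens plain text, and `itertools.chain.from_iterable` plays the role of `flatMap` over a local list of paths. The file names and contents are invented test data.

```python
import itertools
import os
import tempfile

def fake_gpg_open(path, mode):
    # Hypothetical stand-in for gpg_open: opens the file as plain text.
    return open(path, mode)

def read_lines(path):
    # Generator: yields one stripped line at a time instead of returning
    # the whole file, so the file is read lazily.
    with fake_gpg_open(path, 'r') as f:
        for line in f:
            yield line.strip()

def local_flat_map(func, items):
    # Local analogue of RDD.flatMap: apply func to each item and
    # concatenate the resulting iterables lazily.
    return itertools.chain.from_iterable(func(item) for item in items)

# Write two small pipe-delimited files to a temporary directory.
tmp_dir = tempfile.mkdtemp()
paths = []
for name, body in [("a.txt", "1|x\n2|y\n"), ("b.txt", "3|z\n")]:
    path = os.path.join(tmp_dir, name)
    with open(path, "w") as f:
        f.write(body)
    paths.append(path)

rows = [line.split('|') for line in local_flat_map(read_lines, paths)]
```

Only the short path strings are handed to the function; the file contents are read inside it, one line at a time. In the Spark version that read happens in the Python worker rather than in the JVM.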

This will reduce the amount of memory used by Spark's JVM.
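On the 265.4 MB block manager figure from the question: as far as I know, Spark 1.x sizes the MemoryStore as the JVM's maximum heap times `spark.storage.memoryFraction` (default 0.6) times `spark.storage.safetyFraction` (default 0.9). The back-of-the-envelope check below takes the `maxMem` value from the log and assumes those defaults were in effect. It is an inference, not something confirmed in this thread.

```python
# maxMem as reported by MemoryStore in the log above.
max_mem_bytes = 278302556

# Assumed Spark 1.x defaults (not shown in the log).
memory_fraction = 0.6   # spark.storage.memoryFraction
safety_fraction = 0.9   # spark.storage.safetyFraction

MIB = 1024.0 * 1024.0
store_mib = max_mem_bytes / MIB
implied_heap_mib = max_mem_bytes / (memory_fraction * safety_fraction) / MIB

# Size the store would have if a 30g heap had actually been applied.
expected_store_gib = 30 * memory_fraction * safety_fraction

print("MemoryStore: %.1f MiB" % store_mib)
print("Implied JVM heap: %.1f MiB" % implied_heap_mib)
print("Store expected with a 30g heap: %.1f GiB" % expected_store_gib)
```

An implied heap of roughly half a gigabyte, rather than 30 GB, would be consistent with the driver JVM having started with its default heap before the `SparkConf` was applied. The Spark configuration docs note that in client mode `spark.driver.memory` should be set through `--driver-memory` or the properties file rather than through `SparkConf`, because the JVM is already running by then. Whether that fully explains the error here is untested.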