Filter RDD in Spark using class attribute provided by pysam

I am using pysam, a Python library, to read BAM files in Spark. I created an RDD containing the BAM data. When I try to filter the data using the query_sequence attribute of the AlignedSegment class (from the pysam library), Spark crashes. Running data.count() and data.first() works fine. I am new to Spark.

Here is my code:

import pysam

samfile = pysam.AlignmentFile("testfile.bam", "rb")
iter = samfile.fetch("20", until_eof=True)
data = sc.parallelize(iter)

data.count()
data.first()

data.filter(lambda read: 'A' in read.query_sequence).count()

This is the output I get:

16/05/19 18:07:54 INFO SparkContext: Running Spark version 1.6.1
16/05/19 18:07:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/19 18:07:55 WARN Utils: Your hostname, group13 resolves to a loopback address: 127.0.1.1; using 192.168.1.55 instead (on interface eth0)
16/05/19 18:07:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/05/19 18:07:55 INFO SecurityManager: Changing view acls to: ubuntu
16/05/19 18:07:55 INFO SecurityManager: Changing modify acls to: ubuntu
16/05/19 18:07:55 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ubuntu); users with modify permissions: Set(ubuntu)
16/05/19 18:07:55 INFO Utils: Successfully started service 'sparkDriver' on port 44348.
16/05/19 18:07:56 INFO Slf4jLogger: Slf4jLogger started
16/05/19 18:07:56 INFO Remoting: Starting remoting
16/05/19 18:07:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.55:60148]
16/05/19 18:07:56 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 60148.
16/05/19 18:07:56 INFO SparkEnv: Registering MapOutputTracker
16/05/19 18:07:56 INFO SparkEnv: Registering BlockManagerMaster
16/05/19 18:07:56 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-f2a7e393-0b9c-46fd-8b18-9463f7db8b71
16/05/19 18:07:56 INFO MemoryStore: MemoryStore started with capacity 511.5 MB
16/05/19 18:07:56 INFO SparkEnv: Registering OutputCommitCoordinator
16/05/19 18:07:56 INFO Server: jetty-8.y.z-SNAPSHOT
16/05/19 18:07:56 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/05/19 18:07:56 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/05/19 18:07:56 INFO SparkUI: Started SparkUI at http://192.168.1.55:4040
16/05/19 18:07:57 INFO Utils: Copying /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py to /tmp/spark-e1bbcc0c-b7f8-47d1-b01b-c88556d483c8/userFiles-099a04e4-51dd-4273-bc3e-04b587f3bfb9/readBam.py
16/05/19 18:07:57 INFO SparkContext: Added file file:/home/ubuntu/LDSA-1000-genomes-alexa/readBam.py at file:/home/ubuntu/LDSA-1000-genomes-alexa/readBam.py with timestamp 1463681277070
16/05/19 18:07:57 INFO Executor: Starting executor ID driver on host localhost
16/05/19 18:07:57 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56004.
16/05/19 18:07:57 INFO NettyBlockTransferService: Server created on 56004
16/05/19 18:07:57 INFO BlockManagerMaster: Trying to register BlockManager
16/05/19 18:07:57 INFO BlockManagerMasterEndpoint: Registering block manager localhost:56004 with 511.5 MB RAM, BlockManagerId(driver, localhost, 56004)
16/05/19 18:07:57 INFO BlockManagerMaster: Registered BlockManager
Warning: The index file is older than the data file: LDSA-1000-genomes-alexa/testfile.bam.bai
16/05/19 18:08:57 INFO SparkContext: Starting job: count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:12
16/05/19 18:08:57 INFO DAGScheduler: Got job 0 (count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:12) with 1 output partitions
16/05/19 18:08:57 INFO DAGScheduler: Final stage: ResultStage 0 (count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:12)
16/05/19 18:08:57 INFO DAGScheduler: Parents of final stage: List()
16/05/19 18:08:57 INFO DAGScheduler: Missing parents: List()
16/05/19 18:08:57 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[1] at count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:12), which has no missing parents
16/05/19 18:08:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.1 KB, free 4.1 KB)
16/05/19 18:08:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 6.8 KB)
16/05/19 18:08:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56004 (size: 2.6 KB, free: 511.5 MB)
16/05/19 18:08:58 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/05/19 18:08:58 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (PythonRDD[1] at count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:12)
16/05/19 18:08:58 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/05/19 18:08:58 WARN TaskSetManager: Stage 0 contains a task of very large size (31693 KB). The maximum recommended task size is 100 KB.
16/05/19 18:08:58 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 32453927 bytes)
16/05/19 18:08:58 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/05/19 18:08:58 INFO Executor: Fetching file:/home/ubuntu/LDSA-1000-genomes-alexa/readBam.py with timestamp 1463681277070
16/05/19 18:08:58 INFO Utils: /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py has been previously copied to /tmp/spark-e1bbcc0c-b7f8-47d1-b01b-c88556d483c8/userFiles-099a04e4-51dd-4273-bc3e-04b587f3bfb9/readBam.py
16/05/19 18:09:00 INFO PythonRunner: Times: total = 2030, boot = 202, init = 13, finish = 1815
16/05/19 18:09:00 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 998 bytes result sent to driver
16/05/19 18:09:00 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2502 ms on localhost (1/1)
16/05/19 18:09:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/05/19 18:09:00 INFO DAGScheduler: ResultStage 0 (count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:12) finished in 2.522 s
16/05/19 18:09:00 INFO DAGScheduler: Job 0 finished: count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:12, took 2.749181 s
16/05/19 18:09:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
16/05/19 18:09:00 INFO DAGScheduler: Got job 1 (runJob at PythonRDD.scala:393) with 1 output partitions
16/05/19 18:09:00 INFO DAGScheduler: Final stage: ResultStage 1 (runJob at PythonRDD.scala:393)
16/05/19 18:09:00 INFO DAGScheduler: Parents of final stage: List()
16/05/19 18:09:00 INFO DAGScheduler: Missing parents: List()
16/05/19 18:09:00 INFO DAGScheduler: Submitting ResultStage 1 (PythonRDD[2] at RDD at PythonRDD.scala:43), which has no missing parents
16/05/19 18:09:00 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.2 KB, free 10.0 KB)
16/05/19 18:09:00 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.2 KB, free 12.2 KB)
16/05/19 18:09:00 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:56004 (size: 2.2 KB, free: 511.5 MB)
16/05/19 18:09:00 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/05/19 18:09:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (PythonRDD[2] at RDD at PythonRDD.scala:43)
16/05/19 18:09:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/05/19 18:09:00 WARN TaskSetManager: Stage 1 contains a task of very large size (31693 KB). The maximum recommended task size is 100 KB.
16/05/19 18:09:00 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 32453927 bytes)
16/05/19 18:09:00 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/05/19 18:09:00 INFO PythonRunner: Times: total = 2, boot = -352, init = 353, finish = 1
16/05/19 18:09:00 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1037 bytes result sent to driver
16/05/19 18:09:00 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 289 ms on localhost (1/1)
16/05/19 18:09:00 INFO DAGScheduler: ResultStage 1 (runJob at PythonRDD.scala:393) finished in 0.286 s
16/05/19 18:09:00 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/05/19 18:09:00 INFO DAGScheduler: Job 1 finished: runJob at PythonRDD.scala:393, took 0.304149 s
16/05/19 18:09:01 INFO SparkContext: Starting job: count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:15
16/05/19 18:09:01 INFO DAGScheduler: Got job 2 (count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:15) with 1 output partitions
16/05/19 18:09:01 INFO DAGScheduler: Final stage: ResultStage 2 (count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:15)
16/05/19 18:09:01 INFO DAGScheduler: Parents of final stage: List()
16/05/19 18:09:01 INFO DAGScheduler: Missing parents: List()
16/05/19 18:09:01 INFO DAGScheduler: Submitting ResultStage 2 (PythonRDD[3] at count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:15), which has no missing parents
16/05/19 18:09:01 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.5 KB, free 16.7 KB)
16/05/19 18:09:01 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.9 KB, free 19.6 KB)
16/05/19 18:09:01 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:56004 (size: 2.9 KB, free: 511.5 MB)
16/05/19 18:09:01 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/05/19 18:09:01 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (PythonRDD[3] at count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:15)
16/05/19 18:09:01 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
16/05/19 18:09:01 WARN TaskSetManager: Stage 2 contains a task of very large size (31693 KB). The maximum recommended task size is 100 KB.
16/05/19 18:09:01 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 32453927 bytes)
16/05/19 18:09:01 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
16/05/19 18:09:01 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:203)
    at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:139)
    ... 11 more
16/05/19 18:09:01 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:203)
    at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:139)
    ... 11 more

16/05/19 18:09:01 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
16/05/19 18:09:01 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
16/05/19 18:09:01 INFO TaskSchedulerImpl: Cancelling stage 2
16/05/19 18:09:01 INFO DAGScheduler: ResultStage 2 (count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:15) failed in 0.451 s
16/05/19 18:09:01 INFO DAGScheduler: Job 2 failed: count at /home/ubuntu/LDSA-1000-genomes-alexa/readBam.py:15, took 0.464958 s
Traceback (most recent call last):
  File "/home/ubuntu/LDSA-1000-genomes-alexa/readBam.py", line 15, in <module>
    data.filter(lambda read: 'A' in read.query_sequence).count()
  File "/home/ubuntu/spark-1.6.1/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
  File "/home/ubuntu/spark-1.6.1/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
  File "/home/ubuntu/spark-1.6.1/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
  File "/home/ubuntu/spark-1.6.1/python/lib/pyspark.zip/pyspark/rdd.py", line 771, in collect
  File "/home/ubuntu/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/home/ubuntu/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:203)
    at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:139)
    ... 11 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:203)
    at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:139)
    ... 11 more

16/05/19 18:09:01 INFO SparkContext: Invoking stop() from shutdown hook
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/api,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/json,null}
16/05/19 18:09:01 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs,null}
16/05/19 18:09:01 INFO SparkUI: Stopped Spark web UI at http://192.168.1.55:4040
16/05/19 18:09:01 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/05/19 18:09:01 INFO MemoryStore: MemoryStore cleared
16/05/19 18:09:01 INFO BlockManager: BlockManager stopped
16/05/19 18:09:01 INFO BlockManagerMaster: BlockManagerMaster stopped
16/05/19 18:09:01 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/05/19 18:09:01 INFO SparkContext: Successfully stopped SparkContext
16/05/19 18:09:01 INFO ShutdownHookManager: Shutdown hook called
16/05/19 18:09:01 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/05/19 18:09:01 INFO ShutdownHookManager: Deleting directory /tmp/spark-e1bbcc0c-b7f8-47d1-b01b-c88556d483c8
16/05/19 18:09:01 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/05/19 18:09:01 INFO ShutdownHookManager: Deleting directory /tmp/spark-e1bbcc0c-b7f8-47d1-b01b-c88556d483c8/pyspark-f61dfd8e-a758-4d1e-9af7-fa2ea4cb771b

Running data.count() and data.first() works fine.

Well, if you inspect the returned AlignedSegment objects carefully, you'll see that this doesn't work as expected. The problem is that pysam is only a thin wrapper around the C API, and AlignedSegment objects are not properly serialized and deserialized when they are shipped to the workers. Pickle support seems to be on the TODO list, but apparently it isn't there yet.
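You can confirm a serialization problem like this directly on the driver, before Spark is involved, by attempting a pickle round trip on one object from the iterator. A minimal, pysam-independent sketch (the is_picklable helper is illustrative, not part of pysam or Spark):

```python
import pickle

def is_picklable(obj):
    """Return True if obj survives a pickle round trip, as Spark requires."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False

# Plain containers pickle fine; objects backed by C-level state,
# or anything pickle can't locate by name, do not.
print(is_picklable([1, 2, 3]))    # True
print(is_picklable(lambda x: x))  # False: pickle can't look up an inline lambda
```

Applied to a read from samfile.fetch(...), a False here would explain why the worker crashes as soon as the filter actually touches the objects.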

Depending on your requirements, you can instead read the data directly on the executors, restricting yourself to local fetch calls, but this is fairly limited and won't work beyond a single stage. To be clear, I mean something similar to this:

offset = ...     # int
window = ...     # int
reference = ...  # str
path = ...       # str, must be accessible on every worker

def fetch(path, reference, offset, window):
    # Returns a function that opens the BAM file locally on the worker
    # and fetches only the reads in this partition's coordinate window.
    def fetch_(i):
        start = offset + i * window
        end = offset + (i + 1) * window
        return pysam.AlignmentFile(path, "rb").fetch(reference, start, end)
    return fetch_

f = fetch(path, reference, offset, window)

# Empty RDD with 10 partitions; each partition index selects its own window.
rdd = (sc
    .parallelize([], 10)
    .mapPartitionsWithIndex(lambda i, _: f(i)))

(rdd
    .filter(lambda read: read.query_sequence and 'A' in read.query_sequence)
    .count())
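The index-to-coordinate mapping used by fetch_ above can be sanity-checked on its own, without Spark or pysam. A standalone sketch (partition_windows is an illustrative helper, not part of either library):

```python
def partition_windows(offset, window, n_partitions):
    """(start, end) coordinate pairs that each partition index will fetch."""
    return [(offset + i * window, offset + (i + 1) * window)
            for i in range(n_partitions)]

# With offset=1000 and window=500, three partitions cover adjacent,
# non-overlapping ranges of the reference:
print(partition_windows(1000, 500, 3))
# [(1000, 1500), (1500, 2000), (2000, 2500)]
```

Note that reads spanning a window boundary will be returned by fetch for both adjacent windows, so a count over all partitions may see such reads twice.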