Warning results in failure when reading from AED S3 bucket

I am doing a simple inner join between two tables, but I keep getting warnings like the one shown below. I have seen in other posts that the warning can be ignored, but my job ends in failure and makes no progress. The tables are fairly large (12 billion rows), but I am only adding three columns from one table to the other. When I reduce the dataset to a few million rows and run the script in an Amazon SageMaker Jupyter notebook, it works fine. But when I run it on an EMR cluster against the full dataset, it fails. I even ran the specific snappy partition that seemed to fail, and it works in SageMaker. The job reads one of the tables without a problem; the issue seems to come from the other table.

INFO FileScanRDD: Reading File path: s3a://path/EES_FD_UVA_HIST/date=2019-10-14/part-00056-ddb83da5-2e1b-499d-a52a-cad16e21bd2c-c000.snappy.parquet, range: 0-102777097, partition values: [18183]
20/04/06 15:51:58 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
20/04/06 15:51:58 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
20/04/06 15:52:03 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
20/04/06 15:52:03 INFO MemoryStore: MemoryStore cleared
20/04/06 15:52:03 INFO BlockManager: BlockManager stopped
20/04/06 15:52:03 INFO ShutdownHookManager: Shutdown hook called

Here is my code:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
uvalim=spark.read.parquet("s3://path/UVA_HIST_WITH_LIMITS")
uvaorg=spark.read.parquet("s3a://path/EES_FD_UVA_HIST")
config=uvalim.select('SEQ_ID','TOOL_ID', 'DATE' ,'UL','LL')
uva=uvaorg.select('SEQ_ID', 'TOOL_ID', 'TIME_STAMP', 'RUN_ID', 'TARGET', 'LOWER_CRITICAL', 'UPPER_CRITICAL', 'RESULT', 'STATUS')

uva_config=uva.join(config, on=['SEQ_ID','TOOL_ID'], how='inner')

uva_config.write.mode("overwrite").parquet("s3a://path/Uvaconfig.parquet")

Is there a way to debug this?

Update, following Emerson's suggestion:

I ran it with debug logging. It ran for 9 hours and failed before I killed the YARN application. For some reason there is not much output in stderr.

Here is the stderr output:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/301/__spark_libs__1712836156286367723.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/04/07 05:04:13 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 5653@ip-10-210-13-51
20/04/07 05:04:13 INFO SignalUtils: Registered signal handler for TERM
20/04/07 05:04:13 INFO SignalUtils: Registered signal handler for HUP
20/04/07 05:04:13 INFO SignalUtils: Registered signal handler for INT
20/04/07 05:04:15 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/04/07 05:04:15 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/04/07 05:04:15 INFO SecurityManager: Changing view acls groups to:
20/04/07 05:04:15 INFO SecurityManager: Changing modify acls groups to:
20/04/07 05:04:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/04/07 05:04:15 INFO TransportClientFactory: Successfully created connection to ip-10-210-13-51.ec2.internal/10.210.13.51:35863 after 168 ms (0 ms spent in bootstraps)
20/04/07 05:04:16 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/04/07 05:04:16 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/04/07 05:04:16 INFO SecurityManager: Changing view acls groups to:
20/04/07 05:04:16 INFO SecurityManager: Changing modify acls groups to:
20/04/07 05:04:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/04/07 05:04:16 INFO TransportClientFactory: Successfully created connection to ip-10-210-13-51.ec2.internal/10.210.13.51:35863 after 20 ms (0 ms spent in bootstraps)
20/04/07 05:04:16 INFO DiskBlockManager: Created local directory at /mnt1/yarn/usercache/hadoop/appcache/application_1569338404918_1241/blockmgr-2adfe133-fd28-4f25-95a4-2ac1348c625e
20/04/07 05:04:16 INFO DiskBlockManager: Created local directory at /mnt/yarn/usercache/hadoop/appcache/application_1569338404918_1241/blockmgr-3620ceea-8eee-42c5-af2f-6975c894b643
20/04/07 05:04:17 INFO MemoryStore: MemoryStore started with capacity 3.8 GB
20/04/07 05:04:17 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@ip-10-210-13-51.ec2.internal:35863
20/04/07 05:04:17 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
20/04/07 05:04:17 INFO Executor: Starting executor ID 1 on host ip-10-210-13-51.ec2.internal
20/04/07 05:04:18 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34073.
20/04/07 05:04:18 INFO NettyBlockTransferService: Server created on ip-10-210-13-51.ec2.internal:34073
20/04/07 05:04:18 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/04/07 05:04:18 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, ip-10-210-13-51.ec2.internal, 34073, None)
20/04/07 05:04:18 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, ip-10-210-13-51.ec2.internal, 34073, None)
20/04/07 05:04:18 INFO BlockManager: external shuffle service port = 7337
20/04/07 05:04:18 INFO BlockManager: Registering executor with local external shuffle service.
20/04/07 05:04:18 INFO TransportClientFactory: Successfully created connection to ip-10-210-13-51.ec2.internal/10.210.13.51:7337 after 19 ms (0 ms spent in bootstraps)
20/04/07 05:04:18 INFO BlockManager: Initialized BlockManager: BlockManagerId(1, ip-10-210-13-51.ec2.internal, 34073, None)
20/04/07 05:04:20 INFO CoarseGrainedExecutorBackend: Got assigned task 0
20/04/07 05:04:20 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/04/07 05:04:21 INFO TorrentBroadcast: Started reading broadcast variable 0
20/04/07 05:04:21 INFO TransportClientFactory: Successfully created connection to ip-10-210-13-51.ec2.internal/10.210.13.51:38181 after 17 ms (0 ms spent in bootstraps)
20/04/07 05:04:21 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 39.4 KB, free 3.8 GB)
20/04/07 05:04:21 INFO TorrentBroadcast: Reading broadcast variable 0 took 504 ms
20/04/07 05:04:22 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 130.2 KB, free 3.8 GB)
20/04/07 05:04:23 INFO CoarseGrainedExecutorBackend: eagerFSInit: Eagerly initialized FileSystem at s3://does/not/exist in 5155 ms
20/04/07 05:04:25 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 53157 bytes result sent to driver
20/04/07 05:04:25 INFO CoarseGrainedExecutorBackend: Got assigned task 2
20/04/07 05:04:25 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
20/04/07 05:04:25 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 53114 bytes result sent to driver
20/04/07 05:04:25 INFO CoarseGrainedExecutorBackend: Got assigned task 3
20/04/07 05:04:25 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
20/04/07 05:04:25 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/04/07 05:04:25 INFO DiskBlockManager: Shutdown hook called
20/04/07 05:04:25 INFO ShutdownHookManager: Shutdown hook called

Could you use s3 instead of s3a? I don't believe s3a is recommended on EMR. You can also run your job with debug logging enabled:

sc = spark.sparkContext
sc.setLogLevel('DEBUG') 

Read the documentation below about s3a: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-file-systems.html
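For example, switching the reads over to the EMRFS `s3://` scheme is a one-prefix change; a small helper (hypothetical, not from the original post) makes the rewrite explicit:

```python
def to_emrfs(path: str) -> str:
    """Rewrite an s3a:// URI to the s3:// scheme used by EMRFS on EMR."""
    if path.startswith("s3a://"):
        return "s3://" + path[len("s3a://"):]
    return path

# Applied to the paths from the question (spark session assumed):
# uvaorg = spark.read.parquet(to_emrfs("s3a://path/EES_FD_UVA_HIST"))
```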

So after troubleshooting with debug logging, I concluded that it was indeed a memory issue. The cluster I was using ran out of memory after loading a few days of data; each day has roughly 2 billion rows. So I tried running my script one day at a time, and it was able to handle that. However, on days with somewhat more data (7 billion rows), it gave me the
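The one-day-at-a-time approach could be sketched as below; the `date` partition column and date format come from the log paths above, but the loop itself is an assumption about how the split was done:

```python
from datetime import date, timedelta

def daily_dates(start, end):
    """Yield one ISO date string per day in [start, end] inclusive."""
    d = start
    while d <= end:
        yield d.isoformat()
        d += timedelta(days=1)

# Hypothetical per-day run over the date-partitioned table (spark session assumed):
# for day in daily_dates(date(2019, 10, 1), date(2019, 10, 14)):
#     uva_day = uvaorg.filter(uvaorg["date"] == day)
#     uva_day.join(config, on=["SEQ_ID", "TOOL_ID"], how="inner") \
#            .write.mode("append").parquet("s3://path/Uvaconfig.parquet")
```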

executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

error. This post by Jumpman solved the problem by simply extending spark.dynamicAllocation.executorIdleTimeout.
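As a configuration sketch, the timeout can be raised when building the session; the `600s` value here is illustrative, not the value from the post:

```python
from pyspark.sql import SparkSession

# Keep idle executors alive longer so dynamic allocation does not reclaim
# them (SIGTERM) in the middle of a long job. "600s" is an example value.
spark = (SparkSession.builder
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.executorIdleTimeout", "600s")
         .getOrCreate())
```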

Thank you very much, @Emerson and @Jumpman!