Warning results in failure when reading from AED S3 bucket
I am doing a simple inner join between two tables, but I keep getting the warning shown below. I've seen in other posts that the warning can be ignored, but my job ends in failure and makes no progress.
The tables are fairly large (12 billion rows), but I am only adding three columns from one table to the other.
When I reduce the dataset to a few million rows and run the script in an Amazon SageMaker Jupyter notebook, it works fine. But when I run it on the EMR cluster against the full dataset, it fails. I even ran the specific snappy partition where it seems to fail on its own, and it works in SageMaker.
The job reads from one of the tables without problems; it seems to be the other table that is causing trouble:
INFO FileScanRDD: Reading File path: s3a://path/EES_FD_UVA_HIST/date=2019-10-14/part-00056-ddb83da5-2e1b-499d-a52a-cad16e21bd2c-c000.snappy.parquet, range: 0-102777097, partition values: [18183]
20/04/06 15:51:58 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
20/04/06 15:51:58 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
20/04/06 15:52:03 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
20/04/06 15:52:03 INFO MemoryStore: MemoryStore cleared
20/04/06 15:52:03 INFO BlockManager: BlockManager stopped
20/04/06 15:52:03 INFO ShutdownHookManager: Shutdown hook called
Here is my code:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read both tables (note the mixed s3:// and s3a:// schemes)
uvalim = spark.read.parquet("s3://path/UVA_HIST_WITH_LIMITS")
uvaorg = spark.read.parquet("s3a://path/EES_FD_UVA_HIST")

# Keep only the columns needed
config = uvalim.select('SEQ_ID', 'TOOL_ID', 'DATE', 'UL', 'LL')
uva = uvaorg.select('SEQ_ID', 'TOOL_ID', 'TIME_STAMP', 'RUN_ID', 'TARGET',
                    'LOWER_CRITICAL', 'UPPER_CRITICAL', 'RESULT', 'STATUS')

# Inner join on the shared keys and write the result back to S3
uva_config = uva.join(config, on=['SEQ_ID', 'TOOL_ID'], how='inner')
uva_config.write.mode("overwrite").parquet("s3a://path/Uvaconfig.parquet")
Is there a way to debug this?
Update, following Emerson's suggestion:
I ran it with debug logging. It ran for 9 hours, failing, before I killed the YARN application. For some reason there is not much output in stderr.
This is the stderr output:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/301/__spark_libs__1712836156286367723.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/04/07 05:04:13 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 5653@ip-10-210-13-51
20/04/07 05:04:13 INFO SignalUtils: Registered signal handler for TERM
20/04/07 05:04:13 INFO SignalUtils: Registered signal handler for HUP
20/04/07 05:04:13 INFO SignalUtils: Registered signal handler for INT
20/04/07 05:04:15 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/04/07 05:04:15 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/04/07 05:04:15 INFO SecurityManager: Changing view acls groups to:
20/04/07 05:04:15 INFO SecurityManager: Changing modify acls groups to:
20/04/07 05:04:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/04/07 05:04:15 INFO TransportClientFactory: Successfully created connection to ip-10-210-13-51.ec2.internal/10.210.13.51:35863 after 168 ms (0 ms spent in bootstraps)
20/04/07 05:04:16 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/04/07 05:04:16 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/04/07 05:04:16 INFO SecurityManager: Changing view acls groups to:
20/04/07 05:04:16 INFO SecurityManager: Changing modify acls groups to:
20/04/07 05:04:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/04/07 05:04:16 INFO TransportClientFactory: Successfully created connection to ip-10-210-13-51.ec2.internal/10.210.13.51:35863 after 20 ms (0 ms spent in bootstraps)
20/04/07 05:04:16 INFO DiskBlockManager: Created local directory at /mnt1/yarn/usercache/hadoop/appcache/application_1569338404918_1241/blockmgr-2adfe133-fd28-4f25-95a4-2ac1348c625e
20/04/07 05:04:16 INFO DiskBlockManager: Created local directory at /mnt/yarn/usercache/hadoop/appcache/application_1569338404918_1241/blockmgr-3620ceea-8eee-42c5-af2f-6975c894b643
20/04/07 05:04:17 INFO MemoryStore: MemoryStore started with capacity 3.8 GB
20/04/07 05:04:17 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@ip-10-210-13-51.ec2.internal:35863
20/04/07 05:04:17 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
20/04/07 05:04:17 INFO Executor: Starting executor ID 1 on host ip-10-210-13-51.ec2.internal
20/04/07 05:04:18 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34073.
20/04/07 05:04:18 INFO NettyBlockTransferService: Server created on ip-10-210-13-51.ec2.internal:34073
20/04/07 05:04:18 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/04/07 05:04:18 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, ip-10-210-13-51.ec2.internal, 34073, None)
20/04/07 05:04:18 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, ip-10-210-13-51.ec2.internal, 34073, None)
20/04/07 05:04:18 INFO BlockManager: external shuffle service port = 7337
20/04/07 05:04:18 INFO BlockManager: Registering executor with local external shuffle service.
20/04/07 05:04:18 INFO TransportClientFactory: Successfully created connection to ip-10-210-13-51.ec2.internal/10.210.13.51:7337 after 19 ms (0 ms spent in bootstraps)
20/04/07 05:04:18 INFO BlockManager: Initialized BlockManager: BlockManagerId(1, ip-10-210-13-51.ec2.internal, 34073, None)
20/04/07 05:04:20 INFO CoarseGrainedExecutorBackend: Got assigned task 0
20/04/07 05:04:20 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/04/07 05:04:21 INFO TorrentBroadcast: Started reading broadcast variable 0
20/04/07 05:04:21 INFO TransportClientFactory: Successfully created connection to ip-10-210-13-51.ec2.internal/10.210.13.51:38181 after 17 ms (0 ms spent in bootstraps)
20/04/07 05:04:21 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 39.4 KB, free 3.8 GB)
20/04/07 05:04:21 INFO TorrentBroadcast: Reading broadcast variable 0 took 504 ms
20/04/07 05:04:22 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 130.2 KB, free 3.8 GB)
20/04/07 05:04:23 INFO CoarseGrainedExecutorBackend: eagerFSInit: Eagerly initialized FileSystem at s3://does/not/exist in 5155 ms
20/04/07 05:04:25 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 53157 bytes result sent to driver
20/04/07 05:04:25 INFO CoarseGrainedExecutorBackend: Got assigned task 2
20/04/07 05:04:25 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
20/04/07 05:04:25 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 53114 bytes result sent to driver
20/04/07 05:04:25 INFO CoarseGrainedExecutorBackend: Got assigned task 3
20/04/07 05:04:25 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
20/04/07 05:04:25 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/04/07 05:04:25 INFO DiskBlockManager: Shutdown hook called
20/04/07 05:04:25 INFO ShutdownHookManager: Shutdown hook called
Could you use s3 instead of s3a? I don't think using s3a within EMR is recommended. Also, you can run your job with debug logging:
sc = spark.sparkContext
sc.setLogLevel('DEBUG')
Read the documentation on s3a below:
https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-file-systems.html
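A minimal sketch of the suggested change, assuming the reads otherwise stay the same: a small helper (the helper name is mine, not from the answer) that rewrites an s3a:// URI to the EMRFS s3:// scheme before it is passed to spark.read.parquet.

```python
def to_emrfs_uri(uri: str) -> str:
    """Rewrite an s3a:// (or s3n://) URI to the EMRFS s3:// scheme.

    Hypothetical helper for illustration; on EMR, s3:// is the
    EMRFS-backed scheme that AWS documents for cluster jobs.
    """
    for scheme in ("s3a://", "s3n://"):
        if uri.startswith(scheme):
            return "s3://" + uri[len(scheme):]
    return uri  # already s3:// (or something else) - leave untouched


print(to_emrfs_uri("s3a://path/EES_FD_UVA_HIST"))  # s3://path/EES_FD_UVA_HIST
```

The reads in the question would then become `spark.read.parquet(to_emrfs_uri("s3a://path/EES_FD_UVA_HIST"))`, or simply use s3:// paths directly.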
So after debugging, I concluded that it was indeed a memory problem.
The cluster I was using ran out of memory after loading a few days of data; each day has roughly 2 billion rows.
So I tried running my script one day at a time, and it seemed able to handle that.
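A sketch of that day-at-a-time approach, assuming the date=YYYY-MM-DD partition layout visible in the log above; the helper names, date range, and append-mode output are my assumptions, not the original script.

```python
from datetime import date, timedelta


def daily_partition_paths(base_uri, start, end):
    """Yield one S3 path per date=YYYY-MM-DD partition, inclusive of both ends."""
    d = start
    while d <= end:
        yield f"{base_uri}/date={d.isoformat()}"
        d += timedelta(days=1)


def process_day(spark, day_path, limits_df, out_uri):
    """Join one day's rows against the limits table and append to the output,
    so no single job has to hold the full 12-billion-row join at once."""
    uva = spark.read.parquet(day_path).select(
        'SEQ_ID', 'TOOL_ID', 'TIME_STAMP', 'RUN_ID', 'TARGET',
        'LOWER_CRITICAL', 'UPPER_CRITICAL', 'RESULT', 'STATUS')
    joined = uva.join(limits_df, on=['SEQ_ID', 'TOOL_ID'], how='inner')
    # "append" instead of "overwrite", since each day adds to the same output
    joined.write.mode("append").parquet(out_uri)


paths = list(daily_partition_paths("s3a://path/EES_FD_UVA_HIST",
                                   date(2019, 10, 14), date(2019, 10, 16)))
print(paths[0])  # s3a://path/EES_FD_UVA_HIST/date=2019-10-14
```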
However, when processing days with slightly more data (7 billion rows), it gave me the
executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
error. This post by Jumpman solved the problem by simply extending the spark.dynamicAllocation.executorIdleTimeout value.
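A hedged example of where that setting would go, assuming the job is launched with spark-submit; the 3600s value and the script name are placeholders, not from the post.

```shell
# Keep idle executors alive longer so dynamic allocation does not
# reclaim them mid-job (the default idle timeout is 60s).
# Values here are illustrative, not the ones used by the author.
spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.executorIdleTimeout=3600s \
  my_join_job.py
```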
Thank you so much @Emerson and @Jumpman!