Spark Error: I/O error constructing remote block reader. java.nio.channels.ClosedByInterruptException
Everything runs fine locally in unit tests, but the Spark Streaming queries fail once execution moves to the real cluster executors: they seem to crash silently and are no longer available in the context:
stream execution thread for kafkaDataGeneratorInactiveESP_02/Distance [id = 438f45a0-acd6-4729-953f-5a18ae208f1f, runId = a98c6d39-fe14-4ed5-b7fe-7e4009de51b2]] impl.BlockReaderFactory (BlockReaderFactory.java:getRemoteBlockReaderFromTcp(765)) - I/O error constructing remote block reader.
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:656)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:533)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:2940)
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:822)
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:747)
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:380)
at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:644)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:575)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:757)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:829)
at java.io.DataInputStream.read(DataInputStream.java:149)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74)
at org.apache.spark.sql.execution.streaming.CommitLog.deserialize(CommitLog.scala:56)
at org.apache.spark.sql.execution.streaming.CommitLog.deserialize(CommitLog.scala:48)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:153)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$getLatest(HDFSMetadataLog.scala:190)
at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:258)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:300)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:194)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:245)
The first thing I tried was to change the query name, which contained a slash and a space: kafkaDataGeneratorInactiveESP_02/Distance. The name is built in
.queryName("kafkaDataGeneratorInactive" + currentIter.metadata.getString("label"))
so I adjusted it to a sanitized value, to prove beyond doubt whether the / or the space in the string was to blame. The error did not go away.
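As a rough sketch of that first attempt (the sanitizeLabel helper and the surrounding writeStream call are my own illustration, not the original code), the label could be cleaned before it reaches queryName:

// Hypothetical sketch of the first attempt: strip slashes and whitespace from
// the label before it is used in the query name. Helper and option names are
// assumptions for illustration only.
def sanitizeLabel(label: String): String =
  label.replaceAll("[/\\s]+", "_")

val label = sanitizeLabel(currentIter.metadata.getString("label")) // "ESP_02/Distance" -> "ESP_02_Distance"

val query = df.writeStream
  .queryName("kafkaDataGeneratorInactive" + label)
  .option("checkpointLocation", checkpointBasePath + "/" + label)
  .start()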
The real cause of the failure was that the query names and the checkpoint location paths were duplicated across queries (which is not what that first fix addressed). Later I found another log entry:
2021-12-01 15:05:46,906 WARN [main] streaming.StreamingQueryManager (Logging.scala:logWarning(69)) - Stopping existing streaming query [id=b13a69d7-5a2f-461e-91a7-a9138c4aa716, runId=9cb31852-d276-42d8-ade6-9839fa97f85c], as a new run is being started.
Why was the query stopped? Because in Scala I was creating the streaming queries in a loop, iterating over a collection, while keeping every query name and every checkpoint location the same. Each newly started query therefore stopped the previous one. After making them unique (I simply appended the string value from the collection item), the failures disappeared.
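A minimal sketch of the working setup, assuming the queries are started from a collection of metadata items (items, buildStreamFor, checkpointBase and outputBase are illustrative names, not the original code): each iteration derives its own query name and checkpoint location from the item's label, so starting one query no longer stops another.

// Minimal sketch of the fix: a unique query name and a unique checkpoint
// location per collection item. All identifiers here are assumptions made
// for illustration.
items.foreach { item =>
  val label = item.metadata.getString("label") // unique per item, e.g. "ESP_02_Distance"

  buildStreamFor(item) // hypothetical: returns the streaming DataFrame for this item
    .writeStream
    .queryName("kafkaDataGeneratorInactive" + label)         // unique query name
    .option("checkpointLocation", s"$checkpointBase/$label") // unique checkpoint path
    .format("parquet")
    .option("path", s"$outputBase/$label")
    .start()
}
spark.streams.awaitAnyTermination() // keep the driver alive for all running queries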