Kafka java.io.EOFException - NetworkReceive.readFromReadableChannel

I am trying to connect to IBM Message Hub from Apache Spark 2.2.1 Structured Streaming.

My connection code is quite basic:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StreamingRetailTransactions").getOrCreate()

import spark.implicits._

val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                option("subscribe", "transactions_load").
                option("security.protocol", "SASL_SSL").
                option("sasl.mechanism", "PLAIN").
                option("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"*****\" password=\"*****\";").
                option("ssl.protocol", "TLSv1.2").
                option("ssl.enabled.protocols", "TLSv1.2").
                option("ssl.endpoint.identification.algorithm", "HTTPS").
                option("auto.offset.reset","earliest").
                option("group.id", System.currentTimeMillis).
                load()

val query = df.writeStream.format("console").start()
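
As an aside, when this code is packaged as a standalone application rather than run in spark-shell, the driver would normally need to block on the query so the process doesn't exit before any batches run (a minimal sketch, not part of the original shell session):

    // Block until the streaming query terminates; in spark-shell this is not
    // needed because the REPL keeps the driver alive.
    query.awaitTermination()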

I start the spark shell with:

~/spark-2.2.1-bin-hadoop2.7$ ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

However, I am getting disconnection errors:

scala> 18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
<<repeats forever>>

I increased the debug output with sc.setLogLevel("DEBUG") and I get:

<<log output omitted for brevity>>
18/01/09 08:38:28 DEBUG SessionState: SessionState user: null
18/01/09 08:38:28 DEBUG SessionState: HDFS root scratch dir: /tmp/hive with schema null, permission: rwx-wx-wx
18/01/09 08:38:28 INFO SessionState: Created local directory: /tmp/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9_resources
18/01/09 08:38:28 INFO SessionState: Created HDFS directory: /tmp/hive/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9
18/01/09 08:38:28 INFO SessionState: Created local directory: /tmp/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9
18/01/09 08:38:28 INFO SessionState: Created HDFS directory: /tmp/hive/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9/_tmp_space.db
18/01/09 08:38:28 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is file:/home/snowch/spark-2.2.1-bin-hadoop2.7/spark-warehouse
18/01/09 08:38:28 DEBUG SessionState: Session is using authorization class class org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider
18/01/09 08:38:28 DEBUG StateStoreCoordinatorRef: Retrieved existing StateStoreCoordinator endpoint
18/01/09 08:38:28 DEBUG StreamExecution: Starting Trigger Calculation
18/01/09 08:38:28 INFO StreamExecution: Starting new streaming query.
18/01/09 08:38:28 DEBUG UserGroupInformation: PrivilegedAction as:snowch (auth:SIMPLE) from:org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
18/01/09 08:38:28 DEBUG KafkaSource$$anon: Unable to find batch /tmp/temporary-fb3e6e1a-fbbe-4098-991c-0b29f63ecade/sources/0/0
18/01/09 08:38:28 DEBUG AbstractCoordinator: Sending coordinator request for group spark-kafka-source-9a14bb54-8f1b-47db-8497-19c083128496--998588290-driver-0 to broker kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093 (id: -5 rack: null)
18/01/09 08:38:28 DEBUG NetworkClient: Initiating connection to node -5 at kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093.
18/01/09 08:38:28 DEBUG NetworkClient: Initialize connection to node -3 for sending metadata request
18/01/09 08:38:28 DEBUG NetworkClient: Initiating connection to node -3 at kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093.
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.bytes-sent
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.bytes-received
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.latency
18/01/09 08:38:28 DEBUG NetworkClient: Completed connection to node -5
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.bytes-sent
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.bytes-received
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.latency
18/01/09 08:38:28 DEBUG NetworkClient: Completed connection to node -3
18/01/09 08:38:28 DEBUG NetworkClient: Sending metadata request {topics=[transactions_load]} to node -5
18/01/09 08:38:28 DEBUG Selector: Connection with kafka05-prod02.messagehub.services.eu-gb.bluemix.net/159.8.179.153 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:974)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$$anonfun$apply.apply(KafkaOffsetReader.scala:174)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$$anonfun$apply.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt.apply$mcV$sp(KafkaOffsetReader.scala:263)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt.apply(KafkaOffsetReader.scala:262)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt.apply(KafkaOffsetReader.scala:262)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:261)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:230)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:171)
    at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets.apply(KafkaSource.scala:132)
    at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets.apply(KafkaSource.scala:129)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:129)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets(KafkaSource.scala:97)
    at org.apache.spark.sql.kafka010.KafkaSource.getOffset(KafkaSource.scala:163)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$$anonfun$apply.apply(StreamExecution.scala:521)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$$anonfun$apply.apply(StreamExecution.scala:521)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun.apply(StreamExecution.scala:520)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun.apply(StreamExecution.scala:518)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:518)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:492)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply$mcV$sp(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches.apply$mcZ$sp(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:206)
18/01/09 08:38:28 DEBUG NetworkClient: Node -5 disconnected.
18/01/09 08:38:28 WARN NetworkClient: Bootstrap broker kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:38:28 DEBUG ConsumerNetworkClient: Cancelled GROUP_COORDINATOR request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@4eacac4e, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=spark-kafka-source-9a14bb54-8f1b-47db-8497-19c083128496--998588290-driver-0}), createdTimeMs=1515487108479, sendTimeMs=1515487108598) with correlation id 0 due to node -5 being disconnected
18/01/09 08:38:28 DEBUG NetworkClient: Sending metadata request {topics=[transactions_load]} to node -3
18/01/09 08:38:28 DEBUG Selector: Connection with kafka01-prod02.messagehub.services.eu-gb.bluemix.net/159.8.179.149 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    <<repeated>>

I have looked at a number of similar questions about this error.

I understand that some of the output is just 'noise'; however, my Spark streaming application does not appear to be receiving any data. I have connected with a console client and am able to see data.
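
For reference, a console consumer against Message Hub can be invoked roughly as follows (a sketch only: the properties file name, credentials and paths are placeholders, and setting sasl.jaas.config in the properties file itself requires a 0.10.2+ Kafka client):

    # consumer.properties (placeholder values)
    # security.protocol=SASL_SSL
    # sasl.mechanism=PLAIN
    # sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    #     username="*****" password="*****";

    ./bin/kafka-console-consumer.sh \
        --bootstrap-server kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093 \
        --consumer.config consumer.properties \
        --topic transactions_load \
        --from-beginning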


Update 1 - I have tried configuring JAAS, but I still get the same error. The problem may be that the JAAS code needs to run on each worker node, but currently it isn't.

sc.setLogLevel("DEBUG")

def jaasClientConfig(username: String, password: String): Unit = {

    import javax.security.auth.login.AppConfigurationEntry
    import javax.security.auth.login.Configuration
    import javax.security.auth.login.LoginException

    import scala.collection.JavaConversions._

    System.setProperty("java.security.auth.login.config", "")

    Configuration.setConfiguration(new Configuration() {
        def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
            val idMap = Map(
                "serviceName" -> "kafka",
                "username" -> username,
                "password" -> password
                )
            val ace = new AppConfigurationEntry(
               "org.apache.kafka.common.security.plain.PlainLoginModule",
               AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
               idMap
            )
            return Array(ace)
        }
    })
}

def startSparkStreaming(): Unit = {

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("StreamingRetailTransactions").getOrCreate()

    import spark.implicits._

    val df = spark.readStream.
                    format("kafka").
                    option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                    option("subscribe", "transactions_load").
                    option("security.protocol", "SASL_SSL").
                    option("sasl.mechanism", "PLAIN").
                    option("ssl.protocol", "TLSv1.2").
                    option("ssl.enabled.protocols", "TLSv1.2").
                    option("ssl.endpoint.identification.algorithm", "HTTPS").
                    option("auto.offset.reset","earliest").
                    option("group.id", System.currentTimeMillis).
                    load()

    val query = df.writeStream.format("console").start()
}

jaasClientConfig("****","****")

startSparkStreaming()

Update 2

I have also tried using a jaas.conf file:

~/spark-2.2.1-bin-hadoop2.7$ ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.conf=jaas.conf" --files "jaas.conf"

and ...

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="*****"
    password="*****";
};

Still the same problem ...

This looks like a failed authentication. With the Kafka version that the current Message Hub release runs, the server simply closes the connection when authentication fails.

Kafka clients only support the "sasl.jaas.config" setting from version 0.10.2 onwards.

The Kafka client used by Spark 2.2.1 is 0.10.0, hence the suspicion that authentication is failing.
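
One quick way to confirm which kafka-clients version is actually on the classpath (not from the original post; AppInfoParser ships with the Kafka client library) is to ask it from the shell:

    scala> org.apache.kafka.common.utils.AppInfoParser.getVersion
    // with spark-sql-kafka-0-10_2.11:2.2.x this is expected to report a 0.10.0.x version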

The JAAS file can be specified using the java.security.auth.login.config system property.
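
For example (a sketch, assuming the file lives at /path/to/jaas.conf):

    // Set it programmatically before the Kafka consumer is created ...
    System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")
    // ... or pass it as a JVM flag: -Djava.security.auth.login.config=/path/to/jaas.conf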

Alternatively, you can set the credentials for the client programmatically with a snippet like the following:

import java.util.HashMap;

import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public static void jaasClientConfig(final String username, final String password) throws Exception {
    System.setProperty("java.security.auth.login.config", "");

    Configuration.setConfiguration(new Configuration() {
        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
            HashMap<String, String> idMap = new HashMap<>();
            idMap.put("serviceName", "kafka"); // Seems to be optional
            idMap.put("username", username);
            idMap.put("password", password);

            AppConfigurationEntry ace = new AppConfigurationEntry("org.apache.kafka.common.security.plain.PlainLoginModule",
                    AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, idMap);
            AppConfigurationEntry[] entry = { ace };
            return entry;
        }
    });
}

First I needed to run spark-shell with --conf settings that point the driver and executors at my jaas.conf:

   ./bin/spark-shell --master local[1] \
        --jars external/kafka-0-10-sql/target/spark-sql-kafka-0-10_2.11-2.2.2-SNAPSHOT.jar,external/kafka-0-10-assembly/target/spark-streaming-kafka-0-10-assembly_2.11-2.2.2-SNAPSHOT.jar \
       --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas.conf" \
       --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas.conf" \
       --num-executors 1  --executor-cores 1 

Next I had to add some Kafka options:

val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                option("subscribe", "transactions_load").
                option("kafka.security.protocol", "SASL_SSL").
                option("kafka.sasl.mechanism", "PLAIN").
                option("kafka.ssl.protocol", "TLSv1.2").
                option("kafka.ssl.enabled.protocols", "TLSv1.2").
                load()

Note that the Kafka options need to be prefixed with kafka., for example:

  • security.protocol => kafka.security.protocol

These changes solved the connection problem for me.