Sending Data to Kerberos Enabled Kafka cluster from scala Client

I am writing an Akka Kafka producer in Scala and trying to send messages from the Scala Kafka client to the Kafka broker. The problem is that the broker never receives the messages; I verified this by starting a Kafka consumer from the command line. The Kafka producer and consumer both work fine from the command prompt. The cluster has Kerberos and SASL_PLAINTEXT security enabled.

Please find my configuration file, client code, and application logs below. I suspect something is going wrong when the code connects to Kerberos.

Scala client:

package com.ABC.adds.producer

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.ABC.adds.models.Models.PPOfaq
import com.ABC.adds.producer.serializer.ModelSerializer
import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.ByteArraySerializer

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object faqProducer extends App with LazyLogging {

  val config = ConfigFactory.load()
  implicit val system = ActorSystem.create("adds-faq-producer", config)
  implicit val mat = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ModelSerializer[PPOfaq](classOf[PPOfaq]))
    .withBootstrapServers("jbt12324.systems.pfk.ABC:3382")
    .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    .withProperty("zookeeper.connect", "jbt234234.systems.pfk.ABC:2341,jbt234.systems.pfk.ABC:234,jbt1234324.systems.pfk.ABC:2234")
    .withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "1")

  val xstream = new XStream(new DomDriver)
  val personString: String = scala.io.Source.fromInputStream(getClass().getClassLoader().getResourceAsStream("PPOfaq.xml")).mkString
  xstream.alias("faq", classOf[PPOfaq])
  val ppofaq: PPOfaq = xstream.fromXML(personString).asInstanceOf[PPOfaq]

  logger.info(s"Producer Configuration is : {} ", producerSettings.toString)
  logger.info(s"Sending message : {}", ppofaq)

  logger.info("KafkaProducer Send first fetching Partitions for topics")
  val  kafkaProducer  = producerSettings.createKafkaProducer()
  kafkaProducer.partitionsFor("asp.adds.ppo.pems")
  val done1 = kafkaProducer.send(new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq))
  val recordMetaData : RecordMetadata = done1.get()

    logger.info("Topic is :  " + recordMetaData.topic() +" partition is : "+ recordMetaData.partition() +" offset is : "+ recordMetaData.offset() )

  logger.info("KafkaProdcuer Send first fetching Partitions for topics end")

  val done = Source.single(ppofaq)
    .map { elem =>
      new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", elem)
    }
    .runWith(Producer.plainSink(producerSettings))

  done onComplete {
    case Success(_) =>
      logger.info("The producer has sent a message to the topic: asp.adds.ppo.pems!!")
    case Failure(e) =>
      logger.error("Error occurred while producing to topic", e)
  }
}
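
For reference, the blocking done1.get() call above is what surfaces the send failure; an equivalent way to observe a broker-side error without blocking is to pass an explicit Callback to send(). A minimal sketch, reusing kafkaProducer, ppofaq and logger from the code above:

kafkaProducer.send(
  new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq),
  new org.apache.kafka.clients.producer.Callback {
    // onCompletion receives a null exception on success, non-null on failure
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception != null) logger.error("Send failed", exception)
      else logger.info(s"Sent to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
  })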

Here is my Kafka_Client conf file used for Kerberos authentication:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useTicketCache=false
  useKeyTab=true
  serviceName="kafka"
  principal="pqr@ASND.ADROOT.ABC"
  keyTab="/home/pqr/.pqr.headless.keytab"
  debug=true
  client=true;
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useKeyTab=true
  useTicketCache=false
  serviceName="zookeeper"
  principal="pqr@ASND.ADROOT.ABC"
  keyTab="/home/pqr/.pqr.headless.keytab"
  debug=true;
};
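
For comparison, a typical SASL_PLAINTEXT producer configuration sets sasl.kerberos.service.name on the client and omits zookeeper.connect entirely, since producers talk only to brokers. A minimal sketch, assuming the ActorSystem from the code above; the broker host and port are placeholders, not taken from the setup here:

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.serialization.StringSerializer

// Sketch: no zookeeper.connect (it is not a producer config, hence the
// warning in the logs below) and the default request.timeout.ms left alone;
// broker-host:6667 is a placeholder
val saslSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  .withBootstrapServers("broker-host:6667")
  .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
  .withProperty("sasl.kerberos.service.name", "kafka")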

Here are the application logs I get when I run my jar on the cluster:

 [asd-sasds-addsk@jbt32423 ResourceBundle]$ java -Djava.security.auth.login.config=kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser, -jar adds-producer.jar
            [Policy Parser]: creating policy entry for expanded java.ext.dirs path:
                    file:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/lib/ext/*
            [Policy Parser]: creating policy entry for expanded java.ext.dirs path:
                    file:/usr/java/packages/lib/ext/*
    14:44:56.520 [main] INFO  c.h.adds.producer.addsProducer$ - Producer Configuration is : akka.kafka.ProducerSettings@35432107
    14:44:56.523 [main] INFO  c.h.adds.producer.addsProducer$ - Sending message : PPOadds(PIC_EVENT,01/06/2016,26/10/2016,ASSIGNED,asd_asdasd_ERRORED,asd,asdMSR,High,CASE-3,CASE-4,CASE-1,CASE-2,,CustomAttributes(GCAL,PTS Only,,DTF_INT_WORKFLOWS_COMPLETE,16065763,2495921,12,CreditDefaultSwap,CreditDefaultSwap,VERIFIED_asdasd,ABCUSA,asdCDS))
    14:44:56.524 [main] INFO  c.h.adds.producer.addsProducer$ - KafkaProducer Send first fetching Partitions for topics
    configfile: reading file:/home/asdasd-asdasdds-adds-asda/adasd-erer/hfgh/kafka_client_jaas.conf
    configparser:   Reading next config entry: KafkaClient
    configparser:           com.sun.security.auth.module.Krb5LoginModule, REQUIRED
    configparser:                   principal=234-dfgd-adds-asd@asdad.ADROOT.ABC
    configparser:                   debug=true
    configparser:                   doNotPrompt=true
    configparser:                   keyTab=/home/asdasd-asdad-adds-rrewr/.sdfsf-sdfsd-adds-sdfsf.headless.keytab
    configparser:                   client=true
    configparser:                   useKeyTab=true
    configparser:                   useTicketCache=false
    configparser:                   serviceName=kafka
    configparser:   Reading next config entry: Client
    configparser:           com.sun.security.auth.module.Krb5LoginModule, REQUIRED
    configparser:                   principal=sdfsf-dsf-adds-uk@sfdsdf.ADROOT.ABC
    configparser:                   debug=true
    configparser:                   doNotPrompt=true
    configparser:                   keyTab=/home/sdfdsf-sfds-adds-sdf/.sdff.sdsfs-adds-usdfs.headless.keytab
    configparser:                   useKeyTab=true
    configparser:                   useTicketCache=false
    configparser:                   serviceName=zookeeper
    Debug is  true storeKey false useTicketCache false useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/dasda-sasd-adds-asdad/.asdad-asd-adds-adsasd.headless.keytab refreshKrb5Config is false principal is dhsjdf-ssdf-adds-usdff@sdfs.ADROOT.ABC tryFirstPass is false useFirstPass is false storePass is false clearPass is false
    principal is usdfsf-ss-adds-ufsdf@sdfs.ADROOT.ABC
    Will use keytab
            [LoginContext]: login success
    Commit Succeeded

            [LoginContext]: commit success
    14:44:56.748 [main] WARN  o.a.k.c.producer.ProducerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
    Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Please let me know if I am doing something wrong. Thanks, Mahendra Tonape

We could not consume messages from the consumer side on our cluster, although we were able to consume them on our local machine. This turned out to be because we had written our application against the Kafka 0.10 API while our cluster runs Kafka version 0.9. If you check the differences between these two Kafka versions, you will find significant differences between their APIs.
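
For example, if the cluster runs Kafka 0.9, the client dependency can be pinned to a matching 0.9.x release. A hypothetical build.sbt fragment; the exact version should be checked against your cluster:

// build.sbt (sketch): keep the client library in the same release line
// as the brokers; 0.9.0.1 is one example of a 0.9.x client release
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.9.0.1"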

Also, please enable Kerberos debug logging to check whether the user is actually authenticated against the Kerberos-enabled cluster.
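
A minimal sketch of switching that debug output on from the client itself, using standard JDK system properties (they must be set before the first JAAS/Kerberos class loads):

// sun.security.krb5.debug prints the Kerberos protocol exchange;
// java.security.debug traces JAAS config parsing and the login itself
System.setProperty("sun.security.krb5.debug", "true")
System.setProperty("java.security.debug", "logincontext,configfile,configparser")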