通过 Spark 将 Kafka 消息保存到 HBase 中。会话永不关闭

Save Kafka messages into HBase through Spark. Session never closes

我正在尝试使用 Spark 流从 Kafka 接收消息,将它们转换为 Put 并插入到 HBase 中。 我创建一个 inputDstream 来接收来自 Kafka 的消息,然后创建一个 JobConf,最后使用 saveAsHadoopDataset(JobConf) 将记录保存到 HBase。

每次记录插入到HBase,都会建立一个从Hbase到zookeeper的会话,但不会关闭。如果连接数增加超过 zookeeper 的最大客户端连接数,则 spark streaming 崩溃。

我的代码如下:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

object ReceiveKafkaAsDstream {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val topics = "test"
    val brokers = "10.0.2.15:6667"

    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    val tableName = "KafkaTable"
    val conf = HBaseConfiguration.create()
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    val jobConfig: JobConf = new JobConf(conf, this.getClass)
    jobConfig.set("mapreduce.output.fileoutputformat", "/user/root/out")
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)

      val records = messages
        .map(_._2)
        .map(SampleKafkaRecord.parseToSampleRecord)
      records.print()  
      records.foreachRDD{ stream => stream.map(SampleKafkaRecord.SampleToHbasePut).saveAsHadoopDataset(jobConfig) }

    ssc.start()
    ssc.awaitTermination()
  }

  case class SampleKafkaRecord(id: String, name: String)
  object SampleKafkaRecord extends Serializable {
    def parseToSampleRecord(line: String): SampleKafkaRecord = {
      val values = line.split(";")
      SampleKafkaRecord(values(0), values(1))
    }

    def SampleToHbasePut(CSVData: SampleKafkaRecord): (ImmutableBytesWritable, Put) = {
      val rowKey = CSVData.id
      val putOnce = new Put(rowKey.getBytes)

      putOnce.addColumn("cf1".getBytes, "column-Name".getBytes, CSVData.name.getBytes)
      return (new ImmutableBytesWritable(rowKey.getBytes), putOnce)
    }
  }
}

我在 zookeeper conf 文件中将 SSC (SparkStreamingContext) 的持续时间设置为 1s,并将 maxClientCnxns 设置为 10 zoo.cfg,因此从一个客户端到 zookeeper 最多允许 10 个连接。

10 秒后(从 HBase 到 zookeeper 建立了 10 个会话),我得到如下所示的错误:

16/08/24 14:59:30 WARN RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase-unsecure/hbaseid
16/08/24 14:59:31 INFO ClientCnxn: Opening socket connection to server localhost.localdomain/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
16/08/24 14:59:31 INFO ClientCnxn: Socket connection established to localhost.localdomain/127.0.0.1:2181, initiating session
16/08/24 14:59:31 WARN ClientCnxn: Session 0x0 for server localhost.localdomain/127.0.0.1:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)

据我了解,存在此错误是因为连接数超过了 zookeeper 的最大连接数。如果我将 maxClientCnxn 设置为 20,流式处理可以持续 20 秒。我知道我可以将 maxClientCnxn 设置为无限制,但我真的不认为这是解决这个问题的好方法。

另一件事是,如果我使用 TextFileStream 将文本文件作为 DStream 获取,并使用 saveAsHadoopDataset(jobConf) 将它们保存到 hbase 中,它运行得很好。如果我只是使用 val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 从 kafka 读取数据并简单地打印信息,也没有问题。当我收到 kafka 消息,然后将它们保存到应用程序中的 HBase 时,问题就来了。

我的环境是 HDP 2.4 沙箱。版本火花:1.6,hbase:1.1.2,kafka:2.10.0,动物园管理员:3.4.6.

感谢任何帮助。

嗯,终于搞定了。

  1. 属性集:

有一个属性叫做"zookeeper.connection.timeout.ms"。此属性应设置为 1s。

  1. 更改为新的 API:

将方法 saveAsHadoopDataset(JobConf) 更改为 saveAsNewAPIHadoopDataset(JobConf)。我仍然不知道为什么旧的 API 不起作用。

import org.apache.hadoop.hbase.mapred.TableOutputFormat更改为import org.apache.hadoop.hbase.mapreduce.TableOutputFormat