Flink throwing serialization error when reading from HBase

When I read from HBase using a RichFlatMapFunction inside a map, I get a serialization error. What I am trying to do is: if an element of the data stream equals a particular string, read from HBase, otherwise ignore it. Below are the sample program and the error I am getting.

package com.abb.Flinktest
import java.text.SimpleDateFormat
import java.util.Properties

import scala.collection.concurrent.TrieMap 
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.BinaryComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Level
import org.apache.flink.api.common.functions.RichMapFunction

object Flinktesthbaseread {

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val kafkaStream = env.fromElements("hello")
    val c = kafkaStream.map(x => if (x.equals("hello")) kafkaStream.flatMap(new ReadHbase()))
    env.execute()
  }

  class ReadHbase extends RichFlatMapFunction[String, Tuple11[String, String, String, String, String, String, String, String, String, String, String]] with Serializable {
    var conf: org.apache.hadoop.conf.Configuration = null
    var table: org.apache.hadoop.hbase.client.HTable = null
    var hbaseconnection: org.apache.hadoop.hbase.client.Connection = null
    var taskNumber: String = null
    var rowNumber = 0
    val serialVersionUID = 1L

    override def open(parameters: org.apache.flink.configuration.Configuration) {
      println("getting table")
      conf = HBaseConfiguration.create()
      val in = getClass().getResourceAsStream("/hbase-site.xml")
      conf.addResource(in)
      hbaseconnection = ConnectionFactory.createConnection(conf)
      table = new HTable(conf, "testtable")
      // this.taskNumber = String.valueOf(taskNumber)
    }

    override def flatMap(msg: String, out: Collector[Tuple11[String, String, String, String, String, String, String, String, String, String, String]]) {
      // flatMap operation here
    }

    override def close() {
      table.flushCommits()
      table.close()
    }
  }
}

Error:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:617)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:959)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:484)
    at com.abb.Flinktest.Flinktesthbaseread$.main(Flinktesthbaseread.scala:45)
    at com.abb.Flinktest.Flinktesthbaseread.main(Flinktesthbaseread.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream
    - field (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun", name: "kafkaStream", type: "class org.apache.flink.streaming.api.scala.DataStream")
    - root object (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun", <function1>)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
    ... 6 more

I tried wrapping the fields inside a method and making the class serializable, but nothing worked. Could someone shed some light on this or suggest a workaround?

The problem is that you are trying to access the kafkaStream variable inside the map function, and that variable is simply not serializable. It is only an abstract representation of the data: it does not hold any elements itself, which makes your function invalid in the first place.
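In other words, any pattern of the following shape will fail the closure cleaner, because the lambda passed to map captures the surrounding DataStream (a minimal sketch of the failing shape from the question, not a new API):

    val kafkaStream = env.fromElements("hello")
    // kafkaStream is captured inside the lambda -> NotSerializableException
    kafkaStream.map(x => kafkaStream.flatMap(new ReadHbase()))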

Instead, do something like this:

   kafkaStream.filter(x => x.equals("hello")).flatMap(new ReadHbase())

The filter function keeps only the elements for which the condition is true, and those are the ones passed on to your flatMap function.
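Putting it together, here is a minimal sketch of how the main method could look with the filter in place (assuming the ReadHbase class and the imports from the question are otherwise unchanged):

    def main(args: Array[String]) {
      val env = StreamExecutionEnvironment.createLocalEnvironment()
      val kafkaStream = env.fromElements("hello")
      // Keep only the matching elements and hand them straight to the
      // RichFlatMapFunction; no DataStream is captured in a closure.
      val filtered = kafkaStream
        .filter(x => x.equals("hello"))
        .flatMap(new ReadHbase())
      env.execute()
    }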

I would strongly recommend you read the basic API concepts section of the documentation, since there appears to be a misunderstanding about what actually happens when you specify a transformation.