从 Spark Streaming 写入 HBase

Writing to HBase from Spark Streaming

我在 Spark Streaming(使用 Scala)中有一个由(键,值)对组成的文件输入,如果键满足特定条件,我需要做的是将值存储在 HBase 中。 因为我有 :

val pair: DStream[(String, String)]

我尝试做的是在地图中设置一个条件,并从那里尝试在 HBase 中插入值:

pair.map(x => {
if (x._1 == "condition")
{ val hconf = HBaseConfiguration.create()
val hTable = new HTable(hconf, "mytab")
val thePut = new Put(Bytes.toBytes(1))
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)
hTable.put(thePut)
})
}

但是这不起作用,我在使用 spark-submit 执行时收到错误消息:没有注册输出操作,所以没有要执行的内容

这是我能想到的将值插入 HBase 的唯一方法,我做错了什么吗?你能帮我解决一下吗?

这是更新后的代码:

pair.foreachRDD(rdd => rdd.map( p =>
{val hconf = HBaseConfiguration.create()
 val hTable = new HTable(hconf,"mytab")
 val thePut = new Put(Bytes.toBytes(1))
 thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes(p._1), Bytes.toBytes(p._2)
 hTable.put(thePut)
})

当我使用 Spark-submit 运行 时,我收到一条错误消息 "Task not serializable",你知道这意味着什么吗?我该如何解决它?

提前致谢

珍,

您收到错误消息的原因是您在代码中缺少对名为 pair 的 RDD 的操作。

参考下面更正后的代码。

  pair.foreachRDD((rdd : [RDD(String,String)]) => {  
 val newRdd = rdd.map(p=> (p._1,p._2))
        if (mewRdd._1 == "condition")
        {
         /* Your code*/
        }})

要从 Spark Streaming 向 hbase 中插入数据,请参阅本文 https://www.mapr.com/blog/spark-streaming-hbase 希望对您有所帮助。