在 Scala 中创建一个 RDD[(ImmutableBytesWritable, Result)]

Creating an RDD[(ImmutableBytesWritable, Result)] in Scala

我正在进行单元测试,我需要创建一个 RDD[(ImmutableBytesWritable, Result)]。数据仅包含唯一的 id 和非唯一的 value 列。

我可以使用 toDF 从列表创建 DataFrame,并将其转换为 RDD[Row];但我无法将其映射到 RDD[(ImmutableBytesWritable, Result)].

val values = List((1, 1234), (2, 123), (3, 1234))
import spark.implicits._
val df = values.toDF("id", "value")
val counts : RDD[(ImmutableBytesWritable, Result)] = df.rdd.map(
  row => (new ImmutableBytesWritable(), Result.create(...))
)

谢谢!

我不使用 HBase,但考虑到您要构建的签名,我想到了这个。试一试。

我使用 CellUtil 创建了一个单元格来构建结果。

import org.apache.hadoop.hbase.{Cell, CellUtil}

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

import scala.math.BigInt

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

object Practice extends App {

  val sparkConfig = new SparkConf().setAppName("test").setMaster("local[*]")
  val ss = SparkSession.builder().config(sparkConfig).getOrCreate()
  val values = Seq((1, 1234), (2, 123), (3, 1234))

  import ss.implicits._

  val df = values.toDF("id", "value")
  val counts: RDD[(ImmutableBytesWritable, Result)] = df.rdd.map{ row =>
    val key = row.getAs[Int]("id")
    val keyByteArray = BigInt(key).toByteArray
    val ibw = new ImmutableBytesWritable()
    ibw.set(keyByteArray)

    val value = row.getAs[Int]("value")
    val valueByteArray = BigInt(value).toByteArray
    val cellList = List(CellUtil.createCell(valueByteArray))
    val cell: java.util.List[Cell] = ListBuffer(cellList: _*)
    val result = Result.create(cell)

    (ibw, result)

  }
}

打印结果,这并不表示一个好的答案,给你这个:

KeyValue(03,keyvalues={\x04\xD2//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})
KeyValue(01,keyvalues={\x04\xD2//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})
KeyValue(02,keyvalues={{//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})