在 Scala 中创建一个 RDD[(ImmutableBytesWritable, Result)]
Creating an RDD[(ImmutableBytesWritable, Result)] in Scala
我正在进行单元测试,我需要创建一个 RDD[(ImmutableBytesWritable, Result)]
。数据仅包含唯一的 id
和非唯一的 value
列。
我可以使用 toDF
从列表创建 DataFrame,并将其转换为 RDD[Row]
;但我无法将其映射到 RDD[(ImmutableBytesWritable, Result)]
.
val values = List((1, 1234), (2, 123), (3, 1234))
import spark.implicits._
val df = values.toDF("id", "value")
val counts : RDD[(ImmutableBytesWritable, Result)] = df.rdd.map(
row => (new ImmutableBytesWritable(), Result.create(...))
)
谢谢!
我不使用 HBase,但考虑到您要构建的签名,我想到了这个。试一试。
我使用 CellUtil 创建了一个单元格来构建结果。
import org.apache.hadoop.hbase.{Cell, CellUtil}
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.math.BigInt
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
object Practice extends App {
val sparkConfig = new SparkConf().setAppName("test").setMaster("local[*]")
val ss = SparkSession.builder().config(sparkConfig).getOrCreate()
val values = Seq((1, 1234), (2, 123), (3, 1234))
import ss.implicits._
val df = values.toDF("id", "value")
val counts: RDD[(ImmutableBytesWritable, Result)] = df.rdd.map{ row =>
val key = row.getAs[Int]("id")
val keyByteArray = BigInt(key).toByteArray
val ibw = new ImmutableBytesWritable()
ibw.set(keyByteArray)
val value = row.getAs[Int]("value")
val valueByteArray = BigInt(value).toByteArray
val cellList = List(CellUtil.createCell(valueByteArray))
val cell: java.util.List[Cell] = ListBuffer(cellList: _*)
val result = Result.create(cell)
(ibw, result)
}
}
打印结果,这并不表示一个好的答案,给你这个:
KeyValue(03,keyvalues={\x04\xD2//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})
KeyValue(01,keyvalues={\x04\xD2//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})
KeyValue(02,keyvalues={{//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})
我正在进行单元测试,我需要创建一个 RDD[(ImmutableBytesWritable, Result)]
。数据仅包含唯一的 id
和非唯一的 value
列。
我可以使用 toDF
从列表创建 DataFrame,并将其转换为 RDD[Row]
;但我无法将其映射到 RDD[(ImmutableBytesWritable, Result)]
.
val values = List((1, 1234), (2, 123), (3, 1234))
import spark.implicits._
val df = values.toDF("id", "value")
val counts : RDD[(ImmutableBytesWritable, Result)] = df.rdd.map(
row => (new ImmutableBytesWritable(), Result.create(...))
)
谢谢!
我不使用 HBase,但考虑到您要构建的签名,我想到了这个。试一试。
我使用 CellUtil 创建了一个单元格来构建结果。
import org.apache.hadoop.hbase.{Cell, CellUtil}
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.math.BigInt
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
object Practice extends App {
val sparkConfig = new SparkConf().setAppName("test").setMaster("local[*]")
val ss = SparkSession.builder().config(sparkConfig).getOrCreate()
val values = Seq((1, 1234), (2, 123), (3, 1234))
import ss.implicits._
val df = values.toDF("id", "value")
val counts: RDD[(ImmutableBytesWritable, Result)] = df.rdd.map{ row =>
val key = row.getAs[Int]("id")
val keyByteArray = BigInt(key).toByteArray
val ibw = new ImmutableBytesWritable()
ibw.set(keyByteArray)
val value = row.getAs[Int]("value")
val valueByteArray = BigInt(value).toByteArray
val cellList = List(CellUtil.createCell(valueByteArray))
val cell: java.util.List[Cell] = ListBuffer(cellList: _*)
val result = Result.create(cell)
(ibw, result)
}
}
打印结果,这并不表示一个好的答案,给你这个:
KeyValue(03,keyvalues={\x04\xD2//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})
KeyValue(01,keyvalues={\x04\xD2//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})
KeyValue(02,keyvalues={{//LATEST_TIMESTAMP/Maximum/vlen=0/seqid=0})