无法将数据集从 SPARK 传输到 HBase table
Can't transfer dataset from Spark to HBase table
我在将数据从 spark 中的数据帧传输到我创建的 HBase table 时遇到问题。
问题似乎是:根据网上的示例,这种写法只能处理 Dataset,但我手里是一个包含这些信息的 DataFrame。请帮忙!
//HBasetable
create 'weatherHB', 'STATIONID','OBSERVATIONTS','TEMPERATURE'
def catalog = s"""{
"table":{"namespace":"default", "name":"weatherHB"},
"rowkey":"key",
"columns":{
"RecordID":{"cf":"RecordID","col":"key","type":"string"},
"StationID":{"cf":"STATIONID","col":"stationID","type":"string"},
"ObservationTSMonth":{"cf":"OBSERVATIONTS","col":"observationTSMonth","type":"string"},
"ObservationTSDay":{"cf":"OBSERVATIONTS","col":"observationTSDay","type":"string"},
"ObservationTSHour":{"cf":"OBSERVATIONTS","col":"observationTSHour","type":"string"},
"Temperature":{"cf":"TEMPERATURE","col":"temp","type":"string"}
}
}""".stripMargin
case class TempHeader(
recordId: String,
station: String,
month: String,
date: String,
hour: String,
temperature: Double)
import spark.implicits._
val weatherDF = spark.sparkContext.textFile("1902").
map(
rec => List (
rec.substring(1,26).trim(),
rec.substring(4,10).trim(),
rec.substring(19,21).trim(),
rec.substring(21,23).trim(),
rec.substring(23,25).trim(),
rec.substring(87,92).trim()
) ).
map( att => TempHeader( att(0), att(1), att(2), att(3), att(4), (att(5).trim.toDouble)/10)).toDF()
weatherDF.printSchema()
weatherDF.createOrReplaceTempView("TEMP")
val query = spark.sql("""SELECT month, max(temperature), min(temperature), avg(temperature) FROM TEMP GROUP BY month ORDER by month""".stripMargin)
query.show(10)
import org.apache.spark.sql.execution.datasources.hbase._
weatherDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "6")).format(
"org.apache.spark.sql.execution.datasources.hbase").save()
看起来你的 catalog
有一些问题:
RecordID
的列族应该是rowkey
temp
列的数据类型应该是double
Spark HBase 连接器的文档中给出了一个正确的示例。
你的目录应该是这样的(我没有测试过):
def catalog = s"""{
|"table":{"namespace":"default", "name":"weatherHB"},
|"rowkey":"key",
|"columns":{
|"key":{"cf":"rowkey","col":"key","type":"string"},
|"stationid":{"cf":"STATIONID","col":"stationID","type":"string"},
|"month":{"cf":"OBSERVATIONTS","col":"observationTSMonth","type":"string"},
|"date":{"cf":"OBSERVATIONTS","col":"observationTSDay","type":"string"},
|"hour":{"cf":"OBSERVATIONTS","col":"observationTSHour","type":"string"},
|"temperature":{"cf":"TEMPERATURE","col":"temp","type":"double"}
|}
|}""".stripMargin
此外,您的 case class
应与此目录匹配:
case class TempHeader(
key: String,
stationid: String,
month: String,
date: String,
hour: String,
temperature: Double)
我在将数据从 Spark 中的 DataFrame 传输到我创建的 HBase table 时遇到问题。问题似乎是:根据网上的示例,这种写法只能处理 Dataset,但我手里是一个包含这些信息的 DataFrame。请帮忙!
//HBasetable
create 'weatherHB', 'STATIONID','OBSERVATIONTS','TEMPERATURE'
def catalog = s"""{
"table":{"namespace":"default", "name":"weatherHB"},
"rowkey":"key",
"columns":{
"RecordID":{"cf":"RecordID","col":"key","type":"string"},
"StationID":{"cf":"STATIONID","col":"stationID","type":"string"},
"ObservationTSMonth":{"cf":"OBSERVATIONTS","col":"observationTSMonth","type":"string"},
"ObservationTSDay":{"cf":"OBSERVATIONTS","col":"observationTSDay","type":"string"},
"ObservationTSHour":{"cf":"OBSERVATIONTS","col":"observationTSHour","type":"string"},
"Temperature":{"cf":"TEMPERATURE","col":"temp","type":"string"}
}
}""".stripMargin
case class TempHeader(
recordId: String,
station: String,
month: String,
date: String,
hour: String,
temperature: Double)
import spark.implicits._
val weatherDF = spark.sparkContext.textFile("1902").
map(
rec => List (
rec.substring(1,26).trim(),
rec.substring(4,10).trim(),
rec.substring(19,21).trim(),
rec.substring(21,23).trim(),
rec.substring(23,25).trim(),
rec.substring(87,92).trim()
) ).
map( att => TempHeader( att(0), att(1), att(2), att(3), att(4), (att(5).trim.toDouble)/10)).toDF()
weatherDF.printSchema()
weatherDF.createOrReplaceTempView("TEMP")
val query = spark.sql("""SELECT month, max(temperature), min(temperature), avg(temperature) FROM TEMP GROUP BY month ORDER by month""".stripMargin)
query.show(10)
import org.apache.spark.sql.execution.datasources.hbase._
weatherDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "6")).format(
"org.apache.spark.sql.execution.datasources.hbase").save()
看起来你的 catalog
有一些问题:
RecordID
的列族应该是rowkey
temp
列的数据类型应该是double
Spark HBase 连接器的文档中给出了一个正确的示例。
你的目录应该是这样的(我没有测试过):
def catalog = s"""{
|"table":{"namespace":"default", "name":"weatherHB"},
|"rowkey":"key",
|"columns":{
|"key":{"cf":"rowkey","col":"key","type":"string"},
|"stationid":{"cf":"STATIONID","col":"stationID","type":"string"},
|"month":{"cf":"OBSERVATIONTS","col":"observationTSMonth","type":"string"},
|"date":{"cf":"OBSERVATIONTS","col":"observationTSDay","type":"string"},
|"hour":{"cf":"OBSERVATIONTS","col":"observationTSHour","type":"string"},
|"temperature":{"cf":"TEMPERATURE","col":"temp","type":"double"}
|}
|}""".stripMargin
此外,您的 case class
应与此目录匹配:
case class TempHeader(
key: String,
stationid: String,
month: String,
date: String,
hour: String,
temperature: Double)