Spark 结构化流写入流到 Hive ORC 分区外部 Table
Spark Structured Streaming Writestream to Hive ORC Partioned External Table
我正在尝试使用 Spark Structured Streaming - writeStream
API 写入外部分区 Hive table。
CREATE EXTERNAL TABLE `XX`(
`a` string,
`b` string,
`b` string,
`happened` timestamp,
`processed` timestamp,
`d` string,
`e` string,
`f` string )
PARTITIONED BY (
`year` int, `month` int, `day` int)
CLUSTERED BY (d)
INTO 6 BUCKETS
STORED AS ORC
TBLPROPERTIES (
'orc.compress'='ZLIB',
'orc.compression.strategy'='SPEED',
'orc.create.index'='true',
'orc.encoding.strategy'='SPEED');
在 Spark 代码中,
val hiveOrcWriter: DataStreamWriter[Row] = event_stream
.writeStream
.outputMode("append")
.format("orc")
.partitionBy("year","month","day")
//.option("compression", "zlib")
.option("path", _table_loc)
.option("checkpointLocation", _table_checkpoint)
我看到在非分区 table 上,记录被插入到 Hive 中。但是,在使用分区 table 时,spark 作业不会失败或引发异常,但不会将记录插入到 Hive table.
感谢任何处理过类似问题的人的评论。
编辑:
刚发现.orc文件确实写入了HDFS,分区目录结构正确:eg。 /_table_loc/_table_name/year/month/day/part-0000-0123123.c000.snappy.orc
不过
select * from 'XX' limit 1; (or where year=2018)
returns 没有行。
InputFormat
和 OutputFormat
对于 Table 'XX' 是 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
和
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
分别
此功能在结构化流式传输中并非开箱即用。在正常处理中,您将使用 dataset.write.saveAsTable(table_name)
,并且该方法不可用。
在 HDFS 中处理和保存数据后,您可以手动更新分区(或使用按计划执行此操作的脚本):
如果您使用 Hive
MSCK REPAIR TABLE table_name
如果你使用Impala
ALTER TABLE table_name RECOVER PARTITIONS
我正在尝试使用 Spark Structured Streaming - writeStream
API 写入外部分区 Hive table。
CREATE EXTERNAL TABLE `XX`(
`a` string,
`b` string,
`b` string,
`happened` timestamp,
`processed` timestamp,
`d` string,
`e` string,
`f` string )
PARTITIONED BY (
`year` int, `month` int, `day` int)
CLUSTERED BY (d)
INTO 6 BUCKETS
STORED AS ORC
TBLPROPERTIES (
'orc.compress'='ZLIB',
'orc.compression.strategy'='SPEED',
'orc.create.index'='true',
'orc.encoding.strategy'='SPEED');
在 Spark 代码中,
val hiveOrcWriter: DataStreamWriter[Row] = event_stream
.writeStream
.outputMode("append")
.format("orc")
.partitionBy("year","month","day")
//.option("compression", "zlib")
.option("path", _table_loc)
.option("checkpointLocation", _table_checkpoint)
我看到在非分区 table 上,记录被插入到 Hive 中。但是,在使用分区 table 时,spark 作业不会失败或引发异常,但不会将记录插入到 Hive table.
感谢任何处理过类似问题的人的评论。
编辑:
刚发现.orc文件确实写入了HDFS,分区目录结构正确:eg。 /_table_loc/_table_name/year/month/day/part-0000-0123123.c000.snappy.orc
不过
select * from 'XX' limit 1; (or where year=2018)
returns 没有行。
InputFormat
和 OutputFormat
对于 Table 'XX' 是 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
和
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
分别
此功能在结构化流式传输中并非开箱即用。在正常处理中,您将使用 dataset.write.saveAsTable(table_name)
,并且该方法不可用。
在 HDFS 中处理和保存数据后,您可以手动更新分区(或使用按计划执行此操作的脚本):
如果您使用 Hive
MSCK REPAIR TABLE table_name
如果你使用Impala
ALTER TABLE table_name RECOVER PARTITIONS