由于文件格式不匹配,PySpark 数据框未保存在 Hive 中
PySpark dataframe not getting saved in Hive due to file format mismatch
我想将 流数据 从 kafka 主题 写入 hive table.
我可以通过阅读 kafka 主题来创建数据帧,但是由于文件格式不匹配,数据没有写入 Hive Table。我已指定 dataframe.format("parquet")
并且配置单元 table 是使用 stored as parquet
.
创建的
下面是代码片段:
def hive_write_batch_data(data, batchId):
data.write.format("parquet").mode("append").saveAsTable(table)
def write_to_hive(data,kafka_sink_name):
global table
table = kafka_sink_name
data.select(col("key"),col("value"),col("offset")) \
.writeStream.foreachBatch(hive_write_batch_data) \
.start().awaitTermination()
if __name__ == '__main__':
kafka_sink_name = sys.argv[1]
kafka_config = {
....
..
}
spark = SparkSession.builder.appName("Test Streaming").enableHiveSupport().getOrCreate()
df = spark.readStream \
.format("kafka") \
.options(**kafka_config) \
.load()
df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","offset","timestamp","partition")
write_to_hive(df1,kafka_sink_name)
Hive table 创建为 Parquet:
CREATE TABLE test.kafka_test(
key string,
value string,
offset bigint)
STORED AS PARQUET;
它给我错误:
pyspark.sql.utils.AnalysisException: "The format of the existing table test.kafka_test is `HiveFileFormat`. It doesn\'t match the specified format `ParquetFileFormat`.;"
如何将数据帧写入配置单元 table?
我放弃了配置单元 table,运行 放弃了 Spark-streaming 工作。 Table 以正确的格式创建。
我想将 流数据 从 kafka 主题 写入 hive table.
我可以通过阅读 kafka 主题来创建数据帧,但是由于文件格式不匹配,数据没有写入 Hive Table。我已指定 dataframe.format("parquet")
并且配置单元 table 是使用 stored as parquet
.
创建的
下面是代码片段:
def hive_write_batch_data(data, batchId):
data.write.format("parquet").mode("append").saveAsTable(table)
def write_to_hive(data,kafka_sink_name):
global table
table = kafka_sink_name
data.select(col("key"),col("value"),col("offset")) \
.writeStream.foreachBatch(hive_write_batch_data) \
.start().awaitTermination()
if __name__ == '__main__':
kafka_sink_name = sys.argv[1]
kafka_config = {
....
..
}
spark = SparkSession.builder.appName("Test Streaming").enableHiveSupport().getOrCreate()
df = spark.readStream \
.format("kafka") \
.options(**kafka_config) \
.load()
df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","offset","timestamp","partition")
write_to_hive(df1,kafka_sink_name)
Hive table 创建为 Parquet:
CREATE TABLE test.kafka_test(
key string,
value string,
offset bigint)
STORED AS PARQUET;
它给我错误:
pyspark.sql.utils.AnalysisException: "The format of the existing table test.kafka_test is `HiveFileFormat`. It doesn\'t match the specified format `ParquetFileFormat`.;"
如何将数据帧写入配置单元 table?
我放弃了配置单元 table,运行 放弃了 Spark-streaming 工作。 Table 以正确的格式创建。