由于文件格式不匹配,PySpark 数据框未保存在 Hive 中

PySpark dataframe not getting saved in Hive due to file format mismatch

我想将 流数据kafka 主题 写入 hive table.
我可以通过阅读 kafka 主题来创建数据帧,但是由于文件格式不匹配,数据没有写入 Hive Table。我已指定 dataframe.format("parquet") 并且配置单元 table 是使用 stored as parquet.
创建的 下面是代码片段:

def hive_write_batch_data(data, batchId):
    data.write.format("parquet").mode("append").saveAsTable(table)

def write_to_hive(data,kafka_sink_name):
    global table
    table = kafka_sink_name
    data.select(col("key"),col("value"),col("offset")) \
        .writeStream.foreachBatch(hive_write_batch_data) \
        .start().awaitTermination()

if __name__ == '__main__':
    kafka_sink_name = sys.argv[1]
    kafka_config = {
                     ....
                     ..
                   }
    spark = SparkSession.builder.appName("Test Streaming").enableHiveSupport().getOrCreate()
    df = spark.readStream \
        .format("kafka") \
        .options(**kafka_config) \
        .load()
    
    df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","offset","timestamp","partition")
    write_to_hive(df1,kafka_sink_name)

Hive table 创建为 Parquet:

CREATE TABLE test.kafka_test(
  key string,
  value string,
  offset bigint)
STORED AS PARQUET;

它给我错误:

pyspark.sql.utils.AnalysisException: "The format of the existing table test.kafka_test is `HiveFileFormat`. It doesn\'t match the specified format `ParquetFileFormat`.;"

如何将数据帧写入配置单元 table?

我放弃了配置单元 table,运行 放弃了 Spark-streaming 工作。 Table 以正确的格式创建。