How to apply a StructType to a dataframe that is receiving data from a Kafka topic?

I am writing data to a Kafka topic using Spark as shown below.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType, DateType

# Assumes a Spark session is available (or can be created) in the job
spark = SparkSession.builder.getOrCreate()

publish_final_df = spark.read.parquet("s3://some_s3_file_path")
final_df = publish_final_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))
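
For reference, F.to_json(F.struct(F.col("*"))) packs all columns of each row into a single JSON string column. A minimal standalone sketch (using a hypothetical in-memory DataFrame, not the real parquet data) of what this produces:

# Hypothetical demo data; the real job reads from parquet instead.
demo_df = spark.createDataFrame([("a", 1), ("b", 2)], ["col1", "col3"])

# Each row becomes one string column named "value" holding the row as JSON,
# e.g. {"col1":"a","col3":1} -- this is the payload that gets written to Kafka.
demo_df.select(F.to_json(F.struct(F.col("*"))).alias("value")).show(truncate=False)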

Write to the topic:

final_df.write.format("kafka")\
        .option("kafka.bootstrap.servers", kafka_broker)\
        .option("acks", "1")\
        .option("kafka.compression.type", "snappy")\
        .option("kafka.security.protocol", "SASL_SSL")\
        .option("kafka.sasl.jaas.config", oauth_config)\
        .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
        .option("kafka.sasl.mechanism", "OAUTHBEARER")\
        .option("topic", 'topic_a')\
        .save()

The above steps run successfully. To read the data back from the same topic, topic_a, I prepared a schema matching final_df:

data_struct = StructType([
    StructField("col1", StringType()),
    StructField("col2", TimestampType()),
    StructField("col3", IntegerType())
])
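
As a side note, the schema does not have to be written by hand: DataFrame.schema is already a StructType, so it could be reused from the source DataFrame (a sketch, assuming the same parquet path is still readable from the consuming job):

# Assumption: the parquet source used by the producer is accessible here as well.
data_struct = spark.read.parquet("s3://some_s3_file_path").schema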

When reading from the same topic, topic_a, I tried to apply the StructType above (data_struct) and read as follows.

Read from the topic:

initial_df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", kafka_broker)\
    .option("retries", 3)\
    .option("acks", "1")\
    .option("kafka.security.protocol", "SASL_SSL")\
    .option("kafka.sasl.jaas.config", oauth_config)\
    .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
    .option("kafka.sasl.mechanism", "OAUTHBEARER")\
    .option("subscribe", 'topic_a')\
    .option("startingOffsets", "earliest")\
    .load()\
    .select(from_json(col('value'), data_struct).alias('band')) \
    .selectExpr("band.col1 as col1, band.col2 as col2, band.col3 as col3 *")
initial_df.writeStream.format('console').outputMode('append').start().awaitTermination()

When I run the code above, I see the following error:

Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1287.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' due to data type mismatch: argument 1 requires string type, however, '`value`' is of binary type.;;
'Project [jsontostructs(StructField(col1,StringType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true) AS band#883]

I came to Python from Scala, and this code:

.select(from_json(col('value'), data_struct).as('band')) \
.selectExpr("band.col1 as col1, band.col2 as col2, band.col3 as col3 *")

works fine in Scala. I applied the same approach in Python, but it fails. Could anyone tell me what mistake I am making here and how I can correct it?

My mistake was that I did not cast the JSON value to a string before applying the StructType to it. The line below solved my problem.

select(from_json(col('value').cast('string'), data_struct).alias("json_dta")).selectExpr('json_dta.*')
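
Putting it together, a sketch of the corrected streaming read — it is the question's own read snippet, with only the final select changed to cast the binary value column to a string before parsing it:

initial_df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", kafka_broker)\
    .option("retries", 3)\
    .option("acks", "1")\
    .option("kafka.security.protocol", "SASL_SSL")\
    .option("kafka.sasl.jaas.config", oauth_config)\
    .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
    .option("kafka.sasl.mechanism", "OAUTHBEARER")\
    .option("subscribe", 'topic_a')\
    .option("startingOffsets", "earliest")\
    .load()\
    .select(from_json(col('value').cast('string'), data_struct).alias("json_dta"))\
    .selectExpr('json_dta.*')

initial_df.writeStream.format('console').outputMode('append').start().awaitTermination()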