How to apply a StructType to a dataframe that is receiving data from a Kafka topic?
I am using Spark to write data to a Kafka topic, as shown below.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType, DateType

publish_final_df = spark.read.parquet("s3://some_s3_file_path")
# Pack all columns of each row into a single JSON string column named "value"
final_df = publish_final_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))
Write to the topic:
final_df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", 'topic_a')\
.save()
The step above runs successfully. To read the data back from the same topic, topic_a, I prepared a schema matching final_df:
data_struct = StructType([StructField("col1", StringType()),
StructField("col2", TimestampType()),
StructField("col3", IntegerType())
])
While reading from the same topic, topic_a, I tried to apply the StructType above (data_struct) and read as follows.

Read from the topic:
initial_df = spark.readStream.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("retries", 3)\
.option("acks", "1")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("subscribe", 'topic_a')\
.option("startingOffsets", "earliest")\
.load()\
.select(from_json(col('value'), data_struct).alias('band')) \
.selectExpr("band.col1 as col1, band.col2 as col2, band.col3 as col3 *")
initial_df.writeStream.format('console').outputMode('append').start().awaitTermination()
When I run the code above, I see this error:
Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1287.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' due to data type mismatch: argument 1 requires string type, however, '`value`' is of binary type.;;
'Project [jsontostructs(StructField(col1,StringType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true) AS band#883]
I came to Python from Scala, and the code:
.select(from_json(col('value'), data_struct).as('band')) \
.selectExpr("band.col1 as col1, band.col2 as col2, band.col3 as col3 *")
works fine in Scala. I applied the same approach in Python, but it fails. Can anyone tell me what mistake I am making here and how I can correct it?
My mistake was not casting the JSON value to a string before applying the StructType to it. The Kafka source exposes the value column as binary (which is exactly what the AnalysisException reports), so it must be cast to string before from_json can parse it. The line below solved my problem.
.select(from_json(col('value').cast('string'), data_struct).alias("json_dta"))\
.selectExpr('json_dta.*')
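For reference, a minimal sketch of the corrected read, assuming the same kafka_broker, data_struct, and topic_a as above (the SASL options from the question are omitted for brevity, and parsed_df is just an illustrative name):

# Kafka rows arrive with a binary 'value' column; cast it to string before from_json
parsed_df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", kafka_broker)\
    .option("subscribe", 'topic_a')\
    .option("startingOffsets", "earliest")\
    .load()\
    .select(from_json(col('value').cast('string'), data_struct).alias('json_dta'))\
    .selectExpr('json_dta.*')

parsed_df.writeStream.format('console').outputMode('append').start().awaitTermination()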