生产者正在发布到 Kafka,但无法从 Spark 结构化流中读取
Producer is publishing into Kafka but cannot read from Spark structured streaming
我正在使用 Kafka 发布推文,它运行正常,因为我可以使用以下命令看到回显
bin/kafka-console-consumer.sh --bootstrap-server xxx.xxx.xx.xxx:9092 --topic trump --from-beginning
但是当我尝试使用以下代码使用结构化流时
if __name__ == "__main__":
spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()
source_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xxx.xxx.xx.xxx:9092") \
.option("subscribe", "tweets") \
.option("startingOffsets", "latest") \
.load()
query = source_df \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
然后我得到了一个输出,但它没有在值列下显示推文。相反,我有一个奇怪的字母数字链,如下所示。我在没有截断列值的情况下进行了检查,得到了相同但更长的模式。
+----+--------------------+------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[52 54 20 40 70 7...|tweets| 0| 45724|2021-03-17 12:57:...| 0|
|null|[23 52 57 52 49 2...|tweets| 0| 45725|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 54 7...|tweets| 0| 45726|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 44 6...|tweets| 0| 45727|2021-03-17 12:57:...| 0|
|null|[40 42 42 43 50 6...|tweets| 0| 45728|2021-03-17 12:57:...| 0|
|null|[40 4C 6F 72 64 5...|tweets| 0| 45729|2021-03-17 12:57:...| 0|
|null|[41 6E 6E 6F 75 6...|tweets| 0| 45730|2021-03-17 12:57:...| 0|
|null|[42 69 74 63 6F 6...|tweets| 0| 45731|2021-03-17 12:57:...| 0|
|null|[40 65 72 69 6B 7...|tweets| 0| 45732|2021-03-17 12:57:...| 0|
|null|[74 68 65 20 6D 6...|tweets| 0| 45733|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 6D 6...|tweets| 0| 45734|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 6D 6...|tweets| 0| 45735|2021-03-17 12:57:...| 0|
|null|[40 42 54 43 54 4...|tweets| 0| 45736|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 49 6...|tweets| 0| 45737|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 63 6...|tweets| 0| 45738|2021-03-17 12:57:...| 0|
|null|[42 75 20 6F 6C 6...|tweets| 0| 45739|2021-03-17 12:57:...| 0|
|null|[40 5F 43 72 79 7...|tweets| 0| 45740|2021-03-17 12:57:...| 0|
|null|[40 57 69 6E 66 6...|tweets| 0| 45741|2021-03-17 12:57:...| 0|
|null|[4D 79 20 72 65 6...|tweets| 0| 45742|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 6F 6...|tweets| 0| 45743|2021-03-17 12:57:...| 0|
+----+--------------------+------+---------+------+--------------------+-------------+
only showing top 20 rows
如能帮助了解情况,我们将不胜感激。
默认情况下,存储在 Kafka 中的数据(列 key
和 value
)被序列化为字符串。
查看 Structured Streaming + Kafka Integration Guide 你会看到 key
和 value
两列的类型是 binary
:
另外,指南中提到了如何处理这个问题。您需要将列转换为字符串类型,如下所示:
source_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
我正在使用 Kafka 发布推文,它运行正常,因为我可以使用以下命令看到回显
bin/kafka-console-consumer.sh --bootstrap-server xxx.xxx.xx.xxx:9092 --topic trump --from-beginning
但是当我尝试使用以下代码使用结构化流时
if __name__ == "__main__":
spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()
source_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xxx.xxx.xx.xxx:9092") \
.option("subscribe", "tweets") \
.option("startingOffsets", "latest") \
.load()
query = source_df \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
然后我得到了一个输出,但它没有在值列下显示推文。相反,我有一个奇怪的字母数字链,如下所示。我在没有截断列值的情况下进行了检查,得到了相同但更长的模式。
+----+--------------------+------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[52 54 20 40 70 7...|tweets| 0| 45724|2021-03-17 12:57:...| 0|
|null|[23 52 57 52 49 2...|tweets| 0| 45725|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 54 7...|tweets| 0| 45726|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 44 6...|tweets| 0| 45727|2021-03-17 12:57:...| 0|
|null|[40 42 42 43 50 6...|tweets| 0| 45728|2021-03-17 12:57:...| 0|
|null|[40 4C 6F 72 64 5...|tweets| 0| 45729|2021-03-17 12:57:...| 0|
|null|[41 6E 6E 6F 75 6...|tweets| 0| 45730|2021-03-17 12:57:...| 0|
|null|[42 69 74 63 6F 6...|tweets| 0| 45731|2021-03-17 12:57:...| 0|
|null|[40 65 72 69 6B 7...|tweets| 0| 45732|2021-03-17 12:57:...| 0|
|null|[74 68 65 20 6D 6...|tweets| 0| 45733|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 6D 6...|tweets| 0| 45734|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 6D 6...|tweets| 0| 45735|2021-03-17 12:57:...| 0|
|null|[40 42 54 43 54 4...|tweets| 0| 45736|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 49 6...|tweets| 0| 45737|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 63 6...|tweets| 0| 45738|2021-03-17 12:57:...| 0|
|null|[42 75 20 6F 6C 6...|tweets| 0| 45739|2021-03-17 12:57:...| 0|
|null|[40 5F 43 72 79 7...|tweets| 0| 45740|2021-03-17 12:57:...| 0|
|null|[40 57 69 6E 66 6...|tweets| 0| 45741|2021-03-17 12:57:...| 0|
|null|[4D 79 20 72 65 6...|tweets| 0| 45742|2021-03-17 12:57:...| 0|
|null|[52 54 20 40 6F 6...|tweets| 0| 45743|2021-03-17 12:57:...| 0|
+----+--------------------+------+---------+------+--------------------+-------------+
only showing top 20 rows
如能帮助了解情况,我们将不胜感激。
默认情况下,存储在 Kafka 中的数据(列 key
和 value
)被序列化为字符串。
查看 Structured Streaming + Kafka Integration Guide 你会看到 key
和 value
两列的类型是 binary
:
另外,指南中提到了如何处理这个问题。您需要将列转换为字符串类型,如下所示:
source_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.outputMode("append") \
.format("console") \
.start()