我怎样才能将我的结构化流数据帧发送到卡夫卡?
How can i send my structured streaming dataframe to kafka?
大家好!
我正在尝试将我的结构化流数据帧发送到我的 kafka 主题之一,detection
。
这是结构化流数据帧的架构:
root
|-- timestamp: timestamp (nullable = true)
|-- Sigma: string (nullable = true)
|-- time: string (nullable = true)
|-- duration: string (nullable = true)
|-- SourceComputer: string (nullable = true)
|-- SourcePort: string (nullable = true)
|-- DestinationComputer: string (nullable = true)
|-- DestinationPort: string (nullable = false)
|-- protocol: string (nullable = true)
|-- packetCount: string (nullable = true)
|-- byteCount: string (nullable = true)
但后来我尝试用这种方法发送数据帧:
dfwriter=df \
.selectExpr("CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("checkpointLocation", "/Documents/checkpoint/logs") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("failOnDataLoss", "false") \
.option("topic", detection) \
.start()
然后我得到了错误:
pyspark.sql.utils.AnalysisException:无法解析“value
”给定的输入列:[DestinationComputer、DestinationPort、Sigma、SourceComputer、SourcePort、byteCount、duration、packetCount、processName、protocol,时间,时间戳];第 1 行第 5 位;
如果我发送一个数据框并在列 value
上运行,我会收到关于我的 kafka 主题消费者的数据。
是否可以发送包含所有列的数据框?
谢谢!
如错误所述,您的数据框没有 value
列。
您需要将所有列“嵌入”在 value
StructType
列下,然后使用 to_json
之类的函数,而不是 CAST( .. AS STRING)
在 Pyspark 中,这类似于 select 查询中的 struct(to_json(struct($"*")).as("value")
类似问题 -
大家好!
我正在尝试将我的结构化流数据帧发送到我的 kafka 主题之一,detection
。
这是结构化流数据帧的架构:
root
|-- timestamp: timestamp (nullable = true)
|-- Sigma: string (nullable = true)
|-- time: string (nullable = true)
|-- duration: string (nullable = true)
|-- SourceComputer: string (nullable = true)
|-- SourcePort: string (nullable = true)
|-- DestinationComputer: string (nullable = true)
|-- DestinationPort: string (nullable = false)
|-- protocol: string (nullable = true)
|-- packetCount: string (nullable = true)
|-- byteCount: string (nullable = true)
但后来我尝试用这种方法发送数据帧:
dfwriter=df \
.selectExpr("CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("checkpointLocation", "/Documents/checkpoint/logs") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("failOnDataLoss", "false") \
.option("topic", detection) \
.start()
然后我得到了错误:
pyspark.sql.utils.AnalysisException:无法解析“value
”给定的输入列:[DestinationComputer、DestinationPort、Sigma、SourceComputer、SourcePort、byteCount、duration、packetCount、processName、protocol,时间,时间戳];第 1 行第 5 位;
如果我发送一个数据框并在列 value
上运行,我会收到关于我的 kafka 主题消费者的数据。
是否可以发送包含所有列的数据框?
谢谢!
如错误所述,您的数据框没有 value
列。
您需要将所有列“嵌入”在 value
StructType
列下,然后使用 to_json
之类的函数,而不是 CAST( .. AS STRING)
在 Pyspark 中,这类似于 select 查询中的 struct(to_json(struct($"*")).as("value")
类似问题 -