How to add Kafka headers to a Kafka output in Spark Structured Streaming, building them from DataFrame columns?

Since in Spark 3.0 Kafka headers are, for both input and output, just a column containing an array of tuples, the question here is how to turn a column into a tuple whose key is the column name and whose value is the current row's value, and then append that tuple to the existing array.
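For context, here is a minimal sketch of the read side, assuming a placeholder topic and bootstrap servers; the includeHeaders option (Spark 3.0+) is what exposes the headers column:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("headers-demo").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "input-topic")                  // placeholder
  .option("includeHeaders", "true")                    // Spark 3.0+: expose Kafka headers
  .load()

// headers arrives as array<struct<key: string, value: binary>>
df.printSchema()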

For example,

I have a dataset that was originally read from Kafka and is meant to be sent back to Kafka after some transformations:

Batch: 0
-------------------------------------------
+---+-----+-----+---------+------+---------+-------------+-------+----------+----------+---------------+--------------+----------+
|key|value|topic|partition|offset|timestamp|timestampType|headers|headersMap|timestamp2|acceptTimestamp|delayWindowEnd|timestamp1|
+---+-----+-----+---------+------+---------+-------------+-------+----------+----------+---------------+--------------+----------+

In headers we have:

[[Access-Control-Allow-Headers, DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range], [Access-Control-Allow-Methods, GET, POST, OPTIONS], [Access-Control-Allow-Origin, *], [Access-Control-Expose-Headers, Content-Length,Content-Range], [breadcrumbId, ID-ILYA-1644900650793-0-1], [Cache-Control, no-cache], [Connection, keep-alive], [Content-Length, 36362], [Content-Type, application/json], [count,   h], [Date, Tue, 15 Feb 2022 04:51:01 GMT], [kafka.KEY, 60890], [messageId, 60890-1644897084], [Server, adf_http_server/4.3.0205], [Set-Cookie, sessions=21293ca7a63f591ea65771ed2e7fbb5b; path=/;], [time_from,     b#$], [time_to,     b14], [timestamp, b#<], [unit_id,   ��]]

i.e., an array of tuples.

In the timestamp1 column we have

2022-02-20 02:07:32.54

So I want to append

[timestamp1, 2022-02-20 02:07:32.54]

to the array in the headers column.

How do I turn a column-name/value pair into such a tuple?

I had to use struct() to create the tuple column, and array_union() to merge it with the pre-existing headers array of tuples (both from org.apache.spark.sql.functions):

.withColumn(kafkaTimestampColumnName, col("timestamp"))
// build the (key: string, value: binary) struct; HH is the 24-hour clock (hh is 12-hour and fails for afternoon hours)
.withColumn("tupletime", struct(
  lit(kafkaTimestampColumnName) as "key",
  unix_timestamp(col(kafkaTimestampColumnName), "yyyy-MM-dd HH:mm:ss").cast("string").cast("binary") as "value"))
.withColumn("headers", array_union(col("headers"), array(col("tupletime"))))

Note that in the tuple the key must be a string and the value must be binary.
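For completeness, a minimal write-side sketch under assumed names (the topic, bootstrap servers, and checkpoint path are placeholders); per the Spark 3.0 Kafka integration guide, the sink accepts an optional headers column alongside key and value:

// placeholders: output-topic, localhost:9092, /tmp/checkpoints/headers-demo
df.selectExpr("key", "value", "headers")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/headers-demo")
  .start()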