How to add Kafka headers for a Kafka output in Spark Structured Streaming, making them from Dataframe columns?
Since Kafka headers in Spark 3.0 are, for both input and output, just a column holding an array of tuples, the question is how to take a column's value, turn it into a tuple of (column name, value of the current row), and append that tuple to the existing array.
For example, I have a dataset that was read from Kafka and is meant to be sent back to Kafka after some transformations:
Batch: 0
-------------------------------------------
+---+-----+-----+---------+------+---------+-------------+-------+----------+----------+---------------+--------------+----------+
|key|value|topic|partition|offset|timestamp|timestampType|headers|headersMap|timestamp2|acceptTimestamp|delayWindowEnd|timestamp1|
+---+-----+-----+---------+------+---------+-------------+-------+----------+----------+---------------+--------------+----------+
In the headers column we have:
[[Access-Control-Allow-Headers, DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range], [Access-Control-Allow-Methods, GET, POST, OPTIONS], [Access-Control-Allow-Origin, *], [Access-Control-Expose-Headers, Content-Length,Content-Range], [breadcrumbId, ID-ILYA-1644900650793-0-1], [Cache-Control, no-cache], [Connection, keep-alive], [Content-Length, 36362], [Content-Type, application/json], [count, h], [Date, Tue, 15 Feb 2022 04:51:01 GMT], [kafka.KEY, 60890], [messageId, 60890-1644897084], [Server, adf_http_server/4.3.0205], [Set-Cookie, sessions=21293ca7a63f591ea65771ed2e7fbb5b; path=/;], [time_from, b#$], [time_to, b14], [timestamp, b#<], [unit_id, ��]]
i.e., an array of tuples.
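As a side note, this headers column only shows up when the Kafka source is read with the includeHeaders option. A minimal read-side sketch, assuming a running SparkSession named spark and placeholder broker/topic values:
// Minimal read-side sketch: includeHeaders exposes the headers column
// with type array<struct<key: string, value: binary>>.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "input-topic")                  // placeholder topic
  .option("includeHeaders", "true")
  .load()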
In the timestamp1 column we have
2022-02-20 02:07:32.54
So I want to append
[timestamp1, 2022-02-20 02:07:32.54]
to the array in the headers column.
How do I turn a pair of (column name, value) into such a tuple?
I had to use struct() to create the tuple column, and array_union() to append it to the pre-existing headers array of tuples.
// copy the Kafka timestamp into a named column
.withColumn(kafkaTimestampColumnName, col("timestamp"))
// build a tuple: the column name as "key", the value cast to binary as "value"
.withColumn("tupletime", struct(lit(kafkaTimestampColumnName) as "key", unix_timestamp(col(kafkaTimestampColumnName), "yyyy-MM-dd hh:mm:ss").cast("string").cast("binary") as "value"))
// append the new tuple to the pre-existing headers array
.withColumn("headers", array_union(col("headers"), array(col("tupletime"))))
Note that within the tuple the key must be a string and the value must be binary.
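For completeness, a minimal write-side sketch; the transformed DataFrame name dfWithHeaders, the broker, the topic and the checkpoint path are all placeholders. As far as I can tell, the Kafka sink writes the headers column (array of struct<key: string, value: binary>) as record headers when it is present in the output schema:
// Minimal write-side sketch: key, value and headers flow back to Kafka.
dfWithHeaders
  .select(col("key"), col("value"), col("headers"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")           // placeholder broker
  .option("topic", "output-topic")                               // placeholder topic
  .option("checkpointLocation", "/tmp/checkpoints/headers-demo") // placeholder path
  .start()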