structured streaming - 将dataframe逐行写入Kafka,dataframe有一个struct列

structured streaming - writing dataframe into Kafka row by row, dataframe has a struct column

我正在使用 StructuredStreaming ..我有一个 pyspark 数据框,我需要将其写入 Kafka。

数据帧的架构如下所示:

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = false)
 |    |-- end: timestamp (nullable = false)
 |-- processedAlarmCnt: integer (nullable = false)
 |-- totalAlarmCnt: integer (nullable = false)

我当前的代码,我正在将 pyspark DataFrame 转换为 pandas,遍历每一行,将数据添加到 hashmap

def writeCountToKafka(df):
       if df.count()>0:
          hm = {}
          df_pandas = df.toPandas()
          for _, row in df_pandas.iterrows():
               hm["window"] = [datetime.timestamp(row["window"]["start"]),datetime.timestamp(row["window"]["end"])]
               hm["processedAlarmCnt"] = row["processedAlarmCnt"]
               hm["totalAlarmCnt"] = row["totalAlarmCnt"]
               
               # Python Kafka Producer
               kafka_producer.send(topic_count, json.dumps(mymap).encode('utf-8'))
                    kafka_producer.flush()

几个问题:

  1. 如何使此代码更高效 - 可能不必遍历每一行来获取值并将其存储在哈希图中?

  2. 使用 StructuredStreaming Kafka Producer 而不是 python KafkaProducer(导入 - 从 kafka 导入 KafkaProducer)有意义吗? 使用 StructuredStreaming kafka 生产者(即,它需要一个“值”,似乎我无法将 window(struct) 转换为值...所以不确定应该将什么作为“值”?

design/code 最好的方法是什么?

蒂亚!

  1. 你不需要pandas。 Spark 应该能够完成您转换数据所需的一切。在 Dataframe 行上使用循环几乎总是表明你做错了什么

  2. 不,不导入KafkaProducer库;事实上,您不需要安装任何其他 Python 库来生成 Kafka。正如 Spark Structured Streaming 文档中所写,您的数据框需要 包含类型为字节或 str 的 value 列(键/主题/时间戳列都是可选的)。

您需要定义一个接受 Struct 并将三个根列序列化为单个 value(作为 json 字符串或任何其他类型)的 UDF 函数