structured streaming - 将dataframe逐行写入Kafka,dataframe有一个struct列
structured streaming - writing dataframe into Kafka row by row, dataframe has a struct column
我正在使用 StructuredStreaming ..我有一个 pyspark 数据框,我需要将其写入 Kafka。
数据帧的架构如下所示:
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = false)
| |-- end: timestamp (nullable = false)
|-- processedAlarmCnt: integer (nullable = false)
|-- totalAlarmCnt: integer (nullable = false)
我当前的代码,我正在将 pyspark DataFrame 转换为 pandas,遍历每一行,将数据添加到 hashmap
def writeCountToKafka(df):
if df.count()>0:
hm = {}
df_pandas = df.toPandas()
for _, row in df_pandas.iterrows():
hm["window"] = [datetime.timestamp(row["window"]["start"]),datetime.timestamp(row["window"]["end"])]
hm["processedAlarmCnt"] = row["processedAlarmCnt"]
hm["totalAlarmCnt"] = row["totalAlarmCnt"]
# Python Kafka Producer
kafka_producer.send(topic_count, json.dumps(mymap).encode('utf-8'))
kafka_producer.flush()
几个问题:
如何使此代码更高效 - 可能不必遍历每一行来获取值并将其存储在哈希图中?
使用 StructuredStreaming Kafka Producer 而不是 python KafkaProducer(导入 - 从 kafka 导入 KafkaProducer)有意义吗?
使用 StructuredStreaming kafka 生产者(即,它需要一个“值”,似乎我无法将 window(struct) 转换为值...所以不确定应该将什么作为“值”?
design/code 最好的方法是什么?
蒂亚!
你不需要pandas。 Spark 应该能够完成您转换数据所需的一切。在 Dataframe 行上使用循环几乎总是表明你做错了什么
不,不导入KafkaProducer库;事实上,您不需要安装任何其他 Python 库来生成 Kafka。正如 Spark Structured Streaming 文档中所写,您的数据框需要 仅 包含类型为字节或 str 的 value
列(键/主题/时间戳列都是可选的)。
您需要定义一个接受 Struct 并将三个根列序列化为单个 value
(作为 json 字符串或任何其他类型)的 UDF 函数
我正在使用 StructuredStreaming ..我有一个 pyspark 数据框,我需要将其写入 Kafka。
数据帧的架构如下所示:
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = false)
| |-- end: timestamp (nullable = false)
|-- processedAlarmCnt: integer (nullable = false)
|-- totalAlarmCnt: integer (nullable = false)
我当前的代码,我正在将 pyspark DataFrame 转换为 pandas,遍历每一行,将数据添加到 hashmap
def writeCountToKafka(df):
if df.count()>0:
hm = {}
df_pandas = df.toPandas()
for _, row in df_pandas.iterrows():
hm["window"] = [datetime.timestamp(row["window"]["start"]),datetime.timestamp(row["window"]["end"])]
hm["processedAlarmCnt"] = row["processedAlarmCnt"]
hm["totalAlarmCnt"] = row["totalAlarmCnt"]
# Python Kafka Producer
kafka_producer.send(topic_count, json.dumps(mymap).encode('utf-8'))
kafka_producer.flush()
几个问题:
如何使此代码更高效 - 可能不必遍历每一行来获取值并将其存储在哈希图中?
使用 StructuredStreaming Kafka Producer 而不是 python KafkaProducer(导入 - 从 kafka 导入 KafkaProducer)有意义吗? 使用 StructuredStreaming kafka 生产者(即,它需要一个“值”,似乎我无法将 window(struct) 转换为值...所以不确定应该将什么作为“值”?
design/code 最好的方法是什么?
蒂亚!
你不需要pandas。 Spark 应该能够完成您转换数据所需的一切。在 Dataframe 行上使用循环几乎总是表明你做错了什么
不,不导入KafkaProducer库;事实上,您不需要安装任何其他 Python 库来生成 Kafka。正如 Spark Structured Streaming 文档中所写,您的数据框需要 仅 包含类型为字节或 str 的
value
列(键/主题/时间戳列都是可选的)。
您需要定义一个接受 Struct 并将三个根列序列化为单个 value
(作为 json 字符串或任何其他类型)的 UDF 函数