pyspark:如何使用 KafkaUtils 执行结构化流式传输
pyspark: how to perform structured streaming using KafkaUtils
我正在使用 SparkSession.readStream
进行结构化流式传输并将其写入配置单元 table,但它似乎不允许我进行基于时间的微批处理,即我需要一批 5 个秒。所有消息都应该形成一个 5 秒的批次,并且批次数据应该写入配置单元 table.
现在它在消息发布到 Kafka 主题时读取消息,每条消息都是 table.
的一条记录
工作代码
def hive_write_batch_data(data, batchId):
data.write.format("parquet").mode("append").saveAsTable("test.my_table")
kafka_config = {
"checkpointLocation":"/user/aiman/temp/checkpoint",
"kafka.bootstrap.servers":"kafka.bootstrap.server.com:9093",
"subscribe":"TEST_TOPIC",
"startingOffsets": offsetValue,
"kafka.security.protocol":"SSL",
"kafka.ssl.keystore.location": "kafka.keystore.uat.jks",
"kafka.ssl.keystore.password": "abcd123",
"kafka.ssl.key.password":"abcd123",
"kafka.ssl.truststore.type":"JKS",
"kafka.ssl.truststore.location": "kafka.truststore.uat.jks",
"kafka.ssl.truststore.password":"abdc123",
"kafka.ssl.enabled.protocols":"TLSv1",
"kafka.ssl.endpoint.identification.algorithm":""
}
df = spark.readStream \
.format("kafka") \
.options(**kafka_config) \
.load()
data = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","offset","timestamp","partition")
data_new = data.select(col("offset"),col("partition"),col("key"),json_tuple(col("value"),"product_code","rec_time")) \
.toDF("offset","partition","key","product_code","rec_time")
data_new.writeStream. \
.foreachBatch(hive_write_batch_data) \
.start() \
.awaitTermination()
问题陈述
由于每条消息都被视为配置单元 table 中的一个记录条目,因此正在为每个记录创建一个 parquet 文件,这可能会触发配置单元的小文件问题。
我需要创建一个基于时间的批处理,以便将多条记录一次性插入配置单元 table。为此,我只发现 KafkaUtils
支持基于时间的使用 ssc = StreamingContext(sc, 5)
但它不会创建 Dataframes.
我应该如何使用 KafkaUtils
创建读入数据帧的批次?
添加触发器有效。在流写入器中添加了一个触发器:
data_new.writeStream \
.trigger(processingTime="5 seconds") \ #Trigger
.foreachBatch(hive_write_batch_data) \
.start() \
.awaitTermination()
找到文章here
我正在使用 SparkSession.readStream
进行结构化流式传输并将其写入配置单元 table,但它似乎不允许我进行基于时间的微批处理,即我需要一批 5 个秒。所有消息都应该形成一个 5 秒的批次,并且批次数据应该写入配置单元 table.
现在它在消息发布到 Kafka 主题时读取消息,每条消息都是 table.
的一条记录
工作代码
def hive_write_batch_data(data, batchId):
data.write.format("parquet").mode("append").saveAsTable("test.my_table")
kafka_config = {
"checkpointLocation":"/user/aiman/temp/checkpoint",
"kafka.bootstrap.servers":"kafka.bootstrap.server.com:9093",
"subscribe":"TEST_TOPIC",
"startingOffsets": offsetValue,
"kafka.security.protocol":"SSL",
"kafka.ssl.keystore.location": "kafka.keystore.uat.jks",
"kafka.ssl.keystore.password": "abcd123",
"kafka.ssl.key.password":"abcd123",
"kafka.ssl.truststore.type":"JKS",
"kafka.ssl.truststore.location": "kafka.truststore.uat.jks",
"kafka.ssl.truststore.password":"abdc123",
"kafka.ssl.enabled.protocols":"TLSv1",
"kafka.ssl.endpoint.identification.algorithm":""
}
df = spark.readStream \
.format("kafka") \
.options(**kafka_config) \
.load()
data = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","offset","timestamp","partition")
data_new = data.select(col("offset"),col("partition"),col("key"),json_tuple(col("value"),"product_code","rec_time")) \
.toDF("offset","partition","key","product_code","rec_time")
data_new.writeStream. \
.foreachBatch(hive_write_batch_data) \
.start() \
.awaitTermination()
问题陈述
由于每条消息都被视为配置单元 table 中的一个记录条目,因此正在为每个记录创建一个 parquet 文件,这可能会触发配置单元的小文件问题。
我需要创建一个基于时间的批处理,以便将多条记录一次性插入配置单元 table。为此,我只发现 KafkaUtils
支持基于时间的使用 ssc = StreamingContext(sc, 5)
但它不会创建 Dataframes.
我应该如何使用 KafkaUtils
创建读入数据帧的批次?
添加触发器有效。在流写入器中添加了一个触发器:
data_new.writeStream \
.trigger(processingTime="5 seconds") \ #Trigger
.foreachBatch(hive_write_batch_data) \
.start() \
.awaitTermination()
找到文章here