How to Compile a While Loop statement in PySpark on Apache Spark with Databricks
I am trying to use a while loop to send data to my data lake. Basically, the aim is to keep looping the code and sending data to my data lake whenever data is received from my Azure Service Bus, using the code below.
This code receives the messages from my Service Bus:
import json
from azure.servicebus import ServiceBusClient

def myfunc():
    # CONNECTION_STR, QUEUE_NAME and session_id are defined elsewhere.
    with ServiceBusClient.from_connection_string(CONNECTION_STR) as client:
        # max_wait_time specifies how long the receiver should wait with no
        # incoming messages before stopping receipt. The default is None,
        # i.e. receive forever.
        with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
            for msg in receiver:
                # print("Received: " + str(msg))
                themsg = json.loads(str(msg))
                # Complete the message so that it is removed from the queue.
                receiver.complete_message(msg)
                return themsg
This code assigns the message to a variable:
result = myfunc()
The following code sends the message to my data lake:
rdd = sc.parallelize([json.dumps(result)])
spark.read.json(rdd) \
    .write.mode("overwrite").json('/mnt/lake/RAW/FormulaClassification/F1Area/')
I need some help looping through the code so that it continuously checks for messages and sends the results to my data lake. I believe the solution is accomplished with a while loop, but I'm not sure.
Just because you're using Spark doesn't mean you cannot loop.
To start, you are only returning the first message from the receiver, so the function should look like this:
with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
    msg = next(receiver)  # take just the first message
    # print("Received: " + str(msg))
    themsg = json.loads(str(msg))
    # Complete the message object (not its string form) so that it is removed from the queue.
    receiver.complete_message(msg)
    return themsg
To answer your question:
while True:
    result = json.dumps(myfunc())
    rdd = sc.parallelize([result])
    # You could also build the DataFrame from the RDD directly (e.g. rdd.toDF())
    # instead of going through spark.read.json.
    spark.read.json(rdd) \
        .write.mode("overwrite").json('/mnt/lake/RAW/FormulaClassification/F1Area/')
Keep in mind that the output file names are not consistent, and you probably don't want them overwritten on every iteration.
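A minimal sketch of one way to avoid that is to write each batch to its own timestamped subdirectory (the path convention here is my own, not part of the question):
import datetime

while True:
    result = json.dumps(myfunc())
    rdd = sc.parallelize([result])
    # Each iteration gets a unique path, so earlier batches are never clobbered.
    batch_path = ('/mnt/lake/RAW/FormulaClassification/F1Area/'
                  + datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S%f'))
    spark.read.json(rdd).write.json(batch_path)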
Alternatively, you should look into writing your own Source / SparkDataStream class to define a Spark SQL source; that way you don't need a loop in your main method, and the polling is handled natively by Spark.
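As a rough sketch of that direction: recent Spark releases (4.0+) expose a Python Data Source API under pyspark.sql.datasource, so a custom streaming source can be written in pure Python. Everything below is illustrative, assumes that API is available on your runtime, and leaves the actual Service Bus polling as a placeholder:
import json
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition

class ServiceBusDataSource(DataSource):
    @classmethod
    def name(cls):
        return "servicebus"  # hypothetical format name

    def schema(self):
        return "value STRING"

    def streamReader(self, schema):
        return ServiceBusStreamReader()

class ServiceBusStreamReader(DataSourceStreamReader):
    def __init__(self):
        self.current = 0

    def initialOffset(self):
        return {"batch": 0}

    def latestOffset(self):
        # A real source would track how many messages are available;
        # here we simply advance one micro-batch at a time.
        self.current += 1
        return {"batch": self.current}

    def partitions(self, start, end):
        return [InputPartition(0)]

    def read(self, partition):
        # The Service Bus receive logic (myfunc above) would go here,
        # yielding one row per message.
        yield (json.dumps({"placeholder": True}),)

spark.dataSource.register(ServiceBusDataSource)
df = spark.readStream.format("servicebus").load()
With a source like this, Structured Streaming drives the polling for you, and df.writeStream (with a checkpoint location) replaces the hand-rolled while True loop.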