您似乎正试图从广播中引用 SparkContext
It appears that you are attempting to reference SparkContext from a broadcast
我正在尝试使用 Spark Structured Streaming 处理一些事件。
传入事件如下所示:
事件 1:
url
http://first/path/to/read/from...
事件 2:
url
http://second/path/to/read/from...
以此类推
我的目标是读取每个 url 并生成一个新的 DF。到目前为止,我已经用这样的代码完成了 collect()
.
def createDF(url):
file_url = "abfss://" + container + "@" + az_storage_account + ".dfs.core.windows.net/" + az_storage_folder + "/" + url
""" Read data """
binary = spark.read.format("binaryFile").load(file_url)
""" Do other operations """
...
""" save the data """
# write it into blob again
return something
def loadData(batchDf, batchId):
"""
batchDf:
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
| body|partition| offset|sequenceNumber| enqueuedTime|publisher|partitionKey| properties|systemProperties| url|
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
|[{"topic":"/subsc...| 0|30084343744| 55489|2021-03-03 14:21:...| null| null|[aeg-event-type -...| []|http://path...|
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
"""
""" Before ....
df = batchDf.select("url")
url = df.collect()
[createDF(item) for item in url]
"""
# Now without collect()
# Select the url field of the df
url_select_df = batchDf.select("url")
# Read url value
result = url_select_df.rdd.map(lambda x: createDF(x.url))
query = df \
.writeStream \
.foreachBatch(loadData) \
.outputMode("update") \
.queryName("test") \
.start() \
.awaitTermination()
但是,当我想在不收集的情况下提取 URL 时,我收到以下错误消息:
您似乎正试图从广播中引用 SparkContext。
可能发生了什么?
非常感谢您的帮助
如果不调用 collect
,Dataframe url_select_df
将分布在执行程序中。当您随后调用 map
时,lambda 表达式将在执行程序上执行。因为 lambda 表达式正在调用 createDF
,它正在使用 SparkContext 你会得到异常,因为不可能在执行程序上使用 SparkContext。
看起来您已经想出了解决方案,即将 collect
数据帧发送到驱动程序并在那里应用 lambda 表达式。
只需确保您没有使驱动程序过载(基于内存)。
我正在尝试使用 Spark Structured Streaming 处理一些事件。
传入事件如下所示:
事件 1:
url |
---|
http://first/path/to/read/from... |
事件 2:
url |
---|
http://second/path/to/read/from... |
以此类推
我的目标是读取每个 url 并生成一个新的 DF。到目前为止,我已经用这样的代码完成了 collect()
.
def createDF(url):
file_url = "abfss://" + container + "@" + az_storage_account + ".dfs.core.windows.net/" + az_storage_folder + "/" + url
""" Read data """
binary = spark.read.format("binaryFile").load(file_url)
""" Do other operations """
...
""" save the data """
# write it into blob again
return something
def loadData(batchDf, batchId):
"""
batchDf:
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
| body|partition| offset|sequenceNumber| enqueuedTime|publisher|partitionKey| properties|systemProperties| url|
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
|[{"topic":"/subsc...| 0|30084343744| 55489|2021-03-03 14:21:...| null| null|[aeg-event-type -...| []|http://path...|
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
"""
""" Before ....
df = batchDf.select("url")
url = df.collect()
[createDF(item) for item in url]
"""
# Now without collect()
# Select the url field of the df
url_select_df = batchDf.select("url")
# Read url value
result = url_select_df.rdd.map(lambda x: createDF(x.url))
query = df \
.writeStream \
.foreachBatch(loadData) \
.outputMode("update") \
.queryName("test") \
.start() \
.awaitTermination()
但是,当我想在不收集的情况下提取 URL 时,我收到以下错误消息:
您似乎正试图从广播中引用 SparkContext。
可能发生了什么?
非常感谢您的帮助
如果不调用 collect
,Dataframe url_select_df
将分布在执行程序中。当您随后调用 map
时,lambda 表达式将在执行程序上执行。因为 lambda 表达式正在调用 createDF
,它正在使用 SparkContext 你会得到异常,因为不可能在执行程序上使用 SparkContext。
看起来您已经想出了解决方案,即将 collect
数据帧发送到驱动程序并在那里应用 lambda 表达式。
只需确保您没有使驱动程序过载(基于内存)。