您似乎正试图从广播中引用 SparkContext

It appears that you are attempting to reference SparkContext from a broadcast

我正在尝试使用 Spark Structured Streaming 处理一些事件。

传入事件如下所示:

事件 1:

url
http://first/path/to/read/from...

事件 2:

url
http://second/path/to/read/from...

以此类推

我的目标是读取每个 url 并生成一个新的 DF。到目前为止,我已经用这样的代码完成了 collect().


def createDF(url):

    file_url = "abfss://" + container + "@" + az_storage_account + ".dfs.core.windows.net/" + az_storage_folder + "/" + url

    """ Read data """
    binary = spark.read.format("binaryFile").load(file_url)
    """ Do other operations """
    ...

    """ save the data """
    # write it into blob again

    return something

def loadData(batchDf, batchId):

    
    """
    batchDf:
        +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
        |                body|partition|     offset|sequenceNumber|        enqueuedTime|publisher|partitionKey|          properties|systemProperties|                 url|
        +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
        |[{"topic":"/subsc...|        0|30084343744|         55489|2021-03-03 14:21:...|     null|        null|[aeg-event-type -...|              []|http://path...|
        +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
    """

    """ Before .... 

    df = batchDf.select("url")
    url = df.collect()

    [createDF(item) for item in url]
    """
    # Now without collect()
    # Select the url field of the df
    url_select_df = batchDf.select("url")

    # Read url value
    result = url_select_df.rdd.map(lambda x: createDF(x.url))
  
query  = df \
    .writeStream \
    .foreachBatch(loadData) \
    .outputMode("update") \
    .queryName("test") \
    .start() \
    .awaitTermination()

但是,当我想在不收集的情况下提取 URL 时,我收到以下错误消息:

您似乎正试图从广播中引用 SparkContext

可能发生了什么?

非常感谢您的帮助

如果不调用 collect,Dataframe url_select_df 将分布在执行程序中。当您随后调用 map 时,lambda 表达式将在执行程序上执行。因为 lambda 表达式正在调用 createDF,它正在使用 SparkContext 你会得到异常,因为不可能在执行程序上使用 SparkContext。

看起来您已经想出了解决方案,即将 collect 数据帧发送到驱动程序并在那里应用 lambda 表达式。

只需确保您没有使驱动程序过载(基于内存)。