在 PySpark 环境中创建缓存的最佳方式

Optimal way of creating a cache in the PySpark environment

我正在使用 Spark Streaming 创建一个系统来丰富来自 cloudant 数据库的传入数据。示例 -

Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}

我的驱动class代码如下:

from Sample.Job import EnrichmentJob
from Sample.Job import FunctionJob
import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

from kafka import KafkaConsumer, KafkaProducer
import json

class SampleFramework():

    def __init__(self):
        pass

    @staticmethod
    def messageHandler(m):
        return json.loads(m.message)

    @staticmethod
    def processData(rdd):

        if (rdd.isEmpty()):
            print("RDD is Empty")
            return

        # Expand
        expanded_rdd = rdd.mapPartitions(EnrichmentJob.enrich)

        # Score
        scored_rdd = expanded_rdd.map(FunctionJob.function)

        # Publish RDD


    def run(self, ssc):

        self.ssc = ssc

        directKafkaStream = KafkaUtils.createDirectStream(self.ssc, QUEUENAME, \
                                                          {"metadata.broker.list": META, 
                                                          "bootstrap.servers": SERVER}, \
                                                          messageHandler= SampleFramework.messageHandler)

        directKafkaStream.foreachRDD(SampleFramework.processData)

        ssc.start()
        ssc.awaitTermination()

浓缩作业的代码如下: class 丰富工作:

cache = {}

@staticmethod
def enrich(data):

    # Assume that Cloudant Connector using the available config
    cloudantConnector = CloudantConnector(config, config["cloudant"]["host"]["req_db_name"])
    final_data = []
    for row in data:
        id = row["id"]
        if(id not in EnrichmentJob.cache.keys()):
            data = cloudantConnector.getOne({"id": id})
            row["data"] = data
            EnrichmentJob.cache[id]=data
        else:
            data = EnrichmentJob.cache[id]
            row["data"] = data
        final_data.append(row)

    cloudantConnector.close()

    return final_data

我的问题是 - 是否有办法维护 [1]"a global cache on the main memory that is accessible to all workers" 或 [2]"local caches on each of the workers such that they remain persisted in the foreachRDD setting"?

我已经探索了以下内容 -

  1. 广播变量 - 我们采用 [1] 方式。据我了解,它们应该是只读的和不可变的。我已经检查了这个 但它引用了一个 unpersisting/persisting 广播变量的例子。这是一个好的做法吗?

  2. 静态变量 - 这里我们采用 [2] 方式。被引用的 class(在本例中为 "Enricher")以静态变量字典的形式维护缓存。但事实证明,ForEachRDD 函数会为每个传入的 RDD 生成一个全新的进程,这会删除之前启动的静态变量。这是上面的编码。

我现在有两个可能的解决方案 -

  1. 在文件系统上维护一个离线缓存。
  2. 在我的驱动节点上完成这个丰富任务的全部计算。这将导致整个数据最终都在驱动程序上并在那里维护。缓存对象将作为映射函数的参数发送到扩充作业。

这里显然第一个看起来比第二个好,但我想得出结论,这两个是唯一的解决方法,然后再提交给他们。任何指针将不胜感激!

Is there someway to maintain [1]"a global cache on the main memory that is accessible to all workers"

没有。没有 "main memory" 可以被所有工人访问。每个工作人员 运行 在一个单独的进程中,并通过套接字与外部世界通信。更不用说非本地模式下不同物理节点之间的分离了。

有一些技术可用于实现具有内存映射数据的 worker 作用域缓存(使用 SQLite 是最简单的一种),但需要一些额外的努力才能实现正确的方法(避免冲突等)。

or [2]"local caches on each of the workers such that they remain persisted in the foreachRDD setting"?

您可以使用范围限于单个工作进程的标准缓存技术。根据配置(静态与 dynamic resource allocationspark.python.worker.reuse),它可能会或可能不会在多个任务和批次之间保留。

考虑以下简化示例:

  • map_param.py:

    from pyspark import AccumulatorParam
    from collections import Counter
    
    class CounterParam(AccumulatorParam):
        def zero(self, v: Counter) -> Counter:
            return Counter()
    
        def addInPlace(self, acc1: Counter, acc2: Counter) -> Counter:
            acc1.update(acc2)
            return acc1
    
  • my_utils.py:

    from pyspark import Accumulator
    from typing import Hashable
    from collections import Counter
    
    # Dummy cache. In production I would use functools.lru_cache 
    # but it is a bit more painful to show with accumulator
    cached = {} 
    
    def f_cached(x: Hashable, counter: Accumulator) -> Hashable:
        if cached.get(x) is None:
            cached[x] = True
            counter.add(Counter([x]))
        return x
    
    
    def f_uncached(x: Hashable, counter: Accumulator) -> Hashable:
        counter.add(Counter([x]))
        return x
    
  • main.py:

    from pyspark.streaming import StreamingContext
    from pyspark import SparkContext
    
    from counter_param import CounterParam
    import my_utils
    
    from collections import Counter
    
    def main():
        sc = SparkContext("local[1]")
        ssc = StreamingContext(sc, 5)
    
        cnt_cached = sc.accumulator(Counter(), CounterParam())
        cnt_uncached = sc.accumulator(Counter(), CounterParam())
    
        stream = ssc.queueStream([
            # Use single partition to show cache in work
            sc.parallelize(data, 1) for data in
            [[1, 2, 3], [1, 2, 5], [1, 3, 5]]
        ])
    
        stream.foreachRDD(lambda rdd: rdd.foreach(
            lambda x: my_utils.f_cached(x, cnt_cached)))
        stream.foreachRDD(lambda rdd: rdd.foreach(
            lambda x: my_utils.f_uncached(x, cnt_uncached)))
    
        ssc.start()
        ssc.awaitTerminationOrTimeout(15)
        ssc.stop(stopGraceFully=True)
    
        print("Counter cached {0}".format(cnt_cached.value))
        print("Counter uncached {0}".format(cnt_uncached.value))
    
    if __name__ == "__main__":
        main()
    

示例运行:

bin/spark-submit main.py
Counter cached Counter({1: 1, 2: 1, 3: 1, 5: 1})
Counter uncached Counter({1: 3, 2: 2, 3: 2, 5: 2})

如您所见,我们得到了预期的结果:

  • 对于 "cached" 个对象,每个工作进程(分区)的每个唯一键仅更新一次累加器。
  • 对于未缓存的对象,每次出现键时都会更新累加器。