使用 pyspark 缓存简单 RDD 而没有缓存代码时出错(How to make a class picklable in a notebook)

Error in caching a simple RDD with pyspark while without caching the code works fine (How to make a class picklable in a notebook)

我有以下导致有关缓存错误的简单代码:

trips_in = sc.textFile("trip_data.csv")
trips = trips_in.map(lambda l: l.split(",")).map(lambda x: parseTrip(x)).cache()

trips.count()

函数 parseTrip() 获取字符串列表并创建 returns 一个 class Trip:

class Trip:
  def __init__(self, id, duration):
    self.id = id
    self.duration = duration

我在操作后立即收到错误 count()。但是,如果我删除第二行末尾的 cache() 一切正常。 根据错误,问题是 class Trip cannot be pickle:

PicklingError: Can't pickle __main__.Trip: attribute lookup __main__.Trip failed

那么我怎样才能让它变得可腌制(如果它是一个实际的词)? 请注意,我使用的是 Databricks notebook,因此我无法为 class 定义创建单独的 .py 以使其可 picklable。

环境不影响答案 - 如果你想使用自定义 类 它必须在集群中的每个节点上都是可导入的。

  • 对于单个模块,您可以轻松地将 SparkContext.addPyFile 与 URL 一起使用到 GitHub Gist(或其他支持的格式:“HDFS(或其他 Hadoop 支持的文件系统)中的文件,或一个 HTTP、HTTPS 或 FTP URI")

    • 创建要点。
    • 单击原始 link 并复制 URL。
    • 在你的笔记本中调用:

      sc.addPyFile(raw_gist_url)
      
  • 对于复杂的依赖项,您分发 egg 文件。

    • 创建Python package using setuptools

      目录结构:

      .
      ├── setup.py
      └── trip
          └── __init__.py
      

      示例安装文件:

      #!/usr/bin/env python
      
      from setuptools import setup
      
      setup(name='trip',
            version='0.0.1',
            description='Trip',
            author='Jane Doe',
            author_email='jane@example.com',
            url='https://example.com',
            packages=['trip'],)
      
    • 创建 egg 文件:

      python setup.py bdist_egg
      

      这将使用 trip-0.0.1-pyX.Y.egg 文件创建 dist 目录

    • 转到 Databricks 仪表板 -> 新建 -> 库并从 dist 目录上传 egg 文件:

    • 将库附加到您要使用的集群。

  • 最后,如果您只需要一个记录类型,您可以使用 namedtuple 而无需任何其他步骤:

    from collections import namedtuple
    
    Trip = namedtuple('Trip', ['id', 'duration'])