Can I use Spark DataFrame inside regular Spark map operation?

I am trying to use a Spark DataFrame defined earlier from inside a regular Spark map operation, as shown below:

import os

from pyspark.sql.functions import udf, lit
from pyspark.sql.types import BooleanType

businessJSON = os.path.join(targetDir, 'business.json')
businessDF = sqlContext.read.json(businessJSON)

reviewsJSON = os.path.join(targetDir, 'review.json')
reviewsDF = sqlContext.read.json(reviewsJSON)

# UDF that checks whether a business's categories list contains the given value
contains = udf(lambda xs, val: val in xs, BooleanType())

def selectReviews(category):
    businessesByCategory = businessDF[contains(businessDF.categories, lit(category))]
    selectedReviewsDF = reviewsDF.join(businessesByCategory,
                                       businessesByCategory.business_id == reviewsDF.business_id)
    return selectedReviewsDF.select("text").map(lambda x: x.text)

categories = ['category1', 'category2']
rdd = (sc.parallelize(categories)
       .map(lambda c: (c, selectReviews(c)))
       )

rdd.take(1)

I get a huge error message:

Py4JError                                 Traceback (most recent call last)
<ipython-input-346-051af5183a76> in <module>()
     12        )
     13 
---> 14 rdd.take(1)

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in take(self, num)
   1275 
   1276             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1277             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1278 
   1279             items += res

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    894         # SparkContext#runJob.
    895         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 896         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
    897                                           allowLocal)
    898         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in _jrdd(self)
   2361         command = (self.func, profiler, self._prev_jrdd_deserializer,
   2362                    self._jrdd_deserializer)
-> 2363         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
   2364         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
   2365                                              bytearray(pickled_cmd),

 /usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
   2281     # the serialized command will be compressed by broadcast
   2282     ser = CloudPickleSerializer()
-> 2283     pickled_command = ser.dumps(command)
   2284     if len(pickled_command) > (1 << 20):  # 1M
   2285         # The broadcast will have same life cycle as created PythonRDD

 ...

/Users/igorsokolov/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
    304             reduce = getattr(obj, "__reduce_ex__", None)
    305             if reduce:
--> 306                 rv = reduce(self.proto)
    307             else:
    308                 reduce = getattr(obj, "__reduce__", None)

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    302                 raise Py4JError(
    303                     'An error occurred while calling {0}{1}{2}. Trace:\n{3}\n'.
--> 304                     format(target_id, '.', name, value))
    305         else:
    306             raise Py4JError(

Py4JError: An error occurred while calling o96495.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

I did some investigation to figure out which line exactly causes this error, and the minimal code that reproduces it is:

def selectReviews(category):
    return reviewsDF.select("text")

rdd = (sc.parallelize(categories)
       .map(lambda c: (c, selectReviews(c)))
       )

rdd.take(1)

So I conclude that I am somehow using the DataFrame incorrectly, but it is not clear from the Spark documentation what exactly is wrong. I suspect that reviewsDF should be distributed across all machines in the cluster, but I assumed that since I created it with sqlContext, it would already live in the Spark context.

Thanks in advance.

Spark is not re-entrant. Specifically, workers cannot execute new RDD actions or transformations from within a step of another action or transformation.

The problem arises when selectReviews is called inside the map's lambda function, which runs on a worker node, because selectReviews needs to execute .select() on the RDD backing reviewsDF.

The workaround is to replace sc.parallelize with a simple for loop (or similar) over categories, executed locally on the driver, as in the sketch below. The speedup from Spark still applies to the DataFrame filtering that happens on each call to selectReviews.
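
A minimal sketch of that workaround, assuming businessDF, reviewsDF, and selectReviews are defined exactly as above (the take(1) at the end is only illustrative):

# Driver-side loop over the categories; each selectReviews(c) call still
# runs its join and filter as a distributed Spark job.
reviewsByCategory = {}
for c in categories:
    reviewsByCategory[c] = selectReviews(c)   # RDD of review texts for this category

# Materialize results on the driver as needed, e.g. peek at the first category:
reviewsByCategory[categories[0]].take(1)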