如何通过加入 spark 数据框实现服务器端过滤 api

How can I achieve server side filtering with the join in spark dataframe api

这是我的 spark 应用程序的一部分。第一部分是我在过去 1 小时内获取所有文章的部分,代码的第二部分是获取所有这些文章评论的部分。第三部分对文章进行评论。 问题是 articles.map(lambda x:(x.id,x.id)).join(axes) 部分太慢了,大约需要 1 分钟。我想将此时间缩短到 10 秒甚至更短,但不知道该怎么做?感谢您的回复。

articles = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="articles", keyspace=source).load() \
                        .map(lambda x:x).filter(lambda x:x.created_at!=None).filter(lambda x:x.created_at>=datetime.now()-timedelta(hours=1) and x.created_at<=datetime.now()-timedelta(hours=0)).cache()

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().map(lambda x:(x.article,x))

speed_rdd = articles.map(lambda x:(x.id,x.id)).join(axes)

编辑

这是我的新代码,我根据您的建议进行了更改。现在速度已经是以前的 2 倍,非常感谢 ;)。我想对轴部分代码的最后一部分进行另一项改进,它仍然太慢并且需要 38 秒才能处理 3000 万个数据:

range_expr = col("created_at").between(
                            datetime.now()-timedelta(hours=timespan),
                            datetime.now()-timedelta(hours=time_delta(timespan))
                        )
        article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').persist()


        axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()

我在这里试过这个(它应该替换我代码的最后一个轴部分)这也是我想要的解决方案,但它似乎不能正常工作:

in_expr = col("article").isin(article_ids.collect())
        axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().where(in_expr)

我总是收到此错误消息:

in_expr = col("article").isin(article_ids.collect())
Traceback (most recent call last):                                              
  File "<stdin>", line 1, in <module>
TypeError: 'Column' object is not callable

感谢您的帮助。

As 如果您想获得合理的性能,请不要将您的数据转换为 RDD。它不仅使像谓词下推这样的优化变得不可能,而且还引入了将数据从 JVM 移出到 Python 的巨大开销。

相反,您应该使用 SQL 表达式 / DataFrame API 的方式类似于:

from pyspark.sql.functions import col, expr, current_timestamp

range_expr = col("created_at").between(
    current_timestamp() - expr("INTERVAL 1 HOUR"),
    current_timestamp())

articles = (sqlContext.read.format("org.apache.spark.sql.cassandra")
    .options(...).load()
    .where(col("created_at").isNotNull())  # This is not really required
    .where(range_expr))

也应该可以使用标准 Python 实用程序来制定谓词表达式,就像您之前所做的那样:

import datetime

range_expr = col("created_at").between(
    datetime.datetime.now() - datetime.timedelta(hours=1),
    datetime.datetime.now()
)

后续 join 也应在不将数据移出数据帧的情况下执行:

axes = (sqlContext.read.format("org.apache.spark.sql.cassandra")
    .options(...)
    .load())

articles.join(axes, ["id"])

1) Spark-Cassandra 连接器会自动检测谓词下推,只要在 Cassandra 中可以进行过滤(使用主键进行过滤或二级索引):https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushing-down-clauses-to-cassandra

2) 为了更高效的联接,您可以调用方法repartitionByCassandraReplica。不幸的是,此方法可能不适用于 PySpark,仅适用于 Scala/Java API。在此处阅读文档:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12

3) 另一个提示是尝试调试并了解连接器如何创建 Spark 分区。文档中提到了一些示例和注意事项:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md