Pyspark sc.textFile() 没有完全加载文件

Pyspark sc.textFile() doesn't load file completely

我在 Cloudera 快速启动 docker 容器上使用 Python Spark (v 1.6.0)。 我把一个 static .txt 文件(500 mb)放在 /user/root/access_log.txt 下的 hdfs 中,成功了。

在 pyspark 中,我尝试使用以下 python 代码行加载文件:

lines = sc.textFile("hdfs://quickstart.cloudera/user/root/access_log.txt")

这没有给我任何错误。但是我发现文件 没有完全加载 。 还有..

lines.max()

给出文件的最后一个元素不正确,而 hdfs 实际上具有正确的文件大小。

这是内存问题吗?我的 docker 设置为 3840 MB。 我不知道如何解决这个问题。我期待着您的回答。

编辑:

我计算了数据集中的元素:

lines.count()

令我惊讶的是它是正确的!这应该意味着我的文件已正确加载。但问题仍然存在,为什么 .max() 语句没有 return 正确的元素。

这与不同的任务有关吗?

编辑 2: .txt 文件中的一些示例行

10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmpics/0000/2229/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 184976
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 60117
10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmmediablock/360/Chacha.jpg HTTP/1.1" 200 109379
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000159.jpg HTTP/1.1" 200 161657

一般来说 max 不应该 return (...) 最后一个元素 。在某些情况下,如果日志文件使用的格式强制执行字典顺序,并且您对内容很幸运,否则它就不会发生。由于您的数据以 IP 地址为前缀并使用不友好(例如 ISO 8601)时间戳格式,因此获取最后一个元素不是您可以期望的。

查找最后一个元素的一种方法是包含索引:

from operator import itemgetter

(rdd
    .zipWithIndex()                # Add line number to get (line, no)
    .max(key=itemgetter(1))[0])    # Compare elements using index

有点不同的方法是为每个分区找到最后一个元素,然后从这些元素中找到最后一个。

from functools import reduce

rdd.mapPartitions(lambda part: reduce(lambda _, x: [x], part, [])).collect()[-1]

或者如果分区数很大:

(rdd
    .mapPartitionsWithIndex(
        lambda i, part: reduce(lambda _, x: [(i, x)], part, []))
    .max()[1])  # Take max using tuple ordering