设置导入到 pyspark 的 s3 文件的条件

Set condition for s3 file imported to pyspark

我是新的 PySpark 和 AWS EMR。

对于Pyspark.py脚本,其简单如下:

我想检查加载的 s3 文件内容以 123xxxx 开始。

from __future__ import print_function
from pyspark import SparkContext
import sys
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wordcount  ", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="WordCount")
    text_file = sc.textFile(sys.argv[1])
    if text_file.startswith('123'):
        counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
        counts.saveAsTextFile(sys.argv[2])
        sc.stop()
    else:
        exit(-1)

当我 运行 进入 AWS emr 时:

s3a://sparkpy/output/a/a.txt s3a://sparkpy/output/a

但是出现错误。

基本上,我计算字符串并比较:

rdd = text_file.filter(lambda x: "gfg" in x)
    if rdd.count() > 0: