仅映射 spark 中的作业(与 hadoop 流相比)
Map only jobs in spark (vs hadoop streaming)
我有一个函数 process_line
从输入格式映射到输出格式
有些行已损坏,需要忽略。
我已成功 运行将此代码作为 python 流作业:
for input_line in sys.stdin:
try:
output_line=process_line(input_line.strip())
print (output_line)
except:
sys.stderr.write('Error with line: {l}\n'.format(l=input_line))
continue
如何 运行 pyspark 中的等效代码?
这是我试过的:
input = sc.textFile(input_dir, 1)
output=lines.map(process_line)
output.saveAsTextFile(output_dir)
如何跟踪损坏的线路并统计它们的数量?
您正在尝试将文本文件读取到一个分区,这可能会导致您的作业 运行 缓慢,因为您基本上放弃了并行性。
尝试这样做:
input = sc.textFile(input_dir)
output = lines.map(process_line)
output.saveAsTextFile(output_dir)
至于损坏的行,您可以在 process_line
函数中使用 try-except 机制,并可能将有问题的行写入某些日志文件,或者尝试执行其他逻辑。
我有一个函数 process_line
从输入格式映射到输出格式
有些行已损坏,需要忽略。
我已成功 运行将此代码作为 python 流作业:
for input_line in sys.stdin:
try:
output_line=process_line(input_line.strip())
print (output_line)
except:
sys.stderr.write('Error with line: {l}\n'.format(l=input_line))
continue
如何 运行 pyspark 中的等效代码? 这是我试过的:
input = sc.textFile(input_dir, 1)
output=lines.map(process_line)
output.saveAsTextFile(output_dir)
如何跟踪损坏的线路并统计它们的数量?
您正在尝试将文本文件读取到一个分区,这可能会导致您的作业 运行 缓慢,因为您基本上放弃了并行性。
尝试这样做:
input = sc.textFile(input_dir)
output = lines.map(process_line)
output.saveAsTextFile(output_dir)
至于损坏的行,您可以在 process_line
函数中使用 try-except 机制,并可能将有问题的行写入某些日志文件,或者尝试执行其他逻辑。