Google Dataflow seems to drop 1000th record

I have set up a small test using Google Dataflow (apache-beam). The use case for the experiment is to take a (csv) file and write a selected column to a (txt) file.

The code for the experiment is listed below:

from __future__ import absolute_import

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class EmitColDoFn(beam.DoFn):
    first = True
    header = ""
    def __init__(self, i):
        super(EmitColDoFn, self).__init__()
        self.line_count =  Metrics.counter(self.__class__, 'lines')
        self.i = i

    def process(self, element):
        if self.first:
            self.header = element
            self.first = False
        else:
            self.line_count.inc()
            cols = re.split(',', element)
            return (cols[self.i],)

def run(argv=None):
    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='/users/sms/python_beam/data/MOCK_DATA (4).csv',
#                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default="/users/sms/python_beam/data/",
#                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    # Read the text file[pattern] into a PCollection.
    lines = p | 'read' >> ReadFromText(known_args.input)

    column = (lines
            | 'email col' >> (beam.ParDo(EmitColDoFn(3)))
            | "col file" >> WriteToText(known_args.output, ".txt", shard_name_template="SS_Col"))

    result = p.run()
    result.wait_until_finish()

    if (not hasattr(result, 'has_job')  # direct runner
        or result.has_job):  # not just a template creation
        lines_filter = MetricsFilter().with_name('lines')
        query_result = result.metrics().query(lines_filter)
        if query_result['counters']:
            lines_counter = query_result['counters'][0]
            print "Lines committed", lines_counter.committed
run()

The last few lines of sample 1 are below:

990,Corabel,Feldbau,cfeldbaurh@deliciousdays.com,Female,84.102.162.190,DJ
991,Kiley,Rottcher,krottcherri@stanford.edu,Male,91.97.155.28,CA
992,Glenda,Clist,gclistrj@state.gov,Female,24.98.253.127,UA
993,Ingunna,Maher,imaherrk@army.mil,Female,159.31.127.19,PL
994,Megan,Giacopetti,mgiacopettirl@instagram.com,Female,115.6.63.52,RU
995,Briny,Dutnall,bdutnallrm@xrea.com,Female,102.81.33.24,SE
996,Jan,Caddan,jcaddanrn@jalbum.net,Female,115.142.222.106,PL

Running this produces the expected output:

/usr/local/bin/python2.7
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Lines committed 996

Process finished with exit code 0

Now for the strange result. In the next run, the number of rows was increased to 1000:

994,Megan,Giacopetti,mgiacopettirl@instagram.com,Female,115.6.63.52,RU
995,Briny,Dutnall,bdutnallrm@xrea.com,Female,102.81.33.24,SE
996,Jan,Caddan,jcaddanrn@jalbum.net,Female,115.142.222.106,PL
997,Shannen,Gaisford,sgaisfordr7@rediff.com,Female,167.255.222.92,RU
998,Lorianna,Slyne,lslyner8@cbc.ca,Female,54.169.60.13,CN
999,Franklin,Yaakov,fyaakovr9@latimes.com,Male,122.1.92.236,CN
1000,Wilhelmine,Cariss,wcarissra@creativecommons.org,Female,237.48.113.255,PL

But this time the output was:

/usr/local/bin/python2.7
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Lines committed 999

Process finished with exit code 0

Inspecting the output file shows that the last line was not processed:

bdutnallrm@xrea.com
jcaddanrn@jalbum.net
sgaisfordr7@rediff.com
lslyner8@cbc.ca
fyaakovr9@latimes.com

Any idea what is going on here?

'EmitColDoFn' skips the first line on the assumption that there is exactly one instance of it per file. But when you have more than 1000 lines, the DirectRunner creates two bundles: the first with 1000 lines and the second with 1 line. In a Beam application, the input may be split into multiple bundles for parallel processing, and there is no correlation between the number of files and the number of bundles. The same application can process terabytes of data spread across many files.
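The effect can be reproduced without Beam at all. A minimal plain-Python sketch of the behavior described above (the bundle split and the sample rows are illustrative stand-ins, not taken from the actual run):

```python
def emit_col_per_bundle(bundles, i):
    # Mimics the buggy EmitColDoFn: each bundle is processed by a fresh
    # instance, so the first element of *every* bundle is dropped as if
    # it were the header.
    out = []
    for bundle in bundles:
        first = True  # fresh per-instance state for each bundle
        for element in bundle:
            if first:
                first = False  # element silently discarded
            else:
                out.append(element.split(',')[i])
    return out

# 1 header line + 1000 data rows, split DirectRunner-style into
# a bundle of 1000 lines and a bundle of 1 line.
rows = ['id,first,last,email'] + [
    '%d,x,y,user%d@example.com' % (n, n) for n in range(1, 1001)]
emitted = emit_col_per_bundle([rows[:1000], rows[1000:]], 3)
# Only 999 emails come out: row 1000, the sole line of the second
# bundle, was swallowed along with the real header.
```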

ReadFromText has an option 'skip_header_lines', which you can set to 1 in order to skip the header line in each of your input files.