在本地环境下,结果值和数据流结果值不同

In the local environment, the result value and the dataflow result values are different

这是我的输入数据。

ㅡ.Input(本地)

'Iot,c c++ python,2015',
'Web,java spring,2016',
'Iot,c c++ spring,2017',
'Iot,c c++ spring,2017',

这是在本地环境中 运行ning apache-beam 的结果。

ㅡ.Outout(本地)

Iot,2015,c,1
Iot,2015,c++,1
Iot,2015,python,1
Iot,2017,c,2
Iot,2017,c++,2
Iot,2017,spring,2
Web,2016,java,1
Web,2016,spring,1

但是,当我运行 google-cloud-platform 数据流并将其放入桶中时,结果不同。

ㅡ.存储(桶)

Web,2016,java,1
Web,2016,spring,1
Iot,2015,c,1
Iot,2015,c++,1
Iot,2015,python,1
Iot,2017,c,1
Iot,2017,c++,1
Iot,2017,spring,1
Iot,2017,c,1
Iot,2017,c++,1
Iot,2017,spring,1

这是我的代码。

ㅡ.代码

#apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

pipeline_options = PipelineOptions(
    project='project-id',
    runner='dataflow',
    temp_location='bucket-location'
)
def pardo_dofn_methods(test=None):
    import apache_beam as beam


    class split_category_advanced(beam.DoFn):
      def __init__(self, delimiter=','):
        self.delimiter = delimiter
        self.k = 1
        self.pre_processing = []
        self.window = beam.window.GlobalWindow()
        self.year_dict = {}
        self.category_index = 0
        self.language_index = 1
        self.year_index = 2;
        self.result = []

      def setup(self):
          print('setup')

      def start_bundle(self):
          print('start_bundle')

      def finish_bundle(self):
          
          print('finish_bundle')
          for ppc_index in range(len(self.pre_processing)) :
              if self.category_index == 0 or self.category_index%3 == 0 :
                  if self.pre_processing[self.category_index] not in self.year_dict :
                          self.year_dict[self.pre_processing[self.category_index]] = {}

          
          if ppc_index + 2 == 2 or ppc_index + 2 == self.year_index :
              # { category : { year : {} } }
              if self.pre_processing[self.year_index] not in self.year_dict[self.pre_processing[self.category_index]] :
                     self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] = {}
              # { category : { year : c : { }, c++ : { }, java : { }}}
              language = self.pre_processing[self.year_index-1].split(' ')

              for lang_index in range(len(language)) :
                    if language[lang_index] not in self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] :
                      self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][language[lang_index]] = 1
                    else :
                        self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][
                            language[lang_index]] += 1
              self.year_index = self.year_index + 3
          self.category_index = self.category_index + 1

      csvFormat = ''
      for category, nested in self.year_dict.items() :
          for year in nested :
              for language in nested[year] :
                csvFormat+= (category+","+str(year)+","+language+","+str(nested[year][language]))+"\n"
                print(csvFormat)

      yield beam.utils.windowed_value.WindowedValue(
          value=csvFormat,
          #value = self.pre_processing,
          timestamp=0,
          windows=[self.window],
      )

  def process(self, text):
    for word in text.split(self.delimiter):
        self.pre_processing.append(word)
    print(self.pre_processing)

#with beam.Pipeline(options=pipeline_options) as pipeline:
with beam.Pipeline() as pipeline:
  results = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          'Iot,c c++ python,2015',
          'Web,java spring,2016',
          'Iot,c c++ spring,2017',
          'Iot,c c++ spring,2017',
  ])
      | 'Split category advanced' >> beam.ParDo(split_category_advanced(','))
      | 'Save' >> beam.io.textio.WriteToText("bucket-location")
      | beam.Map(print) \
  )
   if test:
       return test(results)

if __name__ == '__main__':
  pardo_dofn_methods_basic()

执行简单字数统计的代码。 CSV 列有一个 [ category, year, language, count ] 例如) IoT, 2015, c, 1

感谢您阅读。

您得到不同输出的最可能原因是并行性。使用 DataflowRunner 时,操作 运行 尽可能并行。由于您正在使用 ParDo 进行计数,因此当元素 Iot,c c++ spring,2017 转到两个不同的 worker 时,计数不会按您希望的那样发生(您正在 ParDo 中计数)。

您需要使用组合器 (4.2.4)

这里有一个简单的例子说明你想做什么:

    def generate_kvs(element, csv_delimiter=',', field_delimiter=' '):
        splitted = element.split(csv_delimiter)
        fields = splitted[1].split(field_delimiter)

        # final key to count is (Source, year, language)
        return [(f"{splitted[0]}, {splitted[2]}, {x}", 1) for x in fields]


    p = beam.Pipeline()

    elements = ['Iot,c c++ python,2015',
              'Web,java spring,2016',
              'Iot,c c++ spring,2017',
              'Iot,c c++ spring,2017']

    (p | Create(elements)
       | beam.ParDo(generate_kvs)
       | beam.combiners.Count.PerKey()
       | "Format" >> Map(lambda x: f"{x[0]}, {x[1]}")
       | Map(print))

    p.run()

无论元素在工人之间的分布如何,这都会输出您想要的结果。

注意 Apache Beam 的想法是尽可能并行化,为了聚合,您需要组合器

我建议您检查一些 wordcounts examples 以便您掌握组合器的窍门

编辑

关于组合器的说明:

ParDo 是一个在元素到元素基础上发生的操作。它获取一个元素,进行一些操作并将输出发送到下一个 PTransform。当您需要聚合数据(计数元素、求和值、连接句子...)时,元素明智的操作不起作用,您需要采用 PCollection 的东西(即,许多具有逻辑的元素)并输出一些东西。这就是组合器发挥作用的地方,它们以 PCollection 为基础执行操作,这可以跨工作人员进行(Map-Reduce 操作的一部分)

在您的示例中,您使用 Class 参数将计数存储在 ParDo 中,因此当元素经过它时,它会更改 class 中的参数。当所有元素都经过同一个 worker 时,这会起作用,因为 Class 是在 worker 基础上“创建”的(即,它们不共享状态),但是当有更多 worker 时,计数(与ParDo) 将分别发生在每个工人身上