在本地环境下,结果值和数据流结果值不同
In the local environment, the result value and the dataflow result values are different
这是我的输入数据。
ㅡ.Input(本地)
'Iot,c c++ python,2015',
'Web,java spring,2016',
'Iot,c c++ spring,2017',
'Iot,c c++ spring,2017',
这是在本地环境中 运行ning apache-beam 的结果。
ㅡ.Outout(本地)
Iot,2015,c,1
Iot,2015,c++,1
Iot,2015,python,1
Iot,2017,c,2
Iot,2017,c++,2
Iot,2017,spring,2
Web,2016,java,1
Web,2016,spring,1
但是,当我运行 google-cloud-platform 数据流并将其放入桶中时,结果不同。
ㅡ.存储(桶)
Web,2016,java,1
Web,2016,spring,1
Iot,2015,c,1
Iot,2015,c++,1
Iot,2015,python,1
Iot,2017,c,1
Iot,2017,c++,1
Iot,2017,spring,1
Iot,2017,c,1
Iot,2017,c++,1
Iot,2017,spring,1
这是我的代码。
ㅡ.代码
#apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
pipeline_options = PipelineOptions(
project='project-id',
runner='dataflow',
temp_location='bucket-location'
)
def pardo_dofn_methods(test=None):
import apache_beam as beam
class split_category_advanced(beam.DoFn):
def __init__(self, delimiter=','):
self.delimiter = delimiter
self.k = 1
self.pre_processing = []
self.window = beam.window.GlobalWindow()
self.year_dict = {}
self.category_index = 0
self.language_index = 1
self.year_index = 2;
self.result = []
def setup(self):
print('setup')
def start_bundle(self):
print('start_bundle')
def finish_bundle(self):
print('finish_bundle')
for ppc_index in range(len(self.pre_processing)) :
if self.category_index == 0 or self.category_index%3 == 0 :
if self.pre_processing[self.category_index] not in self.year_dict :
self.year_dict[self.pre_processing[self.category_index]] = {}
if ppc_index + 2 == 2 or ppc_index + 2 == self.year_index :
# { category : { year : {} } }
if self.pre_processing[self.year_index] not in self.year_dict[self.pre_processing[self.category_index]] :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] = {}
# { category : { year : c : { }, c++ : { }, java : { }}}
language = self.pre_processing[self.year_index-1].split(' ')
for lang_index in range(len(language)) :
if language[lang_index] not in self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][language[lang_index]] = 1
else :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][
language[lang_index]] += 1
self.year_index = self.year_index + 3
self.category_index = self.category_index + 1
csvFormat = ''
for category, nested in self.year_dict.items() :
for year in nested :
for language in nested[year] :
csvFormat+= (category+","+str(year)+","+language+","+str(nested[year][language]))+"\n"
print(csvFormat)
yield beam.utils.windowed_value.WindowedValue(
value=csvFormat,
#value = self.pre_processing,
timestamp=0,
windows=[self.window],
)
def process(self, text):
for word in text.split(self.delimiter):
self.pre_processing.append(word)
print(self.pre_processing)
#with beam.Pipeline(options=pipeline_options) as pipeline:
with beam.Pipeline() as pipeline:
results = (
pipeline
| 'Gardening plants' >> beam.Create([
'Iot,c c++ python,2015',
'Web,java spring,2016',
'Iot,c c++ spring,2017',
'Iot,c c++ spring,2017',
])
| 'Split category advanced' >> beam.ParDo(split_category_advanced(','))
| 'Save' >> beam.io.textio.WriteToText("bucket-location")
| beam.Map(print) \
)
if test:
return test(results)
if __name__ == '__main__':
pardo_dofn_methods_basic()
执行简单字数统计的代码。
CSV 列有一个 [ category, year, language, count ]
例如) IoT, 2015, c, 1
感谢您阅读。
您得到不同输出的最可能原因是并行性。使用 DataflowRunner
时,操作 运行 尽可能并行。由于您正在使用 ParDo 进行计数,因此当元素 Iot,c c++ spring,2017
转到两个不同的 worker 时,计数不会按您希望的那样发生(您正在 ParDo 中计数)。
您需要使用组合器 (4.2.4)
这里有一个简单的例子说明你想做什么:
def generate_kvs(element, csv_delimiter=',', field_delimiter=' '):
splitted = element.split(csv_delimiter)
fields = splitted[1].split(field_delimiter)
# final key to count is (Source, year, language)
return [(f"{splitted[0]}, {splitted[2]}, {x}", 1) for x in fields]
p = beam.Pipeline()
elements = ['Iot,c c++ python,2015',
'Web,java spring,2016',
'Iot,c c++ spring,2017',
'Iot,c c++ spring,2017']
(p | Create(elements)
| beam.ParDo(generate_kvs)
| beam.combiners.Count.PerKey()
| "Format" >> Map(lambda x: f"{x[0]}, {x[1]}")
| Map(print))
p.run()
无论元素在工人之间的分布如何,这都会输出您想要的结果。
注意 Apache Beam 的想法是尽可能并行化,为了聚合,您需要组合器
我建议您检查一些 wordcounts examples 以便您掌握组合器的窍门
编辑
关于组合器的说明:
ParDo 是一个在元素到元素基础上发生的操作。它获取一个元素,进行一些操作并将输出发送到下一个 PTransform。当您需要聚合数据(计数元素、求和值、连接句子...)时,元素明智的操作不起作用,您需要采用 PCollection 的东西(即,许多具有逻辑的元素)并输出一些东西。这就是组合器发挥作用的地方,它们以 PCollection 为基础执行操作,这可以跨工作人员进行(Map-Reduce 操作的一部分)
在您的示例中,您使用 Class 参数将计数存储在 ParDo 中,因此当元素经过它时,它会更改 class 中的参数。当所有元素都经过同一个 worker 时,这会起作用,因为 Class 是在 worker 基础上“创建”的(即,它们不共享状态),但是当有更多 worker 时,计数(与ParDo) 将分别发生在每个工人身上
这是我的输入数据。
ㅡ.Input(本地)
'Iot,c c++ python,2015',
'Web,java spring,2016',
'Iot,c c++ spring,2017',
'Iot,c c++ spring,2017',
这是在本地环境中 运行ning apache-beam 的结果。
ㅡ.Outout(本地)
Iot,2015,c,1
Iot,2015,c++,1
Iot,2015,python,1
Iot,2017,c,2
Iot,2017,c++,2
Iot,2017,spring,2
Web,2016,java,1
Web,2016,spring,1
但是,当我运行 google-cloud-platform 数据流并将其放入桶中时,结果不同。
ㅡ.存储(桶)
Web,2016,java,1
Web,2016,spring,1
Iot,2015,c,1
Iot,2015,c++,1
Iot,2015,python,1
Iot,2017,c,1
Iot,2017,c++,1
Iot,2017,spring,1
Iot,2017,c,1
Iot,2017,c++,1
Iot,2017,spring,1
这是我的代码。
ㅡ.代码
#apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
pipeline_options = PipelineOptions(
project='project-id',
runner='dataflow',
temp_location='bucket-location'
)
def pardo_dofn_methods(test=None):
import apache_beam as beam
class split_category_advanced(beam.DoFn):
def __init__(self, delimiter=','):
self.delimiter = delimiter
self.k = 1
self.pre_processing = []
self.window = beam.window.GlobalWindow()
self.year_dict = {}
self.category_index = 0
self.language_index = 1
self.year_index = 2;
self.result = []
def setup(self):
print('setup')
def start_bundle(self):
print('start_bundle')
def finish_bundle(self):
print('finish_bundle')
for ppc_index in range(len(self.pre_processing)) :
if self.category_index == 0 or self.category_index%3 == 0 :
if self.pre_processing[self.category_index] not in self.year_dict :
self.year_dict[self.pre_processing[self.category_index]] = {}
if ppc_index + 2 == 2 or ppc_index + 2 == self.year_index :
# { category : { year : {} } }
if self.pre_processing[self.year_index] not in self.year_dict[self.pre_processing[self.category_index]] :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] = {}
# { category : { year : c : { }, c++ : { }, java : { }}}
language = self.pre_processing[self.year_index-1].split(' ')
for lang_index in range(len(language)) :
if language[lang_index] not in self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][language[lang_index]] = 1
else :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][
language[lang_index]] += 1
self.year_index = self.year_index + 3
self.category_index = self.category_index + 1
csvFormat = ''
for category, nested in self.year_dict.items() :
for year in nested :
for language in nested[year] :
csvFormat+= (category+","+str(year)+","+language+","+str(nested[year][language]))+"\n"
print(csvFormat)
yield beam.utils.windowed_value.WindowedValue(
value=csvFormat,
#value = self.pre_processing,
timestamp=0,
windows=[self.window],
)
def process(self, text):
for word in text.split(self.delimiter):
self.pre_processing.append(word)
print(self.pre_processing)
#with beam.Pipeline(options=pipeline_options) as pipeline:
with beam.Pipeline() as pipeline:
results = (
pipeline
| 'Gardening plants' >> beam.Create([
'Iot,c c++ python,2015',
'Web,java spring,2016',
'Iot,c c++ spring,2017',
'Iot,c c++ spring,2017',
])
| 'Split category advanced' >> beam.ParDo(split_category_advanced(','))
| 'Save' >> beam.io.textio.WriteToText("bucket-location")
| beam.Map(print) \
)
if test:
return test(results)
if __name__ == '__main__':
pardo_dofn_methods_basic()
执行简单字数统计的代码。 CSV 列有一个 [ category, year, language, count ] 例如) IoT, 2015, c, 1
感谢您阅读。
您得到不同输出的最可能原因是并行性。使用 DataflowRunner
时,操作 运行 尽可能并行。由于您正在使用 ParDo 进行计数,因此当元素 Iot,c c++ spring,2017
转到两个不同的 worker 时,计数不会按您希望的那样发生(您正在 ParDo 中计数)。
您需要使用组合器 (4.2.4)
这里有一个简单的例子说明你想做什么:
def generate_kvs(element, csv_delimiter=',', field_delimiter=' '):
splitted = element.split(csv_delimiter)
fields = splitted[1].split(field_delimiter)
# final key to count is (Source, year, language)
return [(f"{splitted[0]}, {splitted[2]}, {x}", 1) for x in fields]
p = beam.Pipeline()
elements = ['Iot,c c++ python,2015',
'Web,java spring,2016',
'Iot,c c++ spring,2017',
'Iot,c c++ spring,2017']
(p | Create(elements)
| beam.ParDo(generate_kvs)
| beam.combiners.Count.PerKey()
| "Format" >> Map(lambda x: f"{x[0]}, {x[1]}")
| Map(print))
p.run()
无论元素在工人之间的分布如何,这都会输出您想要的结果。
注意 Apache Beam 的想法是尽可能并行化,为了聚合,您需要组合器
我建议您检查一些 wordcounts examples 以便您掌握组合器的窍门
编辑
关于组合器的说明:
ParDo 是一个在元素到元素基础上发生的操作。它获取一个元素,进行一些操作并将输出发送到下一个 PTransform。当您需要聚合数据(计数元素、求和值、连接句子...)时,元素明智的操作不起作用,您需要采用 PCollection 的东西(即,许多具有逻辑的元素)并输出一些东西。这就是组合器发挥作用的地方,它们以 PCollection 为基础执行操作,这可以跨工作人员进行(Map-Reduce 操作的一部分)
在您的示例中,您使用 Class 参数将计数存储在 ParDo 中,因此当元素经过它时,它会更改 class 中的参数。当所有元素都经过同一个 worker 时,这会起作用,因为 Class 是在 worker 基础上“创建”的(即,它们不共享状态),但是当有更多 worker 时,计数(与ParDo) 将分别发生在每个工人身上