输出类型中 beam.ParDo 和 beam.Map 的区别?
Difference between beam.ParDo and beam.Map in the output type?
我正在使用 Apache-Beam 运行 进行一些数据转换,包括从 txt、csv 和不同数据源中提取数据。
我注意到的一件事是使用 beam.Map 和 beam.ParDo
时的结果差异
在下一个示例中:
我正在读取 csv 数据,在第一种情况下,使用 beam.ParDo 将其传递给 DoFn,它提取第一个元素,即日期,然后打印它。
第二种情况,我直接用beam.Map做同样的事情,然后打印出来
class Printer(beam.DoFn):
def process(self,data_item):
print data_item
class DateExtractor(beam.DoFn):
def process(self,data_item):
return (str(data_item).split(','))[0]
data_from_source = (p
| 'ReadMyFile 01' >> ReadFromText('./input/data.csv')
| 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
| 'Printer the data 01' >> beam.ParDo(Printer())
)
copy_of_the_data = (p
| 'ReadMyFile 02' >> ReadFromText('./input/data.csv')
| 'Splitter using beam.Map 02' >> beam.Map(lambda record: (record.split(','))[0])
| 'Printer the data 02' >> beam.ParDo(Printer())
)
我在两个输出中注意到的是下一个:
##With beam.ParDo##
2
0
1
7
-
0
4
-
0
3
2
0
1
7
##With beam.Map##
2017-04-03
2017-04-03
2017-04-10
2017-04-10
2017-04-11
2017-04-12
2017-04-12
我觉得这很奇怪。我想知道是不是打印功能的问题?但是在使用不同的转换之后,它显示出相同的结果。
例如 运行ning:
| 'Group it 01' >> beam.Map(lambda record: (record, 1))
仍然返回同样的问题:
##With beam.ParDo##
('8', 1)
('2', 1)
('0', 1)
('1', 1)
##With beam.Map##
(u'2017-04-08', 1)
(u'2017-04-08', 1)
(u'2017-04-09', 1)
(u'2017-04-09', 1)
知道是什么原因吗? beam.Map 和 beam.ParDo 之间的区别我遗漏了什么???
简答
您需要将 ParDo
的 return 值包装到一个列表中。
加长版
ParDos
通常可以 return 单个输入的任意数量的输出,即对于单个输入字符串,您可以发出零个、一个或多个结果。出于这个原因,Beam SDK 将 ParDo
的输出视为不是单个元素,而是元素的集合。
在您的情况下,ParDo
发出单个字符串而不是集合。 Beam Python SDK 仍然尝试将 ParDo
的输出解释为元素集合。它通过将您发出的字符串解释为字符集合来实现。因此,您的 ParDo
现在可以有效地生成单个字符流,而不是字符串流。
您需要做的是将您的 return 值包装到一个列表中:
class DateExtractor(beam.DoFn):
def process(self,data_item):
return [(str(data_item).split(','))[0]]
注意方括号。有关更多示例,请参阅 programming guide。
另一方面,Map
可以认为是 ParDo
的特例。 Map
预计会为每个输入产生一个输出。所以在这种情况下,您可以 return lambda 中的单个值,它会按预期工作。
而且您可能不需要将 data_item
包裹在 str
中。 According to the docs ReadFromText
转换生成字符串。
我正在使用 Apache-Beam 运行 进行一些数据转换,包括从 txt、csv 和不同数据源中提取数据。 我注意到的一件事是使用 beam.Map 和 beam.ParDo
时的结果差异在下一个示例中:
我正在读取 csv 数据,在第一种情况下,使用 beam.ParDo 将其传递给 DoFn,它提取第一个元素,即日期,然后打印它。 第二种情况,我直接用beam.Map做同样的事情,然后打印出来
class Printer(beam.DoFn):
def process(self,data_item):
print data_item
class DateExtractor(beam.DoFn):
def process(self,data_item):
return (str(data_item).split(','))[0]
data_from_source = (p
| 'ReadMyFile 01' >> ReadFromText('./input/data.csv')
| 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
| 'Printer the data 01' >> beam.ParDo(Printer())
)
copy_of_the_data = (p
| 'ReadMyFile 02' >> ReadFromText('./input/data.csv')
| 'Splitter using beam.Map 02' >> beam.Map(lambda record: (record.split(','))[0])
| 'Printer the data 02' >> beam.ParDo(Printer())
)
我在两个输出中注意到的是下一个:
##With beam.ParDo##
2
0
1
7
-
0
4
-
0
3
2
0
1
7
##With beam.Map##
2017-04-03
2017-04-03
2017-04-10
2017-04-10
2017-04-11
2017-04-12
2017-04-12
我觉得这很奇怪。我想知道是不是打印功能的问题?但是在使用不同的转换之后,它显示出相同的结果。 例如 运行ning:
| 'Group it 01' >> beam.Map(lambda record: (record, 1))
仍然返回同样的问题:
##With beam.ParDo##
('8', 1)
('2', 1)
('0', 1)
('1', 1)
##With beam.Map##
(u'2017-04-08', 1)
(u'2017-04-08', 1)
(u'2017-04-09', 1)
(u'2017-04-09', 1)
知道是什么原因吗? beam.Map 和 beam.ParDo 之间的区别我遗漏了什么???
简答
您需要将 ParDo
的 return 值包装到一个列表中。
加长版
ParDos
通常可以 return 单个输入的任意数量的输出,即对于单个输入字符串,您可以发出零个、一个或多个结果。出于这个原因,Beam SDK 将 ParDo
的输出视为不是单个元素,而是元素的集合。
在您的情况下,ParDo
发出单个字符串而不是集合。 Beam Python SDK 仍然尝试将 ParDo
的输出解释为元素集合。它通过将您发出的字符串解释为字符集合来实现。因此,您的 ParDo
现在可以有效地生成单个字符流,而不是字符串流。
您需要做的是将您的 return 值包装到一个列表中:
class DateExtractor(beam.DoFn):
def process(self,data_item):
return [(str(data_item).split(','))[0]]
注意方括号。有关更多示例,请参阅 programming guide。
另一方面,Map
可以认为是 ParDo
的特例。 Map
预计会为每个输入产生一个输出。所以在这种情况下,您可以 return lambda 中的单个值,它会按预期工作。
而且您可能不需要将 data_item
包裹在 str
中。 According to the docs ReadFromText
转换生成字符串。