从 PCollection 中提取常量值
Extract constant value from a PCollection
我有一个连接器,可以为我提供数百万个条目。其中一些条目有一个字段 date
,其值对所有条目都相同,但其他条目则不同。我需要获取此值并将其设置到管道中的每个条目。
如果不是分布式管道,我会这样做:
def set_date(entries):
for entry in entries:
mydate = entry.get('date')
if mydate:
break
for entry in entries:
entry['date'] = mydate
但是,我必须单独对待每个条目而不是作为一个集合,所以我不能这样写逻辑。
此外,我们不能保证每一组 PCollection 至少有一个条目具有所需的 date
字段。这不太可能,但仍然可能发生 PCollection 上的所有条目都没有 date
字段,所以我上面写的逻辑是无效的。我需要编写某种 PTransform 将 date
值保存到 class 属性,并在某种类型的“finally”子句上设置每个条目的值。
正确的实现方式是什么?
好问题!对于批处理管道中的 Beam,您可以使用 侧输入 来完成此操作。你会做这样的事情:
def get_date(entry):
if 'date' in entry:
yield entry['date']
def put_date(entry, date):
entry['date'] = date[0]
return entry
with beam.Pipeline(...) as p:
entries = p | ReadEntries(....)
# First obtain the date from the entires.
# We do this by outputting the date from the entries with a date,
# and then we 'sample' one (this is a simple combiner that lets us
# pick one of the dates - since they're all the same).
date_pc = beam.pvalue.AsList(
entries
| beam.FlatMap(get_date)
| beam.Combine(Sample.FixedSizeGlobally(1))
# We then use the date_pc side input to join it into entries.
dated_entries = entries | beam.Map(put_date, date_pc)
如果有帮助请告诉我。
如果 none 个条目具有 date
属性,那么我们可能需要做一些其他事情。很高兴迭代!
据我所知,我想在 运行 我的管道之前为我的条目设置 date
值,我已将其添加为 ValueProvier
并将其提供给我的 SetDate
PTransform 作为辅助输入。如果不是你的情况,你可以试试标记为正确的答案。
这是我的最终实现:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class CustomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--query',
help='Query to retrieve from BigQuery acting as data source.')
parser.add_value_provider_argument(
'--mydate',
help='Date with %Y-%m-%d format of the entries we are processing.')
options = PipelineOptions()
args = options.view_as(CustomOptions)
def run():
with beam.Pipeline(options=options) as p:
(
p
| "Read from BigQuery" >> beam.io.ReadFromBigQuery(
query=args.query,
use_standard_sql=True,
flatten_results=False)
| "Set date" >> beam.ParDo(SetDate(args.mydate))
)
class SetDate(beam.DoFn):
def __init__(self, mydate):
self.mydate = mydate
def process(self, entry):
if 'date' not in entry:
entry['date'] = self.mydate.get()
yield entry
我有一个连接器,可以为我提供数百万个条目。其中一些条目有一个字段 date
,其值对所有条目都相同,但其他条目则不同。我需要获取此值并将其设置到管道中的每个条目。
如果不是分布式管道,我会这样做:
def set_date(entries):
for entry in entries:
mydate = entry.get('date')
if mydate:
break
for entry in entries:
entry['date'] = mydate
但是,我必须单独对待每个条目而不是作为一个集合,所以我不能这样写逻辑。
此外,我们不能保证每一组 PCollection 至少有一个条目具有所需的 date
字段。这不太可能,但仍然可能发生 PCollection 上的所有条目都没有 date
字段,所以我上面写的逻辑是无效的。我需要编写某种 PTransform 将 date
值保存到 class 属性,并在某种类型的“finally”子句上设置每个条目的值。
正确的实现方式是什么?
好问题!对于批处理管道中的 Beam,您可以使用 侧输入 来完成此操作。你会做这样的事情:
def get_date(entry):
if 'date' in entry:
yield entry['date']
def put_date(entry, date):
entry['date'] = date[0]
return entry
with beam.Pipeline(...) as p:
entries = p | ReadEntries(....)
# First obtain the date from the entires.
# We do this by outputting the date from the entries with a date,
# and then we 'sample' one (this is a simple combiner that lets us
# pick one of the dates - since they're all the same).
date_pc = beam.pvalue.AsList(
entries
| beam.FlatMap(get_date)
| beam.Combine(Sample.FixedSizeGlobally(1))
# We then use the date_pc side input to join it into entries.
dated_entries = entries | beam.Map(put_date, date_pc)
如果有帮助请告诉我。
如果 none 个条目具有 date
属性,那么我们可能需要做一些其他事情。很高兴迭代!
据我所知,我想在 运行 我的管道之前为我的条目设置 date
值,我已将其添加为 ValueProvier
并将其提供给我的 SetDate
PTransform 作为辅助输入。如果不是你的情况,你可以试试标记为正确的答案。
这是我的最终实现:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class CustomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--query',
help='Query to retrieve from BigQuery acting as data source.')
parser.add_value_provider_argument(
'--mydate',
help='Date with %Y-%m-%d format of the entries we are processing.')
options = PipelineOptions()
args = options.view_as(CustomOptions)
def run():
with beam.Pipeline(options=options) as p:
(
p
| "Read from BigQuery" >> beam.io.ReadFromBigQuery(
query=args.query,
use_standard_sql=True,
flatten_results=False)
| "Set date" >> beam.ParDo(SetDate(args.mydate))
)
class SetDate(beam.DoFn):
def __init__(self, mydate):
self.mydate = mydate
def process(self, entry):
if 'date' not in entry:
entry['date'] = self.mydate.get()
yield entry