Modify MinimalWordCount example to read from BigQuery
I'm trying to modify Apache Beam's MinimalWordCount Python example to read from a BigQuery table. I made the modifications below; the query itself seems to run, but the example fails.
Here is the original example:
with beam.Pipeline(options=pipeline_options) as p:

  # Read the text file[pattern] into a PCollection.
  lines = p | ReadFromText(known_args.input)

  # Count the occurrences of each word.
  counts = (
      lines
      | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                    .with_output_types(unicode))
      | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
      | 'GroupAndSum' >> beam.CombinePerKey(sum))

  # Format the counts into a PCollection of strings.
  output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

  # Write the output using a "Write" transform that has side effects.
  # pylint: disable=expression-not-assigned
  output | WriteToText(known_args.output)
Instead of ReadFromText, I'm trying to adapt the example to read from a column in a BigQuery table. To do that, I replaced lines = p | ReadFromText(known_args.input)
with the following code:
query = 'SELECT text_column FROM `bigquery.table.goes.here` '
lines = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
When I re-run the pipeline, I get the error: "WARNING:root:A task failed with exception. expected string or buffer [while running 'Split']".
I understand that the 'Split' operation expects a string but apparently isn't getting one. How do I modify 'ReadFromBigQuery' so that it passes a string/buffer along? Do I need to supply a table schema or something else to convert the results of 'ReadFromBigQuery' into strings?
This happens because BigQuerySource returns a PCollection of dictionaries (dict), where each key in the dictionary represents a column. In your case, the simplest fix is to apply a beam.Map right after beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)), like this:
lines = (p
         | "ReadFromBigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
         | "Extract text column" >> beam.Map(lambda row: row.get("text_column"))
        )
If you run into issues with the column name, try changing it to u"text_column".
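To see concretely why 'Split' fails, here is a small Beam-free sketch that reproduces the error and the fix. The row dict below is made-up sample data standing in for what BigQuerySource would emit:

```python
import re

# A BigQuery row arrives as a dict keyed by column name (hypothetical data).
row = {"text_column": "the quick brown fox"}

# Passing the dict straight to re.findall raises TypeError
# ("expected string or buffer" on Python 2, "expected string or
# bytes-like object" on Python 3) -- the same error as in the question.
try:
    re.findall(r"[A-Za-z']+", row)
except TypeError as e:
    print("TypeError:", e)

# Extracting the column first yields a plain string, which Split can handle.
words = re.findall(r"[A-Za-z']+", row.get("text_column"))
print(words)  # ['the', 'quick', 'brown', 'fox']
```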
Alternatively, you can modify the Split transform itself to extract the column's value:
'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x.get("text_column")))
            .with_output_types(unicode))
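Putting it together, the Extract → Split → count flow the fixed pipeline computes can be sketched in plain Python over hypothetical rows (no Beam required):

```python
import re
from collections import Counter

# Hypothetical rows, standing in for what BigQuerySource would emit.
rows = [
    {"text_column": "to be or not to be"},
    {"text_column": "be swift be bold"},
]

# Extract -> Split -> PairWithOne/GroupAndSum, collapsed into plain Python.
lines = [row.get("text_column") for row in rows]     # Extract text column
words = [w for line in lines
         for w in re.findall(r"[A-Za-z']+", line)]   # Split
counts = Counter(words)                              # CombinePerKey(sum)

for word, count in sorted(counts.items()):
    print("%s: %s" % (word, count))
```

The dict-to-string extraction is the only step the original MinimalWordCount was missing; everything downstream of 'Split' works unchanged.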