How To Filter None Values Out Of PCollection
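My Pub/Sub pull subscription is sending over each message along with a None value. I need a way to filter out the None values as part of my pipeline processing.
Of course, some help preventing the None values from arriving from the pull subscription would be nice. But I feel like I'm missing something about the general workflow of defining and applying functions via ParDo.
I've set up a function to filter out None values, and it seems to work based on a print-to-console check. However, when I apply a lambda function that crashes on None types, I still get errors.
I've found the documentation for the Python Apache Beam SDK a little sparse, and I have looked all through it for answers without much luck.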
from __future__ import absolute_import

import argparse
import logging

from past.builtins import unicode

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def print_row(row):
    print row
    print type(row)


def filter_out_nones(row):
    if row is not None:
        yield row
    else:
        print 'we found a none! get it out'


def run(argv=None):
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    p = beam.Pipeline(options=pipeline_options)

    data = ['test1 message','test2 message',None,'test3 please work']

    ## this does seem to return only the values I would hope for based on the console log
    testlogOnly = (p | "makeData" >> beam.Create(data)
                     | "filter" >> beam.ParDo(filter_out_nones)
                     | "printtesting" >> beam.Map(print_row))
    #                | 'encoding' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
    #                | "writing" >> beam.io.WriteToPubSub("projects/??/topics/??"))

    ## testlogAndWrite = (p | "MakeWriteData" >> beam.Create(data)
    #                | "filterHere" >> beam.ParDo(filter_out_nones)
    #                | "printHere" >> beam.Map(print_row)
    ## below here does not work due to the following message
    ## AttributeError: 'NoneType' object has no attribute 'encode' [while running 'encodeHere']
    #                | 'encodeHere' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
    #                | "writeTest" >> beam.io.WriteToPubSub("projects/??/topics/??"))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
If I could log the byte-string-encoded messages without the None results, I'd be where I need to be.
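I think your approach to filtering out None values looks good.
However, if I understand correctly, when you use testlogAndWrite and get the AttributeError, you are keeping the "printHere" >> beam.Map(print_row) step in the pipeline.
print_row reads the messages and prints them but does not return anything, so beam.Map emits None for every element, and that is what the next step, 'encodeHere', receives.
To solve this, you can comment out that step or make sure each element is returned: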
def print_row(row):
    print row
    print type(row)
    return row
Output:
test1 message
<type 'str'>
test2 message
<type 'str'>
we found a none! get it out
test3 please work
<type 'str'>
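For reference, here is a minimal Python 3 sketch of the same idea: filter first, then use a pass-through logging step so the encode step never sees None. The names log_row, encode_utf8 and run(data) are made up for illustration and are not from the question, and the final print step simply stands in for the Pub/Sub write. A beam.Filter step with the predicate "x is not None" should be able to replace the ParDo filter.

```python
import logging


def filter_out_nones(row):
    """Generator for beam.ParDo: yields only the elements that are not None."""
    if row is not None:
        yield row


def log_row(row):
    """Pass-through for beam.Map: logs the element, then returns it unchanged."""
    logging.info("%r (%s)", row, type(row).__name__)
    return row  # without this return, beam.Map would emit None downstream


def encode_utf8(row):
    """Encode a str element to bytes."""
    return row.encode("utf-8")


def run(data):
    # Imported here so the helpers above can be exercised without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        (
            p
            | "makeData" >> beam.Create(data)
            | "filter" >> beam.ParDo(filter_out_nones)
            | "log" >> beam.Map(log_row)
            | "encode" >> beam.Map(encode_utf8).with_output_types(bytes)
            | "print" >> beam.Map(print)  # stand-in for the Pub/Sub write
        )
```

Calling run(['test1 message', 'test2 message', None, 'test3 please work']) should print the three encoded messages and skip the None element.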