How to return a Tuple instead of a Union
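I'm trying to write some basic apache_beam pipelines for learning purposes, but I can't work out why the output of one of my transforms is typed as Union[int, str] instead of Tuple[str, int]. Why does this happen?
The error occurs just before the Format step of the pipeline.
I'm using type hints, so the error I get is: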
Exception has occurred: TypeCheckError
Type hint violation for 'Format': requires Tuple[str, int] but got Union[int, str] for element
When I debugged by temporarily removing the type hints, I found that element really is of type Union[int, str], but I can't explain why.
The pipeline:
def run(argv=None):
    """Main entry point; runs a word_count pipeline"""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input",
        default="data/input/count_words.txt",
        help="Input file to process",
    )
    parser.add_argument(
        "--output",
        dest="output",
        default="data/output/filtered_output_count_words.txt",
        help="Processed output file",
    )
    parser.add_argument(
        "--runner",
        dest="runner",
        default="DirectRunner",
        help="Runner used to process the apache beam pipeline",
    )
    args = parser.parse_known_args(argv)[0]
    beam_options = PipelineOptions(runner=args.runner)
    with beam.Pipeline(options=beam_options) as pipeline:
        filtered_words = (
            pipeline
            | "Read" >> beam.io.ReadFromText(args.input)
            | "CountWords" >> CountWords()
            | "Filter" >> beam.ParDo(FilterWords(pattern="I'm|trying|example"))
        )
        assert_that(
            filtered_words,
            equal_to([("I'm", 4), ("trying", 4), ("example", 4)]),
        )
        (
            filtered_words
            # | "Print" >> beam.Map(print)
            | "Format" >> beam.ParDo(FormatCountText())
The other elements of the pipeline:
class FilterWords(beam.DoFn):
    def __init__(self, pattern: str):
        """ParDo to filter a bunch o' words & their number of occurrences
        :param pattern: regexp pattern to use for filtering
        :type pattern: str
        """
        super()
        self.pattern = pattern

    def process(
        self,
        element: Tuple[str, int],
        # words_to_keep: List[str],
    ) -> Tuple[str, int]:
        word, _ = element
        if re.match(self.pattern, word):
            logging.info(
                f"The word '{word}' matches the pattern {self.pattern}"
            )
            yield element
            # yield Tuple(word, _)
        else:
            logging.debug(
                f"The word '{word}' does not match the pattern {self.pattern}"
            )
class CountWords(beam.PTransform):
    def expand(
        self,
        pcoll: beam.PCollection,
    ) -> beam.PCollection:
        return (
            pcoll
            | "Extract" >> beam.ParDo(ExtractWordsFromRow(), delimiter=" ")
            | "Unpuncutate"
            >> beam.ParDo(RemovePunctuation(), symbols=[",", "."])
            | "Count" >> beam.combiners.Count.PerElement()
        )
class ExtractWordsFromRow(beam.DoFn):
    def process(
        self,
        element: str,
        delimiter: str,
    ) -> List[str]:
        # Extract items within element, in this case 1 line into multiple words
        words = str(element).split(delimiter)
        return words
class RemovePunctuation(beam.DoFn):
    def process(
        self,
        element: str,
        symbols: List[str],
    ) -> Iterable[str]:
        word = element
        for symbol in symbols:
            word = word.replace(symbol, "")
        yield word
class FormatCountText(beam.DoFn):
    def process(
        self,
        element: Tuple[str, int],
    ):
        word, count = element
        yield f"{word}: {count}"
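This happens because your FilterWords.process declaration says it returns Tuple[str, int]. A DoFn's process method should return an Iterable[T] to produce a PCollection[T], so it should be declared as returning -> Iterable[Tuple[str, int]] (because of the yield statement, the return value of process is actually a generator).
(The Union[str, int] comes from the fact that if you actually returned a Tuple[str, int], the string and the integer would be added to the resulting PCollection as separate elements.)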
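The flattening described above can be illustrated without Beam at all. The following is a minimal sketch in plain Python, not Beam's actual implementation: simulate_pardo, process_returning_tuple and process_yielding_tuple are hypothetical names made up for this illustration, and the helper only mimics the rule that every item of the iterable returned by process becomes one output element.

```python
from typing import Callable, Iterable, List, Tuple


def simulate_pardo(process: Callable, elements: Iterable) -> List:
    """Hypothetical stand-in for ParDo (an assumption, not Beam code):
    each item of the iterable returned by `process` becomes one output element."""
    outputs = []
    for element in elements:
        result = process(element)
        if result is not None:
            outputs.extend(result)  # iterate over whatever process returned
    return outputs


def process_returning_tuple(element: Tuple[str, int]) -> Tuple[str, int]:
    # The tuple itself is the returned iterable, so its two members
    # (a str and an int) are emitted as separate elements.
    return element


def process_yielding_tuple(element: Tuple[str, int]) -> Iterable[Tuple[str, int]]:
    # A generator is returned; iterating it produces the whole tuple once.
    yield element


counts = [("trying", 4), ("example", 4)]
flattened = simulate_pardo(process_returning_tuple, counts)
kept_whole = simulate_pardo(process_yielding_tuple, counts)

print(flattened)   # ['trying', 4, 'example', 4]
print(kept_whole)  # [('trying', 4), ('example', 4)]
```

The first result mixes str and int elements, which is what a declared return type of Tuple[str, int] tells the type checker to expect, hence Union[int, str]. In the question's code the body of FilterWords.process already yields whole tuples, so only the annotation needs to change to Iterable[Tuple[str, int]].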