How to return a Tuple instead of a Union

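I'm trying to write a basic apache_beam pipeline for learning purposes, but I can't figure out why the output type of one of my transforms is Union[int, str] instead of Tuple[str, int]. Why is this happening?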

The error occurs just before the Format step in the pipeline. I'm using type hints, so the error I get is:

Exception has occurred: TypeCheckError
Type hint violation for 'Format': requires Tuple[str, int] but got Union[int, str] for element

When I debugged by temporarily removing the type hints, I found that element really is of type Union[int, str], but I can't explain why.

The pipeline:

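import argparse
import logging
import re
from typing import Iterable, List, Tuple

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.util import assert_that, equal_to


def run(argv=None):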
    """Main entry point; runs a word_count pipeline"""

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input",
        default="data/input/count_words.txt",
        help="Input file to process",
    )
    parser.add_argument(
        "--output",
        dest="output",
        default="data/output/filtered_output_count_words.txt",
        help="Processed output file",
    )
    parser.add_argument(
        "--runner",
        dest="runner",
        default="DirectRunner",
        help="Runner used to process the apache beam pipeline",
    )
    args = parser.parse_known_args(argv)[0]
    beam_options = PipelineOptions(runner=args.runner)
    with beam.Pipeline(options=beam_options) as pipeline:
        filtered_words = (
            pipeline
            | "Read" >> beam.io.ReadFromText(args.input)
            | "CountWords" >> CountWords()
            | "Filter" >> beam.ParDo(FilterWords(pattern="I'm|trying|example"))
        )
        assert_that(
            filtered_words,
            equal_to([("I'm", 4), ("trying", 4), ("example", 4)]),
        )
        (
            filtered_words
            # | "Print" >> beam.Map(print)
            | "Format" >> beam.ParDo(FormatCountText())
        )

Other elements of the pipeline:

class FilterWords(beam.DoFn):
    def __init__(self, pattern: str):
        """ParDo to filter a bunch o' words & their number of occurrences

        :param pattern: regexp pattern to use for filtering
        :type pattern: str
        """
        super().__init__()
        self.pattern = pattern

    def process(
        self,
        element: Tuple[str, int],
        # words_to_keep: List[str],
    ) -> Tuple[str, int]:
        word, _ = element
        if re.match(self.pattern, word):
            logging.info(
                f"The word '{word}' matches the pattern {self.pattern}"
            )
            yield element
            # yield Tuple(word, _)
        else:
            logging.debug(
                f"The word '{word}' does not match the pattern {self.pattern}"
            )

class CountWords(beam.PTransform):
    def expand(
        self,
        pcoll: beam.PCollection,
    ) -> beam.PCollection:
        return (
            pcoll
            | "Extract" >> beam.ParDo(ExtractWordsFromRow(), delimiter=" ")
            | "Unpuncutate"
            >> beam.ParDo(RemovePunctuation(), symbols=[",", "."])
            | "Count" >> beam.combiners.Count.PerElement()
        )


class ExtractWordsFromRow(beam.DoFn):
    def process(
        self,
        element: str,
        delimiter: str,
    ) -> List[str]:
        # Extract items within element, in this case 1 line into multiple words
        words = str(element).split(delimiter)
        return words


class RemovePunctuation(beam.DoFn):
    def process(
        self,
        element: str,
        symbols: List[str],
    ) -> Iterable[str]:
        word = element
        for symbol in symbols:
            word = word.replace(symbol, "")
        yield word


class FormatCountText(beam.DoFn):
    def process(
        self,
        element: Tuple[str, int],
    ):
        word, count = element
        yield f"{word}: {count}"

This happens because FilterWords.process is declared as returning Tuple[str, int]. A DoFn's process method should return an Iterable[T] to produce a PCollection[T], so it should be declared as -> Iterable[Tuple[str, int]]. (Because of the yield statement, the return value of process is actually a generator.)

The Union[str, int] comes from the fact that if you actually returned a Tuple[str, int], the string and the integer would each be added to the resulting PCollection as separate elements.
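
To illustrate the difference without running Beam, here is a minimal pure-Python sketch. The helper names `keep_matching` and `collect_outputs` are hypothetical, and `collect_outputs` is only a toy stand-in for how a ParDo turns the iterable returned by process into output elements. It is not Beam's actual implementation.

```python
import re
from typing import Any, Iterable, List, Tuple


def keep_matching(
    element: Tuple[str, int], pattern: str
) -> Iterable[Tuple[str, int]]:
    """Hypothetical stand-in for FilterWords.process.

    Because it uses yield, calling it returns a generator, i.e. an
    iterable whose items are whole (word, count) tuples.
    """
    word, _ = element
    if re.match(pattern, word):
        yield element


def collect_outputs(process_result: Iterable[Any]) -> List[Any]:
    """Toy model (an assumption, not Beam code): every item of the
    iterable returned by process becomes one output element."""
    return list(process_result)


# Yielding the tuple: the output elements are tuples.
yielded = collect_outputs(keep_matching(("trying", 4), "I'm|trying|example"))

# Returning the bare tuple instead: the tuple itself is iterated, so the
# string and the integer become two separate elements (str or int).
returned = collect_outputs(("trying", 4))

print(yielded)
print(returned)
```

In this toy model the first call keeps each element as a single tuple, while the second splits it into a str and an int. That matches the reading of a Tuple[str, int] return annotation as "elements are either str or int".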