Collecting output from Apache Beam pipeline and displaying it to console

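I've been working with Apache Beam for a few days. I want to iterate quickly on the application I'm building and make sure the pipeline has no errors. In Spark we can use sc.parallelize, and when we apply some operations we get back values that we can inspect.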

Similarly, while reading about Apache Beam, I found that we can create a PCollection and work with it using the following syntax:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines
                  | "Word" >> beam.FlatMap(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda kv: (kv[0], sum(kv[1]))))
    # No explicit pipeline.run() here: leaving the with-block already runs
    # the pipeline and waits for it to finish.

What I actually want is to print the result to the console, but I couldn't find any documentation about it.

Is there a way to print the result to the console instead of saving it to a file each time?

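After exploring further and working out how to write test cases for my application, I found a way to print the results to the console. Note that I'm currently running everything on a single-node machine, trying to understand what Apache Beam offers and how I can adopt it without compromising on industry best practices.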

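So here is my solution. In the last stage of the pipeline we can add a map function that either prints the results to the console or accumulates them into a variable, which we can print later to see the values: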

import apache_beam as beam

# Sample input: a list of strings
data = ["this is sample data", "this is yet another sample data"]

# Create a pipeline
pipeline = beam.Pipeline()
counts = (pipeline | "create" >> beam.Create(data)
    | "split" >> beam.FlatMap(lambda row: row.split(" "))
    | "pair" >> beam.Map(lambda w: (w, 1))
    | "group" >> beam.CombinePerKey(sum))

# Collect the results into the `output` list with a Map transform.
# This relies on the runner executing collect() in this same process (e.g. the
# local DirectRunner) and on the pickled function still referring to this list;
# on a distributed runner collect() runs on the workers and `output` stays empty.
output = []
def collect(row):
    output.append(row)
    return True

counts | "collect" >> beam.Map(collect)

# Run the pipeline
result = pipeline.run()

# Wait until the result is available
result.wait_until_finish()

# Print the collected output
print(output)

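I know this isn't what you asked for, but why not store the results in a text file? That is always better than printing to stdout, and unlike stdout it isn't volatile.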

You don't need the temporary list. In Python 2.7, the following should be enough:

def print_row(row):
    print row

(pipeline 
    | ...
    | "print" >> beam.Map(print_row)
)

result = pipeline.run()
result.wait_until_finish()

In Python 3.x, print is a function, so the following is enough:

(pipeline 
    | ...
    | "print" >> beam.Map(print)
)

result = pipeline.run()
result.wait_until_finish()

An example from a PyCharm Edu learning course:

import apache_beam as beam

class LogElements(beam.PTransform):
    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            # Print the element, then pass it on unchanged.
            print(self.prefix + str(element))
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        # Return the ParDo's output so LogElements can be chained further.
        return input | beam.ParDo(self._LoggingFn(self.prefix))

class MultiplyByTenDoFn(beam.DoFn):

    def process(self, element):
        yield element * 10

p = beam.Pipeline()

(p | beam.Create([1, 2, 3, 4, 5])
   | beam.ParDo(MultiplyByTenDoFn())
   | LogElements())

p.run()

Output:

10
20
30
40
50

Maybe log the elements instead of printing them?

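import logging
import apache_beam as beam

logging.basicConfig(level=logging.INFO)  # show INFO messages (default: WARNING+)

def _logging(elem):
    logging.info(elem)
    return elem

P | "logging info" >> beam.Map(_logging)  # P: the PCollection to inspect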