Dataflow in Python not showing output collection from Pubsub subscription

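I have a Dataflow job written in Python. It is simple: it reads from a subscription, applies a fixed window, and then writes to GCS.

The problem is that, after reading from the subscription, the FixedWindow step does not show any output collection.

I have tried everything I could think of, with no luck.

Here is my code: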

import apache_beam as beam
import argparse
import logging
import apache_beam.transforms.window as window
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions

def run(argv=None):

    """Main entry point; defines and runs the Pub/Sub-to-GCS pipeline."""
    parser = argparse.ArgumentParser()
    # --input is not used by this pipeline, so it stays optional with a default.
    parser.add_argument('--input',
                        dest='input',
                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Input file to process.')
    # A required argument never falls back to a default, so none is given.
    parser.add_argument('--output',
                        dest='output',
                        required=True,
                        help='Output location to write results to.')
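    # Exactly one Pub/Sub source must be given: a topic or a subscription.
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument('--topic',
                        dest='topic',
                        help='Pub/Sub topic to read messages from.')
    source.add_argument('--subscription',
                        dest='subscription',
                        help='Pub/Sub subscription to read messages from.')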
    parser.add_argument('--entity_type',
                        dest='entity_type',
                        required=True,
                        help='Entity Type for message.')
    parser.add_argument('--event_type',
                        dest='event_type',
                        required=True,
                        help='Event Type for message.')
    parser.add_argument('--outputFilenamePrefix',
                        dest='outputFilenamePrefix',
                        required=True,
                        help='Prefix for output file names.')
    parser.add_argument('--outputFilenameSuffix',
                        dest='outputFilenameSuffix',
                        required=True,
                        help='Suffix for output file names.')

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)

    if known_args.subscription:
        messages = (p
                | ReadFromPubSub(subscription=known_args.subscription, with_attributes=True))
    else:
        # No subscription given: read from the topic instead.
        messages = (p
                | ReadFromPubSub(topic=known_args.topic, with_attributes=True))

    # Assign each message to a 120-second fixed window, then write to text files.
    (messages
            | beam.WindowInto(window.FixedWindows(120))
            | beam.io.WriteToText(known_args.output + known_args.outputFilenamePrefix,
                                    file_name_suffix=known_args.outputFilenameSuffix,
                                    num_shards=1))

    result = p.run()
    result.wait_until_finish()

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

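The idea is to save the results in the provided bucket. I have read that some features, such as windowing over unbounded data, are not supported yet. If that is the case, the only solution would be to use Java.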
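
For context, the windowing step itself is conceptually simple: a fixed window buckets each element by its event timestamp. Below is a minimal pure-Python sketch of that assignment, independent of Beam. The function name `fixed_window_bounds` is hypothetical and only illustrates the arithmetic; it is not part of the Beam API.

```python
def fixed_window_bounds(timestamp, size, offset=0):
    """Return (start, end) of the fixed window containing `timestamp`.

    All values are in seconds; the window is the half-open
    interval [start, end).
    """
    start = timestamp - (timestamp - offset) % size
    return start, start + size


# With 120-second windows, events at t=5 and t=119 share a window,
# while an event at t=120 falls into the next one.
for t in (5, 119, 120, 250):
    print(t, fixed_window_bounds(t, 120))
```

With the default trigger, a window's contents are normally emitted only after the watermark passes the end of that window, so a downstream step may appear empty for at least one window length.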

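In the Python SDK, WriteToText is not supported when writing to GCS from a streaming pipeline like this one. As you discussed, this would work in Java. Alternatively, you can write the records to a different IO, such as BigQuery.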
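
A rough sketch of the BigQuery alternative is shown below. It is not a tested drop-in replacement: the helper `message_to_row`, the two-column schema, and the `subscription` and `table` parameters are assumptions made for illustration, and the table is expected in the usual `project:dataset.table` form.

```python
import json


def message_to_row(message):
    """Convert a Pub/Sub message into a dict shaped like a BigQuery row.

    `message` is expected to expose `.data` (bytes) and `.attributes`
    (dict), as elements from ReadFromPubSub(with_attributes=True) do.
    """
    return {
        'data': message.data.decode('utf-8'),
        # Attributes are stored as a JSON string to keep the schema flat.
        'attributes': json.dumps(dict(message.attributes or {}),
                                 sort_keys=True),
    }


def run(subscription, table, pipeline_args=None):
    # Beam is imported here so message_to_row can be used without it.
    import apache_beam as beam
    from apache_beam.io.gcp.pubsub import ReadFromPubSub
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.options.pipeline_options import StandardOptions

    options = PipelineOptions(pipeline_args)
    # message_to_row relies on the module-level json import.
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    p = beam.Pipeline(options=options)
    (p
     | ReadFromPubSub(subscription=subscription, with_attributes=True)
     | beam.Map(message_to_row)
     | beam.io.WriteToBigQuery(
         table,
         schema='data:STRING,attributes:STRING',  # assumed schema
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    return p.run()
```

Keeping the conversion in a plain function means it can be checked locally with a stand-in object before the job is submitted to Dataflow.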

Supported built-in I/O transforms