Cloud Dataflow 流式传输,空闲时停止以省钱?

Cloud Dataflow streaming, stop when idle to save money?

我有一个应用程序,用户可以在其中投票。

我希望我的应用程序能够扩展,所以我决定使用 Cloud Dataflow 聚合存储在 Firestore 中的计数器。

我已经设置了一个 streaming 类型的数据流作业,这样我就可以在用户投票时监听 pubsub 主题。

有时我每天有数千个用户输入,有时我有几百个...有什么解决方案可以解决 "pause" 作业一段时间未收到 pubsub 消息的问题?

目前,我的数据流工作一直是运行,恐怕这会花费我很多钱。

如果有人可以帮助我了解流媒体工作的计费,我将不胜感激

这是我的 Python 管道:

def run(argv=None):
    # Config
    parser = argparse.ArgumentParser()
    # Output PubSub Topic
    parser.add_argument(
        '--output_topic', required=True)
    # Input PubSub Topic
    parser.add_argument(
        '--input_topic', required=True)

    known_args, pipeline_args = parser.parse_known_args(argv)

    # Pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    # Pipeline process
    with beam.Pipeline(options=pipeline_options) as p:

        # Counting votes
        def count_votes(contestant_votes):
            (contestant, votes) = contestant_votes
            return (contestant, sum(votes))

        # Format data to a fake object (used to be parsed by the CF)
        def format_result(contestant_votes):
            (contestant, votes) = contestant_votes
            return '{ "contestant": %s, "votes": %d }' % (contestant, votes)

        transformed = (p
                       | 'Receive PubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
                       .with_output_types(bytes)
                       | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
                       | 'Pair with one' >> beam.Map(lambda x: (x, 1))
                       | 'Apply window of time' >> beam.WindowInto(window.FixedWindows(30, 0))
                       | 'Group by contestant' >> beam.GroupByKey()
                       | 'Count votes' >> beam.Map(count_votes)
                       | 'Format to fake object string' >> beam.Map(format_result)
                       | 'Transform to PubSub base64 string' >> beam.Map(lambda x: x.encode('utf-8'))
                       .with_output_types(bytes))

        # Trigger a the output PubSub topic with the message payload
        transformed | beam.io.WriteToPubSub(known_args.output_topic)

        result = p.run()
        result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

回答您的成本问题:对于您当前使用的工作人员,您将花费大约 250 美元(取决于您当月的 PD 使用情况)。

目前没有等待强制数据流到 "idle" 或扩展到 0 个工作人员。您可以拥有的最小值是 1.

话虽这么说,但您可以采取几条路线来尽量降低成本。

如果您的工作器负载不大,并且您想要最简单的选项,则可以使用功能较弱的工作器(n1-standard-1 [~USD $77.06] 或 n1-standard-2 [~USD $137.17 ]). https://cloud.google.com/products/calculator/#id=3bbedf2f-8bfb-41db-9923-d3a5ef0c0250(如果你看到我添加了所有 3 个变体,使用我在你的照片中看到的 430GB PD)。

如果您需要计算能力,您可以切换到使用基于 cron 的数据流作业,如此处所述:https://cloud.google.com/blog/products/gcp/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions。有了这个,您可能应该从订阅而不是主题中阅读,这样您就可以保留消息直到您开始工作。