如何优雅地中止 App Engine 管道?

How to abort App Engine pipelines gracefully?

问题

我有一个管道链:

class PipelineA(base_handler.PipelineBase):
  def run(self, *args):
    # do something

class PipelineB(base_handler.PipelineBase):
  def run(self, *args):
    # do something


class EntryPipeline(base_handler.PipelineBase):
  def run(self):

    if some_condition():
      self.abort("Condition failed. Pipeline aborted!")

    yield PipelineA()

    mr_output = yield mapreduce_pipeline.MapreducePipeline(
      # mapreduce configs here
      # ...
    )

    yield PipelineB(mr_output)

p = EntryPipeline()
p.start()

EntryPipeline 中,我在开始 PipelineAMapreducePipelinePipelineB 之前测试了一些条件。如果条件失败,我想中止 EntryPipeline 和所有后续管道。

问题

  1. 什么是优雅的流水线流产? self.abort() 是正确的方法还是我需要 sys.exit()?

  2. 如果我想在PipelineA里面堕胎怎么办?例如PipelineA 成功启动,但阻止后续管道(MapreducePipelinePipelineB)启动。


编辑:

我最终将条件语句移到了 EntryPipeline 之外,因此只有在条件为真时才开始整个过程​​。否则我认为尼克的回答是正确的。

由于文档目前说“TODO:谈论显式中止并重试

我们必须阅读来源:

https://github.com/GoogleCloudPlatform/appengine-pipelines/blob/master/python/src/pipeline/pipeline.py#L703

  def abort(self, abort_message=''):
    """Mark the entire pipeline up to the root as aborted.
    Note this should only be called from *outside* the context of a running
    pipeline. Synchronous and generator pipelines should raise the 'Abort'
    exception to cause this behavior during execution.

    Args:
      abort_message: Optional message explaining why the abort happened.

    Returns:
      True if the abort signal was sent successfully; False if the pipeline
      could not be aborted for any reason.
    """

因此,如果您有 some_pipeline 的句柄,那么 不是 self, 你可以调用 some_pipeline.abort()... 但是如果你想中止自己,你需要 raise Abort() ......这将冒泡到顶部并杀死整棵树