先前的 PCollection 被 Pardo 更改,尽管如文档中所述不可变。疑似错误

Previous PCollection being altered by Pardo despite immutability as stated in docs. Suspected Bug

我目前正在编写流式管道以将数据插入 Bigtable,但我遇到了一个问题,我认为这是 Apache Beam 的一个错误,但我想听听一些意见。 https://beam.apache.org/documentation/programming-guide/ 在本文档中,它说 PCollection 是不可变的,但我发现了一个情况,由于分支点的 Pardo 函数导致 PCollection 意外变异,导致非常意外的错误,而且这些错误随机发生,而不是在所有数据条目上.

我已经在 Google Cloud Dataflow 上的部署、Google Cloud 上的 jupyter notebook 和我本地的机器上进行了测试,所有平台上都会出现错误。因此,它应该与核心库有关,但我不确定,因此我将其发布在这里以供人们智慧。

所以这里是重现问题的代码:

class CreateRawRow(beam.DoFn):

    def process(self, data):
        from google.cloud.bigtable.row import DirectRow
        import datetime

        # Convert str to datetime
        convertedTimestamp = datetime.datetime.strptime(data['Timestamp'], '%Y-%m-%d %H:%M:%S.%f%z')
        timestamp = convertedTimestamp.strftime("%Y%m%d%H%M%S")

        row_key = f"a#b#raw#c#{timestamp}"
        direct_row = DirectRow(row_key)
        for key in data:
            cf = "data"
            column = key
            value = str(data[key])
            direct_row.set_cell(
                cf,
                column,
                value,
                convertedTimestamp)

        return [direct_row]

p = beam.Pipeline(options=options)
rawdata = p | "Read" >> beam.io.ReadFromPubSub(subscription=subscription)
jsonData = (rawdata | "Parse Json" >> beam.Map(json.loads))

jsonData | "Create Row Information" >> beam.ParDo(CreateRawRow())
class ChangeTimestamp(beam.DoFn):
    def process(self, data):
        import datetime

        timestamp = datetime.datetime.strptime(data['Timestamp'], '%Y-%m-%d %H:%M:%S.%f%z')
        convertedTimestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S")
        data["Timestamp"] = convertedTimestamp #Commenting out this line fixes the problem

        yield [data]
changedTimestamp = jsonData | "Change Time stamp" >> beam.ParDo(ChangeTimestamp())

result = p.run()
result.wait_until_finish()

上面的代码是精简的,以显示导致问题的确切代码行。 正如你在上面的代码中看到的,我已经从 Pub Sub 中摄取了(它必须从 pubsub 中摄取,如果我使用 beam.Create() 来模拟数据进来,就不会出现错误)。我使用 json.loads 从 pubsub 加载数据以将其转换为字典。从 pubsub 摄取后,我尝试使用 CreateRawRow() 创建一行以插入到 BigTable 中。我还创建了一个名为“更改时间戳”的单独分支。

现在,我得到的错误是

ValueError: time data '2021-02-13 20:12:23' does not match format '%Y-%m-%d %H:%M:%S.%f%z' [while running 'Create Row Information']

来自CreateRawRow

但是,我已将问题追溯到 ChangeTimestamp class,其中包含行“data["Timestamp"] = convertedTimestamp”。如果我注释掉这些行,错误就会消失。另外,如果我将时间格式更改为其他格式,错误也会随之而来,所以肯定是这一行是麻烦制造者。

所以我的直接怀疑是,由于此 Pardo 中的这一行,先前的“jsonData”Pcollection 发生了变异。但是,根据文档,这不应该是可能的。因此,我想提出一些意见,我这样做是错误的还是这是一个真正的错误?还要记住,PubSub 上的每个条目都不会发生这种情况,如果我使用 Beam.Create() 模拟数据流也不会发生这种情况。

关于 PCollection 不变性的声明是 DoFns 不应该改变他们的输入。在像 Python 这样的语言中,所有内容都通过引用传递并且默认情况下是可变的(没有 const),如果不是不可能的话,这很难执行。