先前的 PCollection 被 Pardo 更改,尽管如文档中所述不可变。疑似错误
Previous PCollection being altered by Pardo despite immutability as stated in docs. Suspected Bug
我目前正在编写流式管道以将数据插入 Bigtable,但我遇到了一个问题,我认为这是 Apache Beam 的一个错误,但我想听听一些意见。
https://beam.apache.org/documentation/programming-guide/
在本文档中,它说 PCollection 是不可变的,但我发现了一个情况,由于分支点的 Pardo 函数导致 PCollection 意外变异,导致非常意外的错误,而且这些错误随机发生,而不是在所有数据条目上.
我已经在 Google Cloud Dataflow 上的部署、Google Cloud 上的 jupyter notebook 和我本地的机器上进行了测试,所有平台上都会出现错误。因此,它应该与核心库有关,但我不确定,因此我将其发布在这里以供人们智慧。
所以这里是重现问题的代码:
class CreateRawRow(beam.DoFn):
def process(self, data):
from google.cloud.bigtable.row import DirectRow
import datetime
# Convert str to datetime
convertedTimestamp = datetime.datetime.strptime(data['Timestamp'], '%Y-%m-%d %H:%M:%S.%f%z')
timestamp = convertedTimestamp.strftime("%Y%m%d%H%M%S")
row_key = f"a#b#raw#c#{timestamp}"
direct_row = DirectRow(row_key)
for key in data:
cf = "data"
column = key
value = str(data[key])
direct_row.set_cell(
cf,
column,
value,
convertedTimestamp)
return [direct_row]
p = beam.Pipeline(options=options)
rawdata = p | "Read" >> beam.io.ReadFromPubSub(subscription=subscription)
jsonData = (rawdata | "Parse Json" >> beam.Map(json.loads))
jsonData | "Create Row Information" >> beam.ParDo(CreateRawRow())
class ChangeTimestamp(beam.DoFn):
def process(self, data):
import datetime
timestamp = datetime.datetime.strptime(data['Timestamp'], '%Y-%m-%d %H:%M:%S.%f%z')
convertedTimestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S")
data["Timestamp"] = convertedTimestamp #Commenting out this line fixes the problem
yield [data]
changedTimestamp = jsonData | "Change Time stamp" >> beam.ParDo(ChangeTimestamp())
result = p.run()
result.wait_until_finish()
上面的代码是精简的,以显示导致问题的确切代码行。
正如你在上面的代码中看到的,我已经从 Pub Sub 中摄取了(它必须从 pubsub 中摄取,如果我使用 beam.Create() 来模拟数据进来,就不会出现错误)。我使用 json.loads 从 pubsub 加载数据以将其转换为字典。从 pubsub 摄取后,我尝试使用 CreateRawRow() 创建一行以插入到 BigTable 中。我还创建了一个名为“更改时间戳”的单独分支。
现在,我得到的错误是
ValueError: time data '2021-02-13 20:12:23' does not match format '%Y-%m-%d %H:%M:%S.%f%z' [while running 'Create Row Information']
来自CreateRawRow
但是,我已将问题追溯到 ChangeTimestamp class,其中包含行“data["Timestamp"] = convertedTimestamp”。如果我注释掉这些行,错误就会消失。另外,如果我将时间格式更改为其他格式,错误也会随之而来,所以肯定是这一行是麻烦制造者。
所以我的直接怀疑是,由于此 Pardo 中的这一行,先前的“jsonData”Pcollection 发生了变异。但是,根据文档,这不应该是可能的。因此,我想提出一些意见,我这样做是错误的还是这是一个真正的错误?还要记住,PubSub 上的每个条目都不会发生这种情况,如果我使用 Beam.Create() 模拟数据流也不会发生这种情况。
关于 PCollection 不变性的声明是 DoFns 不应该改变他们的输入。在像 Python 这样的语言中,所有内容都通过引用传递并且默认情况下是可变的(没有 const
),如果不是不可能的话,这很难执行。
我目前正在编写流式管道以将数据插入 Bigtable,但我遇到了一个问题,我认为这是 Apache Beam 的一个错误,但我想听听一些意见。 https://beam.apache.org/documentation/programming-guide/ 在本文档中,它说 PCollection 是不可变的,但我发现了一个情况,由于分支点的 Pardo 函数导致 PCollection 意外变异,导致非常意外的错误,而且这些错误随机发生,而不是在所有数据条目上.
我已经在 Google Cloud Dataflow 上的部署、Google Cloud 上的 jupyter notebook 和我本地的机器上进行了测试,所有平台上都会出现错误。因此,它应该与核心库有关,但我不确定,因此我将其发布在这里以供人们智慧。
所以这里是重现问题的代码:
class CreateRawRow(beam.DoFn):
def process(self, data):
from google.cloud.bigtable.row import DirectRow
import datetime
# Convert str to datetime
convertedTimestamp = datetime.datetime.strptime(data['Timestamp'], '%Y-%m-%d %H:%M:%S.%f%z')
timestamp = convertedTimestamp.strftime("%Y%m%d%H%M%S")
row_key = f"a#b#raw#c#{timestamp}"
direct_row = DirectRow(row_key)
for key in data:
cf = "data"
column = key
value = str(data[key])
direct_row.set_cell(
cf,
column,
value,
convertedTimestamp)
return [direct_row]
p = beam.Pipeline(options=options)
rawdata = p | "Read" >> beam.io.ReadFromPubSub(subscription=subscription)
jsonData = (rawdata | "Parse Json" >> beam.Map(json.loads))
jsonData | "Create Row Information" >> beam.ParDo(CreateRawRow())
class ChangeTimestamp(beam.DoFn):
def process(self, data):
import datetime
timestamp = datetime.datetime.strptime(data['Timestamp'], '%Y-%m-%d %H:%M:%S.%f%z')
convertedTimestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S")
data["Timestamp"] = convertedTimestamp #Commenting out this line fixes the problem
yield [data]
changedTimestamp = jsonData | "Change Time stamp" >> beam.ParDo(ChangeTimestamp())
result = p.run()
result.wait_until_finish()
上面的代码是精简的,以显示导致问题的确切代码行。 正如你在上面的代码中看到的,我已经从 Pub Sub 中摄取了(它必须从 pubsub 中摄取,如果我使用 beam.Create() 来模拟数据进来,就不会出现错误)。我使用 json.loads 从 pubsub 加载数据以将其转换为字典。从 pubsub 摄取后,我尝试使用 CreateRawRow() 创建一行以插入到 BigTable 中。我还创建了一个名为“更改时间戳”的单独分支。
现在,我得到的错误是
ValueError: time data '2021-02-13 20:12:23' does not match format '%Y-%m-%d %H:%M:%S.%f%z' [while running 'Create Row Information']
来自CreateRawRow
但是,我已将问题追溯到 ChangeTimestamp class,其中包含行“data["Timestamp"] = convertedTimestamp”。如果我注释掉这些行,错误就会消失。另外,如果我将时间格式更改为其他格式,错误也会随之而来,所以肯定是这一行是麻烦制造者。
所以我的直接怀疑是,由于此 Pardo 中的这一行,先前的“jsonData”Pcollection 发生了变异。但是,根据文档,这不应该是可能的。因此,我想提出一些意见,我这样做是错误的还是这是一个真正的错误?还要记住,PubSub 上的每个条目都不会发生这种情况,如果我使用 Beam.Create() 模拟数据流也不会发生这种情况。
关于 PCollection 不变性的声明是 DoFns 不应该改变他们的输入。在像 Python 这样的语言中,所有内容都通过引用传递并且默认情况下是可变的(没有 const
),如果不是不可能的话,这很难执行。