Apache Beam 中具有键值状态的状态处理
Stateful processing in Apache Beam with key-value states
我正在尝试使用 Apache Beam 实施有状态流程。我已经阅读了 Kenneth Knowles 的两篇文章 (Stateful processing with Apache Beam and Timely (and Stateful) Processing with Apache Beam),但没有找到解决我的问题的方法。我正在使用 Python SDK。
特别是,我正在尝试拥有一个包含键值对象的有状态 DoFn,我需要添加新元素,有时还需要删除一些元素。
我在我的 DoFn class 中看到了 a solution may be to use a SetStateSpec with Tuple coder。问题是 SetSpaceSpec 没有 'pop'-like 函数的选项。在我看来,删除元素的唯一方法是用 .clear()
将它们全部删除。
看来您不能仅指定要使用此函数擦除的元素。
解决这个问题的一个方法可能是在我需要删除状态中的元素时随时清除和重写状态,但这对我来说效率很低。
你知道如何有效地做到这一点吗?
Python 版本 3.8.7
apache-beam==2.29.0
我听从了@TudorPlugaru 的建议,然后得出了这个结论。希望对其他人有用。
import json
from apache_beam.coders import Coder
class MyDictCoder(Coder):
""" My custom dictionary coders """
def encode(self, o):
return json.dumps(o).encode()
def decode(self, o):
return json.loads(o.decode())
def is_deterministic(self) -> bool:
return True
在 DoFn 声明中
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
class MyDoFn(beam.DoFn):
DICTSTATE= ReadModifyWriteStateSpec(name='dictstate', coder=MyDictCoder())
def process(self, element, DictState=beam.DoFn.StateParam(DICTSTATE)):
# Do something
yield DictState
并在管道中添加这一行(如 Beam example 中所做的那样)
beam.coders.registry.register_coder(typing.Dict, MyDictCoder)
我正在尝试使用 Apache Beam 实施有状态流程。我已经阅读了 Kenneth Knowles 的两篇文章 (Stateful processing with Apache Beam and Timely (and Stateful) Processing with Apache Beam),但没有找到解决我的问题的方法。我正在使用 Python SDK。
特别是,我正在尝试拥有一个包含键值对象的有状态 DoFn,我需要添加新元素,有时还需要删除一些元素。
我在我的 DoFn class 中看到了 a solution may be to use a SetStateSpec with Tuple coder。问题是 SetSpaceSpec 没有 'pop'-like 函数的选项。在我看来,删除元素的唯一方法是用 .clear()
将它们全部删除。
看来您不能仅指定要使用此函数擦除的元素。
解决这个问题的一个方法可能是在我需要删除状态中的元素时随时清除和重写状态,但这对我来说效率很低。
你知道如何有效地做到这一点吗?
Python 版本 3.8.7
apache-beam==2.29.0
我听从了@TudorPlugaru 的建议,然后得出了这个结论。希望对其他人有用。
import json
from apache_beam.coders import Coder
class MyDictCoder(Coder):
""" My custom dictionary coders """
def encode(self, o):
return json.dumps(o).encode()
def decode(self, o):
return json.loads(o.decode())
def is_deterministic(self) -> bool:
return True
在 DoFn 声明中
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
class MyDoFn(beam.DoFn):
DICTSTATE= ReadModifyWriteStateSpec(name='dictstate', coder=MyDictCoder())
def process(self, element, DictState=beam.DoFn.StateParam(DICTSTATE)):
# Do something
yield DictState
并在管道中添加这一行(如 Beam example 中所做的那样)
beam.coders.registry.register_coder(typing.Dict, MyDictCoder)