Beam - 模式不可知 ingestion/aggregation 可能吗?
Beam - Is schema-agnostic ingestion/aggregation possible?
我想摄取具有不断变化的模式(先验未知)的对象流(例如 JSON),并应用先验已知的自定义聚合。
在 Beam 中可以吗?
具体可以:
提取(嵌套)JSON 具有不断变化的架构的对象列表(在 PCollection 中):
msg1 = {"product":"apple","price":{"currency":"JPY","amount":50}}
msg2 = {"product":"apple","price":{"amount":70},"unuseful_field_for_this":"foo"}
在(全局和更新)时间 window(在 CombineFn 中)应用自定义聚合:
res = {"product":"apple","sales":120,"currency":"JPY"} <= Using JPY as default
下面的代码展示了通过使用元组(使用 CombinePerKey
提议的 here 函数)实现模式不可知的首次尝试,但并未展示“更丰富”的用例,因为以上一个。
注意:转换是由单独发送到 PubSub 的虚拟消息触发的。
代码
# Libraries
import random
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Generator function
def create_random_record(line):
n_fields = random.randint(1,5)
msg = {"key{}".format(i): i for i in range(1,n_fields)}
# One possible msg: {'key1': 1, 'key2': 2}
# Another possible msg: {'key1': 1, 'key2': 2, 'key3': 3}
return msg
class msg_to_tuple_list(beam.DoFn):
def process(self, msg):
return [(k,v) for k,v in msg.items()]
# Run function
def run(argv=None, save_main_session=True):
pipeline_options = PipelineOptions(save_main_session=True, streaming=True)
with beam.Pipeline(options=pipeline_options) as p:
input_subscription=MY_INPUT_SUBSCRIPTION
output_table=MY_OUTPUT_TABLE
_ = (p
| 'Trigger from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=input_subscription).with_output_types(bytes)
| 'Random generator' >> beam.Map(create_random_record)
| 'Convert dict to tuple list' >> beam.ParDo(msg_to_tuple_list())
| 'Window' >> beam.WindowInto(window.FixedWindows(5))
| 'Aggregation' >> beam.CombinePerKey(sum) # The function I'd like to accept any json..
)
# Run
run()
输出
('key1', 9)
('key2', 14)
('key3', 15)
('key4', 8)
我已经为此工作了一段时间,请记住,您需要添加一些调整并在您这边进行测试。您需要注意的事项:
为简单起见,我的示例中没有找到密钥price
的逻辑,但我设计时已经考虑到了。您需要添加它。
因为我们要 accumulating elements 和 GlobalWindow
,我有点担心你可能会 运行 过一段时间后出现内存问题。理论上,组合器提升应该使DF只存储累加器而不是所有元素。我有一个采用类似方法的管道 2 天没有问题,但在 Java(应该相同)。
需要一些额外的逻辑来匹配您的确切用例,这可以作为一个想法。
我正在使用高级组合器来 (1) 强制组合器提升和 (2) 因此您可以添加您的密钥解析逻辑。我使用了 5 分钟的触发器,因此每个键的总和将每 5 分钟更新一次(如果需要,您可以更改触发器)。
我将开始我的代码,假设您已经按照我们在评论中讨论的那样解析了流中的元素。输入我的代码的元素采用这种格式:
{"product": "apple", "price": {"currency": "JPY", "amount": 50}},
{"product": "orange", "price": {"amount": 50}},
{"product": "apple", "price": {"amount": 10}},
{"product": "orange", "price": {"currency": "EUR", "amount": 50}},
{"product": "apple", "price": {"currency": "JPY", "amount": 30}}
那些被传递到管道:
class NestedDictSum(beam.CombineFn):
def create_accumulator(self):
# accumulator instance starts at 0
return 0
def add_input(self, sum_value, input):
# Called for every new element, add it to the accumulator
return sum_value + self._get_price(input)
def merge_accumulators(self, accumulators):
# Called for every accumulator across workers / bundles
return sum(accumulators)
def extract_output(self, total):
# output value from merged accumulators
return {"sales": total, "currency": "JPY"}
def _get_price(self, dictionary):
# Add your logic to find the right key in it
# Needs to return the parsed price (what you want to sum)
return dictionary["price"]["amount"]
def add_product(element):
dictionary = element[1]
dictionary["product"] = element[0]
return dictionary
# Pipeline read stream and so on
| Map(lambda x: (x["product"], x)) # To KV
| WindowInto(GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5 * 60)),
# This makes the elements not be discarded, so the value would
# be updated as new elements are triggered
accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
| CombinePerKey(NestedDictSum())
| Map(add_product) # From KV to Dictionary adding Key
这个输出是:
{'sales': 90, 'currency': 'JPY', 'product': 'apple'}
{'sales': 100, 'currency': 'JPY', 'product': 'orange'}
请注意,在实际的 Streaming 案例中,此值将每 5 分钟更新一次
另外,我认为这个特殊的用例可能会受益于使用 Stateful and Timely DoFN。这将使您可以更精细地控制元素,并且可能比我发布的更适合。
我想摄取具有不断变化的模式(先验未知)的对象流(例如 JSON),并应用先验已知的自定义聚合。
在 Beam 中可以吗?
具体可以:
提取(嵌套)JSON 具有不断变化的架构的对象列表(在 PCollection 中):
msg1 = {"product":"apple","price":{"currency":"JPY","amount":50}}
msg2 = {"product":"apple","price":{"amount":70},"unuseful_field_for_this":"foo"}
在(全局和更新)时间 window(在 CombineFn 中)应用自定义聚合:
res = {"product":"apple","sales":120,"currency":"JPY"} <= Using JPY as default
下面的代码展示了通过使用元组(使用 CombinePerKey
提议的 here 函数)实现模式不可知的首次尝试,但并未展示“更丰富”的用例,因为以上一个。
注意:转换是由单独发送到 PubSub 的虚拟消息触发的。
代码
# Libraries
import random
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Generator function
def create_random_record(line):
n_fields = random.randint(1,5)
msg = {"key{}".format(i): i for i in range(1,n_fields)}
# One possible msg: {'key1': 1, 'key2': 2}
# Another possible msg: {'key1': 1, 'key2': 2, 'key3': 3}
return msg
class msg_to_tuple_list(beam.DoFn):
def process(self, msg):
return [(k,v) for k,v in msg.items()]
# Run function
def run(argv=None, save_main_session=True):
pipeline_options = PipelineOptions(save_main_session=True, streaming=True)
with beam.Pipeline(options=pipeline_options) as p:
input_subscription=MY_INPUT_SUBSCRIPTION
output_table=MY_OUTPUT_TABLE
_ = (p
| 'Trigger from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=input_subscription).with_output_types(bytes)
| 'Random generator' >> beam.Map(create_random_record)
| 'Convert dict to tuple list' >> beam.ParDo(msg_to_tuple_list())
| 'Window' >> beam.WindowInto(window.FixedWindows(5))
| 'Aggregation' >> beam.CombinePerKey(sum) # The function I'd like to accept any json..
)
# Run
run()
输出
('key1', 9)
('key2', 14)
('key3', 15)
('key4', 8)
我已经为此工作了一段时间,请记住,您需要添加一些调整并在您这边进行测试。您需要注意的事项:
为简单起见,我的示例中没有找到密钥
price
的逻辑,但我设计时已经考虑到了。您需要添加它。因为我们要 accumulating elements 和
GlobalWindow
,我有点担心你可能会 运行 过一段时间后出现内存问题。理论上,组合器提升应该使DF只存储累加器而不是所有元素。我有一个采用类似方法的管道 2 天没有问题,但在 Java(应该相同)。需要一些额外的逻辑来匹配您的确切用例,这可以作为一个想法。
我正在使用高级组合器来 (1) 强制组合器提升和 (2) 因此您可以添加您的密钥解析逻辑。我使用了 5 分钟的触发器,因此每个键的总和将每 5 分钟更新一次(如果需要,您可以更改触发器)。
我将开始我的代码,假设您已经按照我们在评论中讨论的那样解析了流中的元素。输入我的代码的元素采用这种格式:
{"product": "apple", "price": {"currency": "JPY", "amount": 50}},
{"product": "orange", "price": {"amount": 50}},
{"product": "apple", "price": {"amount": 10}},
{"product": "orange", "price": {"currency": "EUR", "amount": 50}},
{"product": "apple", "price": {"currency": "JPY", "amount": 30}}
那些被传递到管道:
class NestedDictSum(beam.CombineFn):
def create_accumulator(self):
# accumulator instance starts at 0
return 0
def add_input(self, sum_value, input):
# Called for every new element, add it to the accumulator
return sum_value + self._get_price(input)
def merge_accumulators(self, accumulators):
# Called for every accumulator across workers / bundles
return sum(accumulators)
def extract_output(self, total):
# output value from merged accumulators
return {"sales": total, "currency": "JPY"}
def _get_price(self, dictionary):
# Add your logic to find the right key in it
# Needs to return the parsed price (what you want to sum)
return dictionary["price"]["amount"]
def add_product(element):
dictionary = element[1]
dictionary["product"] = element[0]
return dictionary
# Pipeline read stream and so on
| Map(lambda x: (x["product"], x)) # To KV
| WindowInto(GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5 * 60)),
# This makes the elements not be discarded, so the value would
# be updated as new elements are triggered
accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
| CombinePerKey(NestedDictSum())
| Map(add_product) # From KV to Dictionary adding Key
这个输出是:
{'sales': 90, 'currency': 'JPY', 'product': 'apple'}
{'sales': 100, 'currency': 'JPY', 'product': 'orange'}
请注意,在实际的 Streaming 案例中,此值将每 5 分钟更新一次
另外,我认为这个特殊的用例可能会受益于使用 Stateful and Timely DoFN。这将使您可以更精细地控制元素,并且可能比我发布的更适合。