Beam - Is schema-agnostic ingestion/aggregation possible?

I want to ingest a stream of objects (e.g. JSON) with a constantly changing schema (unknown a priori) and apply a custom aggregation that is known a priori.

Is that possible in Beam?

Specifically, I want to:

  1. Ingest a stream of (nested) JSON objects with a changing schema (into a PCollection):
    msg1 = {"product":"apple","price":{"currency":"JPY","amount":50}}
    msg2 = {"product":"apple","price":{"amount":70},"unuseful_field_for_this":"foo"}

  2. Apply a custom aggregation over a (global and updating) time window (in a CombineFn), sketched in plain Python right after this list:
    res = {"product":"apple","sales":120,"currency":"JPY"} <= Using JPY as default
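
To make item 2 concrete, here is the intended semantics written as plain Python over msg1 and msg2 above (just a sketch to pin down the behavior, not Beam code):

    msgs = [msg1, msg2]
    res = {
        "product": "apple",
        # 50 + 70 = 120
        "sales": sum(m["price"]["amount"] for m in msgs),
        # first currency found, falling back to "JPY"
        "currency": next((m["price"]["currency"] for m in msgs
                          if "currency" in m["price"]), "JPY"),
    }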

The code below shows a first attempt at being schema-agnostic by using tuples (with the function proposed here for CombinePerKey), but it does not cover a "richer" use case like the one above.

Note: the transform is triggered by dummy messages that are sent to Pub/Sub separately.

Code

# Libraries

import random
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window  # needed for window.FixedWindows below

# Generator function

def create_random_record(line):

    # Ignore the dummy Pub/Sub message and emit a record with a random schema
    n_fields = random.randint(1, 5)
    msg = {"key{}".format(i): i for i in range(1, n_fields)}

    # One possible msg: {'key1': 1, 'key2': 2}
    # Another possible msg: {'key1': 1, 'key2': 2, 'key3': 3}

    return msg

class MsgToTupleList(beam.DoFn):

    def process(self, msg):
        # Flatten each dict into a list of (key, value) tuples for CombinePerKey
        return [(k, v) for k, v in msg.items()]
  
# Run function

def run(argv=None, save_main_session=True):

    pipeline_options = PipelineOptions(
        save_main_session=save_main_session, streaming=True)

    with beam.Pipeline(options=pipeline_options) as p:

        input_subscription = MY_INPUT_SUBSCRIPTION
        output_table = MY_OUTPUT_TABLE

        _ = (p
            | 'Trigger from Pub/Sub' >> beam.io.ReadFromPubSub(
                subscription=input_subscription).with_output_types(bytes)
            | 'Random generator' >> beam.Map(create_random_record)
            | 'Convert dict to tuple list' >> beam.ParDo(MsgToTupleList())
            | 'Window' >> beam.WindowInto(window.FixedWindows(5))
            | 'Aggregation' >> beam.CombinePerKey(sum)  # The function I'd like to accept any JSON..
            )

# Run
run()

Output

('key1', 9)
('key2', 14)
('key3', 15)
('key4', 8)

I've been working on this for a while. Keep in mind that you will need to add some adjustments and test it on your side. Things you need to be aware of:

  • For simplicity, my example doesn't include the logic to find the key price, but I designed it with that in mind. You need to add it yourself (see the sketch after this list).

  • Since we are accumulating elements in a GlobalWindow, I'm a bit concerned you may run into memory issues after a while. In theory, combiner lifting should make Dataflow store only the accumulators instead of all the elements. I have a pipeline taking a similar approach that has run for 2 days without issues, but in Java (it should behave the same).

  • Some extra logic is needed to match your exact use case; take this as an idea to build on.
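
As a purely illustrative sketch of the key-finding logic the first bullet refers to (my own addition, not part of the original answer), a recursive search could back the _get_price method shown below:

    def find_amount(obj, key="price", field="amount"):
        # Hypothetical helper: recursively search a nested dict for `key`
        # and return its `field`; None if it is absent everywhere
        if key in obj and isinstance(obj[key], dict):
            return obj[key].get(field)
        for value in obj.values():
            if isinstance(value, dict):
                found = find_amount(value, key, field)
                if found is not None:
                    return found
        return None

_get_price could then return find_amount(dictionary) or 0 so that messages without a price don't break the sum.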

I'm using an advanced combiner to (1) force combiner lifting and (2) let you add your key-parsing logic. I used a 5-minute trigger, so the sum for each key is updated every 5 minutes (you can change the trigger if needed).

I'll start my code assuming you have already parsed the elements from the stream, as we discussed in the comments. The elements entering my code have this format:

        {"product": "apple", "price": {"currency": "JPY", "amount": 50}},
        {"product": "orange", "price": {"amount": 50}},
        {"product": "apple", "price": {"amount": 10}},
        {"product": "orange", "price": {"currency": "EUR", "amount": 50}},
        {"product": "apple", "price": {"currency": "JPY", "amount": 30}}

Those are passed into the pipeline:

    class NestedDictSum(beam.CombineFn):
        def create_accumulator(self):
            # accumulator instance starts at 0
            return 0

        def add_input(self, sum_value, input):
            # Called for every new element, add it to the accumulator
            return sum_value + self._get_price(input)

        def merge_accumulators(self, accumulators):
            # Called for every accumulator across workers / bundles
            return sum(accumulators)

        def extract_output(self, total):
            # output value from merged accumulators
            return {"sales": total, "currency": "JPY"}

        def _get_price(self, dictionary):
            # Add your logic to find the right key in it
            # Needs to return the parsed price (what you want to sum)
            return dictionary["price"]["amount"]

    def add_product(element):
        dictionary = element[1]
        dictionary["product"] = element[0]
        return dictionary

# Pipeline read stream and so on. Assumed imports:
#   from apache_beam import Map, WindowInto, CombinePerKey
#   from apache_beam.transforms import trigger
#   from apache_beam.transforms.window import GlobalWindows

     | Map(lambda x: (x["product"], x))  # To KV
     | WindowInto(GlobalWindows(),
                  trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5 * 60)),
                  # This makes the elements not be discarded, so the value would
                  # be updated as new elements are triggered
                  accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
     | CombinePerKey(NestedDictSum())
     | Map(add_product)  # From KV to Dictionary adding Key
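
As a quick sanity check outside of Beam (my own addition), you can drive the CombineFn methods by hand over the three "apple" elements listed above:

    fn = NestedDictSum()
    acc = fn.create_accumulator()
    apple_msgs = [
        {"product": "apple", "price": {"currency": "JPY", "amount": 50}},
        {"product": "apple", "price": {"amount": 10}},
        {"product": "apple", "price": {"currency": "JPY", "amount": 30}},
    ]
    for msg in apple_msgs:
        acc = fn.add_input(acc, msg)
    total = fn.merge_accumulators([acc])
    print(fn.extract_output(total))  # {'sales': 90, 'currency': 'JPY'}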

The output of this is:

{'sales': 90, 'currency': 'JPY', 'product': 'apple'}
{'sales': 100, 'currency': 'JPY', 'product': 'orange'}

Note that in a real streaming case, these values would be updated every 5 minutes.

Also, I think this particular use case could benefit from using a Stateful and Timely DoFn. It would give you more fine-grained control over the elements and may fit your case better than what I posted.
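
For reference, a minimal sketch of what that stateful approach could look like (my assumption, not code from the answer; StatefulPriceSum, its trigger policy, and the direct price lookup are all hypothetical):

    import time
    import apache_beam as beam
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import (
        CombiningValueStateSpec, TimerSpec, on_timer)

    class StatefulPriceSum(beam.DoFn):
        # Per-key running total kept in managed state
        TOTAL = CombiningValueStateSpec('total', sum)
        # Processing-time timer used to periodically emit the total
        FLUSH = TimerSpec('flush', TimeDomain.REAL_TIME)

        def process(self, element,
                    total=beam.DoFn.StateParam(TOTAL),
                    flush=beam.DoFn.TimerParam(FLUSH)):
            product, msg = element  # expects (key, dict) pairs, as in the pipeline above
            total.add(msg["price"]["amount"])  # plug your key-finding logic in here
            # (Re)arm the timer: it fires 5 minutes after the most recent element
            flush.set(time.time() + 5 * 60)

        @on_timer(FLUSH)
        def flush_total(self, key=beam.DoFn.KeyParam,
                        total=beam.DoFn.StateParam(TOTAL)):
            yield {"product": key, "sales": total.read(), "currency": "JPY"}

Note this timer fires 5 minutes after the last element seen for a key, which is slightly different from the repeated processing-time trigger above, so adjust the timer logic to your needs.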