如何获得元组中超过 2 个项目的 PCollection 的最大值?

How can I get the max value of a PCollection that has more than 2 items in a tuple?

我正在尝试使用 apache beam 获得两个字符串匹配之间的最高分数。

with beam.Pipeline(options = options) as pipeline:
    result = (pipeline
                   | 'read_data' >> beam.io.ReadFromBigQuery(query = query)
                   | 'printing' >> beam.ParDo(print)
                   )

输出具有以下结构:

(('address 3642', ['270-42']), ('av. lonely street 7460', [13541]), -3020.3391812865493)
(('address 3642', ['270-42']), ('bacar 8059', [2486]), -3653.532507739938)
(('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616)
(('address 3642', ['270-42']), ('holy mary 6998', [239]), -2557.4241486068113)
(('new address 3328', ['266-25']), ('false street 7451', [16274]), -3332.230769230769)
(('new address 3328', ['266-25']), ('principal  7532', [1726]), -3344.12474012474)
(('new address 3328', ['266-25']), ('principal  6931', [97]), -2780.3804573804573)
(('new address 3328', ['266-25']), ('john dewey 7577', [2976]), -3458.230769230769)
(('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462)
(('address 3642', ['270-42']), ('false street 7451', [1463]), -3012.646370829033)
(('address 3642', ['270-42']), ('sft avenue 7532', [36148]), -3050.6295149638804)
(('address 3642', ['270-42']), ('principal 6931', [3841]), -2489.1169590643276)
(('address 3642', ['270-42']), ('principal 7577', [36841]), -3171.532507739938)
(('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182)

因此每个元组的最后一项 (ej: -3020.339) 是第一个元组中的第一个字符串(地址 3642)与第二个元组中的第一个字符串(av. lonely street 7460)之间的匹配分数。

我想获得以第一个元组的第一个字符串(ej:地址 3642 和新地址 3328)为键的最大匹配分数,但又不丢失元组内的其他数据。例如,在这种情况下我想要的输出是:

(('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616)
(('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462)

我尝试了 beam.CombinePerKey(max)beam.CombinePerKey(max).with_input_types(Tuple[SomeClass,float]),但没有得到想要的结果。我怎样才能做到这一点?

你可以试试 GroupByKey - https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/

import apache_beam as beam


def convertToKV(x):
    return x[0][0], [x[0][1], x[1], x[2]]

def findMax(x):
    key = x[0]
    max_value = x[1][0]
    for v in x[1]:
        if v[2] > max_value[2]:
            max_value = v
    return ((key, max_value[0]), max_value[1], max_value[2])


with beam.Pipeline() as pipeline:
    findMaxPerKey = (
            pipeline
    | 'Create pcollection' >> beam.Create([
        (('address 3642', ['270-42']), ('av. lonely street 7460', [13541]), -3020.3391812865493),
        (('address 3642', ['270-42']), ('bacar 8059', [2486]), -3653.532507739938),
        (('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616),
        (('address 3642', ['270-42']), ('holy mary 6998', [239]), -2557.4241486068113),
        (('new address 3328', ['266-25']), ('false street 7451', [16274]), -3332.230769230769),
        (('new address 3328', ['266-25']), ('principal  7532', [1726]), -3344.12474012474),
        (('new address 3328', ['266-25']), ('principal  6931', [97]), -2780.3804573804573),
        (('new address 3328', ['266-25']), ('john dewey 7577', [2976]), -3458.230769230769),
        (('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462),
        (('address 3642', ['270-42']), ('false street 7451', [1463]), -3012.646370829033),
        (('address 3642', ['270-42']), ('sft avenue 7532', [36148]), -3050.6295149638804),
        (('address 3642', ['270-42']), ('principal 6931', [3841]), -2489.1169590643276),
        (('address 3642', ['270-42']), ('principal 7577', [36841]), -3171.532507739938),
        (('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182),
    ])   |  beam.Map(lambda x: convertToKV(x))
            |  beam.GroupByKey()
      | beam.Map(lambda x : findMax(x))) | beam.Map(print)

我可以看到三种方式来考虑您想要什么:作为自定义比较器或自定义光束组合器。

我认为最简单的方法是为元组编写一个比较器函数并将其传递给 CombinePerKey。您编写一个可调用对象,将可迭代的输入元组映射到得分最高的元组。有关如何编写可传递给 CombinePerKey 的可调用文件的完整文档位于 https://beam.apache.org/documentation/programming-guide/#simple-combines.

另一个可以使您的代码在未来更具可读性的选项是将数据包装在命名的 class 中并使其具有可比性,如 Comparable classes in Python 3 中所述(以及许多其他地方) ) 然后你 CombinePerKey(max).

最后,如果您想了解有关 Beam 的更多信息,可以实现 CombineFn 接口,如 https://beam.apache.org/documentation/programming-guide/#advanced-combines and with many examples in https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/combiners.py 所述。

Kenneth 和 Swar Patel 所说的都行得通。我没有考虑可比 类,这是个好主意。如果您有许多具有相同键的元素, GBK 可能会有点低效,因为您需要将它们分组在同一个工作人员中然后处理它们,这就是我建议使用自定义组合器的原因。组合器被“提升”,以便您可以并行工作并且只跟踪累加器。

这是您想要使用自定义组合器的代码:

CombinerFn

class CustomMax(CombineFn):
  def create_accumulator(self):
    # create and initialise accumulator
    tuple_track = None
    max_value = float('-inf')
    return tuple_track, max_value

  def add_input(self, accumulator, element):
    # accumulates each element from input in accumaltor
    if element[2] > accumulator[1]:
        return element, element[2]
    return accumulator

  def merge_accumulators(self, accumulators):
    # Multiple accumulators could be processed in parallel,
    # this function merges them
    return max(accumulators, key=lambda x: x[1])

  def extract_output(self, accumulator):
    # Only output the tracker
    return accumulator[0]

供您使用数据进行测试的管道

elements = [
    (('address 3642', ['270-42']), ('av. lonely street 7460', [13541]), -3020.3391812865493),
        (('address 3642', ['270-42']), ('bacar 8059', [2486]), -3653.532507739938),
        (('address 3642', ['270-42']), ('pdt sta 4383', [2648]), 35.382428940568616),
        (('address 3642', ['270-42']), ('holy mary 6998', [239]), -2557.4241486068113),
        (('new address 3328', ['266-25']), ('false street 7451', [16274]), -3332.230769230769),
        (('new address 3328', ['266-25']), ('principal  7532', [1726]), -3344.12474012474),
        (('new address 3328', ['266-25']), ('principal  6931', [97]), -2780.3804573804573),
        (('new address 3328', ['266-25']), ('john dewey 7577', [2976]), -3458.230769230769),
        (('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462),
        (('address 3642', ['270-42']), ('false street 7451', [1463]), -3012.646370829033),
        (('address 3642', ['270-42']), ('sft avenue 7532', [36148]), -3050.6295149638804),
        (('address 3642', ['270-42']), ('principal 6931', [3841]), -2489.1169590643276),
        (('address 3642', ['270-42']), ('principal 7577', [36841]), -3171.532507739938),
        (('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182),
]

(p | Create(elements)
   | "Add key" >> Map(lambda x: (x[0][0], x))
   | CombinePerKey(CustomMax())
   | "Remove key" >> Map(lambda x: x[1])
   | "log" >> Map(print))

输出

(('address 3642', ['270-42']), ('john dewey 3159', [6418]), 376.5702654526182)
(('new address 3328', ['266-25']), ('n address 3159', [7852]), 717.2029405063462)