CoGroupByKey 没有给出预期的结果 Apache Beam(python)

CoGroupByKey not giving desired results Apache Beam(python)

我一直在测试将 pub/sub 读取数据与自己创建的数据连接起来。下面是主要的流水线方法。

def run(input_topic,input_subscription, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    
    pipeline_options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
    with Pipeline(options=pipeline_options) as pipeline:
        # reading from pub/sub and creating a fixed window of 1 min.
        p1 =  pipeline | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)\
        | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
        #creating sample data 
        p2 = pipeline | "creating a sample data" >> Create([('Hello','sh 1'),('Hello','sh 1.1'),
        ('Hello_world','sh 2'),
        ('Hello_everyone','sh 3'),
        ('Hello_cloud','sh 4')])
    
        ({"schdedule":p2,"timestamp":p1}) | "merging" >> CoGroupByKey()| "merge print">> Map(print)

下面是window和添加时间戳转换方法。

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 30 seconds.
        self.window_size = int(window_size * 30)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
                                    
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam, window=DoFn.WindowParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (element.decode("utf-8"),datetime.utcfromtimestamp(float(publish_time)).strftime("%Y-%m-%d %H:%M:%S"))

我得到的结果如下所示。

('Hello', {'schdedule': [], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_world', {'schdedule': [], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_everyone', {'schdedule': [], 'timestamp': ['2021-07-16 13:19:00']})

时间表 列表打印为空,因为它没有加入。

预期是

('Hello', {'schdedule': ['sh 1','sh 1.1'], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_world', {'schdedule': ['sh 2'], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_everyone', {'schdedule': ['sh 3'], 'timestamp': ['2021-07-16 13:19:00']})

我尝试在 p2 上单独执行 GroupByKey,它工作正常并给了我以下结果。

('Hello', ['sh 1','sh 1.1'])
('Hello_world', ['sh 2'])
('Hello_everyone', ['sh 3'])

还尝试了带有侧输入的静态字典并且它工作正常但是一旦我做了 CoGroupByKey 它就不会从 p2 管道产生任何结果。如果我在这里做错了什么,请提出建议。

为了进一步为社区做贡献,我发布了这个答案。

我假设您的第二个 PCollection p2 是固定且不可变的。因此,对于来自 p1 的每条新记录,它将与来自 p2 的适当键合并。换句话说,每当一条记录以 Hello 作为主键时, schdedule': ['sh 1','sh 1.1'] 将被添加到最终输出中。

如评论中所述,方法 CoGroupByKey 在没有 windowing 函数的情况下工作。如下例所示,

import apache_beam as beam
from apache_beam import Create, Map, ParDo, Flatten
from apache_beam import CoGroupByKey
from apache_beam import pvalue, window, WindowInto

with beam.Pipeline() as pipeline:
    
    timestamps= [('Hello','2021-07-16 13:19:00'),('Hello_world','2021-07-16 13:19:00'),('Hello_everyone','2021-07-16 13:19:00'),
                 ('Hello_cloud','2021-07-16 13:19:00')]
    p1 = pipeline | "Timestamps" >> Create(timestamps)
    
        #creating sample data 
    p2 = pipeline | "creating a sample data" >> Create([('Hello','sh 1'),('Hello','sh 1.1'),
    ('Hello_world','sh 2'),
    ('Hello_everyone','sh 3'),
    ('Hello_cloud','sh 4')])
    
    ({"schdedule":p2,"timestamp":p1}) | "merging" >> CoGroupByKey() | "merge print">> Map(print)
 

但是,当 windowing 第一个 PCollection p1 时,不会合并第二个 PCollection。发生这种情况是因为第二个 PCollection 不是 windowed,元素没有时间戳,以便与 p1[=49= 放在相同的 window(或不同)中].根据 documentation,

Side inputs and windowing

Beam uses the window(s) for the main input element to look up the appropriate window for the side input element. Beam projects the main input element’s window into the side input’s window set, and then uses the side input from the resulting window. If the main input and side inputs have identical windows, the projection provides the exact corresponding window. However, if the inputs have different windows, Beam uses the projection to choose the most appropriate side input window.

在您的情况下,p2p1 不在同一个 window 中,因为它没有时间戳.所以它不存在于输出中。但是,有一个解决方法。考虑到 p2 是不可变的,如前所述,我们可以:

  1. 首先将 p1 的时间戳转换为 UNIX
  2. 合并 p2p1
  3. Window 基于 p1 时间戳的输出

为简单起见,此代码使用批处理模型的简化版本是,

import apache_beam as beam
from apache_beam import Create, Map, 
from apache_beam import  CoGroupByKey
from apache_beam import pvalue, window, WindowInto

with beam.Pipeline() as pipeline:
    
    timestamps= [('Hello','2021-07-16 13:19:00'), ('Hello','2021-07-16 13:19:05'),('Hello_world','2021-07-16 13:19:00'),('Hello_everyone','2021-07-16 13:19:00'),
                 ('Hello_cloud','2021-07-16 13:19:00')]
    p1 = pipeline | "Timestamps" >> Create(timestamps) | "Add timestamps" >> Map(lambda x: window.TimestampedValue(x, date2unix(x[1])))
    
        #creating sample data 
    p2 = (pipeline | "creating a sample data" >> Create([('Hello','sh 1'),('Hello','sh 1.1'),
    ('Hello_world','sh 2'),
    ('Hello_everyone','sh 3'),
    ('Hello_cloud','sh 4')]))
    
    (({"schdedule":p2,"timestamp":p1}) | "merging" >> CoGroupByKey() 
                                       | "FixedWindow2" >> WindowInto(window.FixedWindows(60)) #60 seconds windows
                                       | "merge print">> Map(print))

和输出,

('Hello', {'schdedule': ['sh 1', 'sh 1.1'], 'timestamp': ['2021-07-16 13:19:00', '2021-07-16 13:19:05']})
('Hello_world', {'schdedule': ['sh 2'], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_everyone', {'schdedule': ['sh 3'], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_cloud', {'schdedule': ['sh 4'], 'timestamp': ['2021-07-16 13:19:00']})

请注意,对于密钥 Hello,在同一个 window 中有两个时间戳,这证实 windowing 被正确使用。

所以只是在这里做出贡献。 这个问题的真正目的是将来自维度 table 或静态数据存储的数据与流数据连接起来。 从问题中可以明显看出 CoGroupByKey 没有加入时间 windowed 和全局 windowed 数据。什么是 windowed 和全局 windowed 数据?

windowed :换句话说,应用了 windowed 的数据组。这反过来将时间边界应用于不断流式传输的数据。因此行数永远不会是无穷大。

global windowed : 没有时间戳边界。它可能是流式或批处理或维度 table 或静态数据存储。

所以我们在这里发生了冲突,因为我们将 windowed 数据与全局 windowed 数据组合在一起。

那么如何解决这个问题呢?

有不同的方法可以做到这一点。下面列出了其中的一些。

1. 使两个数据流相同window.

2. 使用侧输入。阅读 this. more info here

3.在Pardo变换中使用setup方法。

在我的例子中,我寻求不需要为静态数据生成 window,因此我使用解决方案 23[ 实现了这个=57=].

解决方案2

def run(input_topic,input_subscription, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )
#     pipeline = Pipeline(options=pipeline_options)
    with Pipeline(options=pipeline_options) as pipeline:
        p1 =  pipeline | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)\
                 | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)\
                 |"adding time stamp value ">> Map(lambda x : (x[0],datetime.utcfromtimestamp(float(x[1])).strftime("%Y-%m-%d %H:%M:%S")))\
                 |"p1 group by">>GroupByKey()

        p2 = pipeline |"generating data">> Create([('Hello','sh 1'),('Hello','sh 1.1'),
        ('Hello_world','sh 2'),
        ('Hello_everyone','sh 3'),
        ('Hello_cloud','sh 4')])\
         |"p2 group by">> GroupByKey()      
        p1|"perfomring join">> Map(join_data,beam.pvalue.AsDict(p2))| Map(print)

解决方案3

class join_data(DoFn):
    def setup(self):
        self.sample_data_dict = {'Hello':['sh 1','sh 1.1'],
    'Hello_world':'sh 2',
    'Hello_everyone':'sh 3',
    'Hello_cloud':'sh 4'}
        return
    def process(self,ele):
        yield ((ele[0],ele[1],self.sample_data_dict[ele[0]]))

def run(input_topic,input_subscription, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )
#     pipeline = Pipeline(options=pipeline_options)
    with Pipeline(options=pipeline_options) as pipeline:
        p1 =  pipeline | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)\
                 | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)\
|"adding time stamp value ">> Map(lambda x : (x[0],datetime.utcfromtimestamp(float(x[1])).strftime("%Y-%m-%d %H:%M:%S")))\
|"p1 group by">>GroupByKey()
        p1|"perfomring transformation">> ParDo(join_data())| Map(print)

在生产管道中,我们可能会遇到这个问题,通过在其中添加维度信息来转换流数据,我们可以很容易地利用 setupstart_bundle 创建 database/bigquery 连接。 请注意:每个 class instance/per worker 调用一次 setup 方法,每个 window 调用 start_bundle 方法或每组行documentation. more info on ParDo here.

在这两种情况下,我都能得到上述问题中提到的预期结果。