Apache Beam Streaming Lag Operator

I'm currently thinking about building a pipeline with a LAG operator like the one in SQL, but I'm not sure whether that's possible.

To make this clearer, suppose I have a data stream like this:

# sensor_name, temperature
("station 1", 30.0)
("station 1", 31.0)
("station 1", 32.0)
("station 1", 33.0)
("station 2", 30.0)
("station 2", 31.0)
("station 2", 32.0)

and I want to apply a PTransform so that the output becomes:

("station 1", {"now":30.0, "before":None})
("station 1", {"now":31.0, "before":30.0})
("station 1", {"now":32.0, "before":31.0})
("station 1", {"now":33.0, "before":32.0})
("station 2", {"now":30.0, "before":None})
("station 2", {"now":31.0, "before":30.0})
("station 2", {"now":32.0, "before":31.0})

Is this possible? Thanks!

Here is a working example that uses the public taxi-rides Pub/Sub topic.

This is the stateful DoFn:

import logging

import apache_beam as beam
from apache_beam.coders import FloatCoder, TupleCoder
from apache_beam.transforms.userstate import BagStateSpec


class UpdateLast(beam.DoFn):
    # Per-key state: a bag holding the latest (meter_reading, timestamp) pair
    RIDE_TRACK = BagStateSpec('rides', TupleCoder((FloatCoder(), FloatCoder())))

    def process(self,
                element,
                timestamp_param=beam.DoFn.TimestampParam,
                ride_state=beam.DoFn.StateParam(RIDE_TRACK)):

        key = element[0]
        meter_reading = element[1]
        timestamp = float(timestamp_param)  # element timestamp in seconds

        bag_content = list(ride_state.read())
        if not bag_content:
            # First element seen for this key: there is no previous value yet
            logging.info("Generating entry %s for key %s", (meter_reading, timestamp), key)
            ride_state.add((meter_reading, timestamp))
            output = {"now": meter_reading, "before": None}
            yield (key, output)
        else:
            # There should only be one element in the bag
            bag_ride = bag_content[0]

            old_meter = bag_ride[0]
            old_timestamp = bag_ride[1]
            # We only need to check if the element is more recent
            if timestamp > old_timestamp:
                # Update bag: replace the stored pair with the newer one
                ride_state.clear()
                ride_state.add((meter_reading, timestamp))
                output = {"now": meter_reading, "before": old_meter}
                logging.info("KEY %s: updating from %s to %s", key, old_meter, meter_reading)
                yield (key, output)
            else:
                # Element arrived late: keep the state, invert old and new
                output = {"now": old_meter, "before": meter_reading}
                yield (key, output)
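Because a streaming source can deliver elements out of order, the else branch matters. The following plain-Python replay (no Beam; a dict stands in for per-key state, and the name simulate_update_last is hypothetical) mirrors the DoFn's three branches so you can see what it emits for a late element:

```python
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, float, float]  # (key, value, event timestamp in seconds)
Row = Tuple[str, Dict[str, Optional[float]]]


def simulate_update_last(events: List[Event]) -> Tuple[List[Row], Dict[str, Tuple[float, float]]]:
    """Replay events through the same three branches as UpdateLast.process.

    A plain dict stands in for Beam's per-key state and holds the
    (value, timestamp) pair of the newest element seen for each key.
    """
    state: Dict[str, Tuple[float, float]] = {}
    outputs: List[Row] = []
    for key, value, ts in events:
        if key not in state:
            # First element for this key: nothing to lag against yet.
            state[key] = (value, ts)
            outputs.append((key, {"now": value, "before": None}))
            continue
        old_value, old_ts = state[key]
        if ts > old_ts:
            # Newer element: emit (new, old) and remember the new pair.
            state[key] = (value, ts)
            outputs.append((key, {"now": value, "before": old_value}))
        else:
            # Late (or same-timestamp) element: state untouched, pair inverted.
            outputs.append((key, {"now": old_value, "before": value}))
    return outputs, state


# The 31.0 reading (timestamp 2.0) arrives after the 32.0 reading (timestamp 3.0).
rows, final_state = simulate_update_last(
    [("station 1", 30.0, 1.0), ("station 1", 32.0, 3.0), ("station 1", 31.0, 2.0)]
)
for row in rows:
    print(row)
print(final_state)
```

In this replay the late 31.0 never reaches the state, and 32.0 is emitted a second time as "now". Whether that approximation is acceptable depends on how far out of order the source can deliver elements.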

And here is the pipeline so you can test it:

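import json
import logging

import apache_beam as beam
from apache_beam import Map, ParDo
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner

# bucket, project and region are placeholders: set them to your own
# GCS bucket path (gs://...), GCP project ID and Dataflow region.
options = PipelineOptions(
    temp_location=f"{bucket}/tmp/",
    project=project,
    region=region,
    streaming=True,
    job_name="statedofn",
    num_workers=4,
    max_num_workers=20,
)

p = beam.Pipeline(DataflowRunner(), options=options)

topic = "projects/pubsub-public-data/topics/taxirides-realtime"

# Keyed input (ride_id, meter_reading) is required by the stateful DoFn
pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=topic)
            | "Json Loads" >> Map(json.loads)
            | beam.Filter(lambda x: x["ride_status"] == "enroute")
            | "KV" >> Map(lambda x: (x["ride_id"], x["meter_reading"]))
         )

state_df = (pubsub | "Stateful Do Fn" >> ParDo(UpdateLast())
                   | Map(logging.info)
           )

p.run()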

Output:

('052b8a40-1c57-4a3c-a012-73ffeddb1f02', {'now': 9.875244, 'before': 9.857124})
('835a9a99-c2fc-4f3d-9284-59098827fe05', {'now': 26.973698, 'before': 26.940273})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.828278, 'before': 17.808857})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.847698, 'before': 17.828278})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3575556, 'before': 2.3346667})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3804445, 'before': 2.3575556})