Apache Beam Streaming 滞后运算符
Apache Beam Streaming Lag Operator
我目前正在考虑构建一个管道，实现类似 SQL 中 LAG 运算符的功能，但不确定这是否可行。
为了说明得更清楚，假设我有如下数据流：
# sensor_name, temperature
("station 1", 30.0)
("station 1", 31.0)
("station 1", 32.0)
("station 1", 33.0)
("station 2", 30.0)
("station 2", 31.0)
("station 2", 32.0)
对其执行某个 PTransform 之后，输出变为：
("station 1", {"now":30.0, "before":None})
("station 1", {"now":31.0, "before":30.0})
("station 1", {"now":32.0, "before":31.0})
("station 1", {"now":33.0, "before":32.0})
("station 2", {"now":30.0, "before":None})
("station 2", {"now":31.0, "before":30.0})
("station 2", {"now":32.0, "before":31.0})
这能实现吗？谢谢！
下面是一个使用公共出租车 Pub/Sub 主题（taxirides-realtime）的可运行示例。
这是有状态的 DoFn（Stateful DoFn）：
class UpdateLast(beam.DoFn):
    """Emit each keyed reading together with the previous one (SQL ``LAG``).

    For every ``(key, reading)`` element this yields
    ``(key, {"now": ..., "before": ...})``, where ``"before"`` is ``None`` for
    the first reading seen for a key. The latest ``(reading, timestamp)`` pair
    per key is kept in a bag state that is cleared before every write, so the
    bag holds at most one entry.

    NOTE(review): Beam scopes user state per key and window, and nothing in
    this DoFn ever clears it (there is no timer). With the global window and
    many distinct keys (e.g. one per ride_id), state presumably accumulates
    for the lifetime of the job; consider a timer that clears it.
    """

    # At most one (reading, event-timestamp-in-seconds) tuple per key.
    RIDE_TRACK = BagStateSpec('rides', TupleCoder((FloatCoder(), FloatCoder())))

    def process(self,
                element,
                timestamp_param=beam.DoFn.TimestampParam,
                ride_state=beam.DoFn.StateParam(RIDE_TRACK)):
        """Yield ``(key, {"now": reading, "before": previous_or_None})``.

        Args:
            element: a ``(key, reading)`` pair; ``reading`` must be
                encodable by ``FloatCoder`` because it is stored in state.
            timestamp_param: the element's timestamp, injected by Beam.
            ride_state: the per-key bag state declared by ``RIDE_TRACK``.
        """
        key, meter_reading = element[0], element[1]
        timestamp = float(timestamp_param)
        bag_content = list(ride_state.read())

        if not bag_content:
            # First reading for this key: there is nothing to lag against.
            logging.info("Generating entry %s for key %s", (meter_reading, timestamp), key)
            ride_state.add((meter_reading, timestamp))
            yield (key, {"now": meter_reading, "before": None})
            return

        # The bag is cleared before every add, so it holds exactly one entry.
        old_meter, old_timestamp = bag_content[0]

        if timestamp >= old_timestamp:
            # Newer reading (ties resolved by arrival order): it becomes the
            # stored latest value and the previously stored one is "before".
            ride_state.clear()
            ride_state.add((meter_reading, timestamp))
            logging.info("KEY %s: updating from %s to %s", key, old_meter, meter_reading)
            yield (key, {"now": meter_reading, "before": old_meter})
        else:
            # Late (out-of-order) reading: state is left untouched and the
            # stored latest reading is re-emitted with the late one as
            # "before". NOTE(review): this is an approximation — the late
            # reading is not necessarily the true predecessor.
            yield (key, {"now": old_meter, "before": meter_reading})
以及供您测试的管道（运行前需自行定义 bucket、project、region 变量并导入所用的模块）：
# Launch the demo on Dataflow. `bucket`, `project` and `region` are not
# defined in this snippet — set them (and the imports) before running.
options = PipelineOptions(
    temp_location=f"{bucket}/tmp/",
    project=project,
    region=region,
    streaming=True,  # Pub/Sub is an unbounded source
    job_name="statedofn",
    num_workers=4,
    max_num_workers=20,
)
p = beam.Pipeline(DataflowRunner(), options=options)
topic = "projects/pubsub-public-data/topics/taxirides-realtime"

# Every step carries an explicit label: Beam derives default labels for
# lambdas from their source line, so unlabeled lambda steps get renamed
# whenever the file is edited, which can break Dataflow in-place updates.
pubsub = (p
          | "Read Topic" >> ReadFromPubSub(topic=topic)
          | "Json Loads" >> Map(json.loads)
          | "Keep Enroute" >> beam.Filter(lambda x: x["ride_status"] == "enroute")
          | "KV" >> Map(lambda x: (x["ride_id"], x["meter_reading"]))
          )
# UpdateLast is stateful, so its input must be (key, value) pairs;
# each resulting record is written to the worker logs.
state_df = (pubsub
            | "Stateful Do Fn" >> ParDo(UpdateLast())
            | "Log" >> Map(logging.info)
            )
# Submits the job; call wait_until_finish() on the result to block.
p.run()
输出:
('052b8a40-1c57-4a3c-a012-73ffeddb1f02', {'now': 9.875244, 'before': 9.857124})
('835a9a99-c2fc-4f3d-9284-59098827fe05', {'now': 26.973698, 'before': 26.940273})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.828278, 'before': 17.808857})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.847698, 'before': 17.828278})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3575556, 'before': 2.3346667})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3804445, 'before': 2.3575556})
我目前正在考虑构建一个管道，实现类似 SQL 中 LAG 运算符的功能，但不确定这是否可行。
为了说明得更清楚，假设我有如下数据流：
# sensor_name, temperature
("station 1", 30.0)
("station 1", 31.0)
("station 1", 32.0)
("station 1", 33.0)
("station 2", 30.0)
("station 2", 31.0)
("station 2", 32.0)
对其执行某个 PTransform 之后，输出变为：
("station 1", {"now":30.0, "before":None})
("station 1", {"now":31.0, "before":30.0})
("station 1", {"now":32.0, "before":31.0})
("station 1", {"now":33.0, "before":32.0})
("station 2", {"now":30.0, "before":None})
("station 2", {"now":31.0, "before":30.0})
("station 2", {"now":32.0, "before":31.0})
这能实现吗？谢谢！
下面是一个使用公共出租车 Pub/Sub 主题（taxirides-realtime）的可运行示例。
这是有状态的 DoFn（Stateful DoFn）：
class UpdateLast(beam.DoFn):
    """Emit each keyed reading together with the previous one (SQL ``LAG``).

    For every ``(key, reading)`` element this yields
    ``(key, {"now": ..., "before": ...})``, where ``"before"`` is ``None`` for
    the first reading seen for a key. The latest ``(reading, timestamp)`` pair
    per key is kept in a bag state that is cleared before every write, so the
    bag holds at most one entry.

    NOTE(review): Beam scopes user state per key and window, and nothing in
    this DoFn ever clears it (there is no timer). With the global window and
    many distinct keys (e.g. one per ride_id), state presumably accumulates
    for the lifetime of the job; consider a timer that clears it.
    """

    # At most one (reading, event-timestamp-in-seconds) tuple per key.
    RIDE_TRACK = BagStateSpec('rides', TupleCoder((FloatCoder(), FloatCoder())))

    def process(self,
                element,
                timestamp_param=beam.DoFn.TimestampParam,
                ride_state=beam.DoFn.StateParam(RIDE_TRACK)):
        """Yield ``(key, {"now": reading, "before": previous_or_None})``.

        Args:
            element: a ``(key, reading)`` pair; ``reading`` must be
                encodable by ``FloatCoder`` because it is stored in state.
            timestamp_param: the element's timestamp, injected by Beam.
            ride_state: the per-key bag state declared by ``RIDE_TRACK``.
        """
        key, meter_reading = element[0], element[1]
        timestamp = float(timestamp_param)
        bag_content = list(ride_state.read())

        if not bag_content:
            # First reading for this key: there is nothing to lag against.
            logging.info("Generating entry %s for key %s", (meter_reading, timestamp), key)
            ride_state.add((meter_reading, timestamp))
            yield (key, {"now": meter_reading, "before": None})
            return

        # The bag is cleared before every add, so it holds exactly one entry.
        old_meter, old_timestamp = bag_content[0]

        if timestamp >= old_timestamp:
            # Newer reading (ties resolved by arrival order): it becomes the
            # stored latest value and the previously stored one is "before".
            ride_state.clear()
            ride_state.add((meter_reading, timestamp))
            logging.info("KEY %s: updating from %s to %s", key, old_meter, meter_reading)
            yield (key, {"now": meter_reading, "before": old_meter})
        else:
            # Late (out-of-order) reading: state is left untouched and the
            # stored latest reading is re-emitted with the late one as
            # "before". NOTE(review): this is an approximation — the late
            # reading is not necessarily the true predecessor.
            yield (key, {"now": old_meter, "before": meter_reading})
以及供您测试的管道（运行前需自行定义 bucket、project、region 变量并导入所用的模块）：
# Launch the demo on Dataflow. `bucket`, `project` and `region` are not
# defined in this snippet — set them (and the imports) before running.
options = PipelineOptions(
    temp_location=f"{bucket}/tmp/",
    project=project,
    region=region,
    streaming=True,  # Pub/Sub is an unbounded source
    job_name="statedofn",
    num_workers=4,
    max_num_workers=20,
)
p = beam.Pipeline(DataflowRunner(), options=options)
topic = "projects/pubsub-public-data/topics/taxirides-realtime"

# Every step carries an explicit label: Beam derives default labels for
# lambdas from their source line, so unlabeled lambda steps get renamed
# whenever the file is edited, which can break Dataflow in-place updates.
pubsub = (p
          | "Read Topic" >> ReadFromPubSub(topic=topic)
          | "Json Loads" >> Map(json.loads)
          | "Keep Enroute" >> beam.Filter(lambda x: x["ride_status"] == "enroute")
          | "KV" >> Map(lambda x: (x["ride_id"], x["meter_reading"]))
          )
# UpdateLast is stateful, so its input must be (key, value) pairs;
# each resulting record is written to the worker logs.
state_df = (pubsub
            | "Stateful Do Fn" >> ParDo(UpdateLast())
            | "Log" >> Map(logging.info)
            )
# Submits the job; call wait_until_finish() on the result to block.
p.run()
输出:
('052b8a40-1c57-4a3c-a012-73ffeddb1f02', {'now': 9.875244, 'before': 9.857124})
('835a9a99-c2fc-4f3d-9284-59098827fe05', {'now': 26.973698, 'before': 26.940273})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.828278, 'before': 17.808857})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.847698, 'before': 17.828278})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3575556, 'before': 2.3346667})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3804445, 'before': 2.3575556})