Windowed Joins in Apache Beam
I am new to Apache Beam and have implemented my first pipeline. But now I have reached a point where I am confused about how to combine windowing and joining.
Problem definition:
I have two streams of data: one contains pageviews of users, the other contains requests of the users. They share the key session_id, which describes the user session, but each carries other additional data. The goal is to compute the number of pageviews in a session before a request happened. That means I want a stream of data that contains every request together with the number of pageviews before the request. The pageviews of, say, the last 5 minutes are sufficient.
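For illustration, the messages on the two streams might look roughly like this (only session_id appears in the problem statement; the other fields are made-up examples):

# Hypothetical payloads; only session_id is given, the other fields are illustrative.
pageview = {'session_id': 'abc123', 'url': '/products/42'}
request = {'session_id': 'abc123', 'endpoint': '/api/checkout'}

# Desired output: each request extended with the session's pageview count, e.g.
# {'session_id': 'abc123', 'endpoint': '/api/checkout', 'num_pageviews': 2}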
What I have tried
To load the requests, I used this snippet, which loads them from a Pub/Sub subscription and then extracts session_id as the key. Finally, I apply a window that emits every request directly when it is received.
import json

import apache_beam as beam
from apache_beam.transforms import trigger, window

requests = (p
    | 'Read Requests' >> (
        beam.io.ReadFromPubSub(subscription=request_sub)
        | 'Extract' >> beam.Map(lambda x: json.loads(x))
        | 'Session as Key' >> beam.Map(lambda request: (request['session_id'], request))
        | 'Window' >> beam.WindowInto(window.SlidingWindows(5 * 60, 1 * 60, 0),
                                      trigger=trigger.AfterCount(1),
                                      accumulation_mode=trigger.AccumulationMode.DISCARDING)
    )
)
Similarly, this snippet loads the pageviews. It applies a sliding window that emits accumulatingly whenever a pageview arrives.
pageviews = (p
    | 'Read Pageviews' >> (
        beam.io.ReadFromPubSub(subscription=pageview_sub)
        | 'Extract' >> beam.Map(lambda x: json.loads(x))
        | 'Session as Key' >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
        | 'Window' >> beam.WindowInto(windowfn=window.SlidingWindows(5 * 60, 1 * 60, 0),
                                      trigger=trigger.AfterCount(1),
                                      accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
    )
)
To apply the join, I tried:
combined = (
    {
        'requests': requests,
        'pageviews': pageviews
    }
    | 'Merge' >> beam.CoGroupByKey()
    | 'Print' >> beam.Map(print)
)
When I run this pipeline, the merged rows never contain both requests and pageviews, only one of them. My idea is to filter out the pageviews that came before the request and to count them after the CoGroupByKey. What do I need to do? I suspect my problem is the windowing and triggering strategy. It is also important that requests are processed with low latency, possibly discarding pageviews that arrive late.
I found a solution myself; here it is in case anyone is interested:
Idea
The trick is to combine the two streams using the beam.Flatten operation and to use a stateful DoFn to compute the number of pageviews before one request. Each stream contains JSON dictionaries. I embed them by tagging each event with its type, as ('request', request) and ('pageview', pageview), so that I can tell the different events apart in the stateful DoFn. I also compute the first pageview timestamp and the seconds since the first pageview. The streams have to use session_id as the key, so that the stateful DoFn receives all events of one session.
Code
First, here is the pipeline code:
import apache_beam as beam
from apache_beam.transforms import trigger, userstate, window

# Beam pipeline that extends requests with the number of pageviews before the request in that session
with beam.Pipeline(options=options) as p:
    # The stream of requests
    requests = (
        'Read from PubSub subscription' >> beam.io.ReadFromPubSub(subscription=request_sub)
        | 'Extract JSON' >> beam.ParDo(ExtractJSON())
        | 'Add Timestamp' >> beam.ParDo(AssignTimestampFn())
        | 'Use Session ID as stream key' >> beam.Map(lambda request: (request['session_id'], request))
        | 'Add type of event' >> beam.Map(lambda r: (r[0], ('request', r[1])))
    )

    # The stream of pageviews
    pageviews = (
        'Read from PubSub subscription' >> beam.io.ReadFromPubSub(subscription=pageview_sub)
        | 'Extract JSON' >> beam.ParDo(ExtractJSON())
        | 'Add Timestamp' >> beam.ParDo(AssignTimestampFn())
        | 'Use Session ID as stream key' >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
        | 'Add type of event' >> beam.Map(lambda p: (p[0], ('pageview', p[1])))
    )

    # Combine the streams and apply the stateful DoFn
    combined = (
        (
            p | ('Prepare requests stream' >> requests),
            p | ('Prepare pageviews stream' >> pageviews)
        )
        | 'Combine event streams' >> beam.Flatten()
        | 'Global Window' >> beam.WindowInto(windowfn=window.GlobalWindows(),
                                             trigger=trigger.AfterCount(1),
                                             accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | 'Stateful DoFn' >> beam.ParDo(CountPageviewsStateful())
        | 'Compute processing delay' >> beam.ParDo(LogTimeDelay())
        | 'Format for BigQuery output' >> beam.ParDo(FormatForOutputDoFn())
    )

    # Write to BigQuery.
    combined | 'Write' >> beam.io.WriteToBigQuery(
        requests_extended_table,
        schema=REQUESTS_EXTENDED_TABLE_SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
The interesting part is the combination of the two streams using beam.Flatten and the application of the stateful DoFn CountPageviewsStateful().
Here is the code of the custom DoFns used:
import apache_beam as beam
from apache_beam.transforms import userstate


# This DoFn just loads a JSON message
class ExtractJSON(beam.DoFn):
    def process(self, element):
        import json
        yield json.loads(element)


# This DoFn adds the event timestamp of a message to its JSON element for further processing
class AssignTimestampFn(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        import datetime
        timestamped_element = element
        timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
        timestamp = timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
        timestamped_element['timestamp_utc'] = timestamp_utc
        timestamped_element['timestamp'] = timestamp
        yield timestamped_element


# This class is a stateful DoFn.
# Input elements should be of the form (session_id, (event_type, event)),
# where events can be requests or pageviews.
# It computes, on a per-session basis, the number of pageviews and the first
# pageview timestamp in its internal state.
# Whenever a request comes in, it appends the internal state to the request and
# emits an extended request.
# Whenever a pageview comes in, the internal state is updated but nothing is emitted.
class CountPageviewsStateful(beam.DoFn):
    # The internal states
    NUM_PAGEVIEWS = userstate.CombiningValueStateSpec('num_pageviews', combine_fn=sum)
    FIRST_PAGEVIEW = userstate.ReadModifyWriteStateSpec('first_pageview', coder=beam.coders.VarIntCoder())

    def process(self,
                element,
                num_pageviews_state=beam.DoFn.StateParam(NUM_PAGEVIEWS),
                first_pageview_state=beam.DoFn.StateParam(FIRST_PAGEVIEW)
                ):
        import datetime

        # Extract the element
        session_id = element[0]
        event_type, event = element[1]

        # Depending on the event type, different actions are taken
        if event_type == 'request':
            # This is a request
            request = event

            # First, the first pageview timestamp is read and the seconds since the first pageview are calculated
            first_pageview = first_pageview_state.read()
            if first_pageview is not None:
                seconds_since_first_pageview = (int(request['timestamp_utc'].timestamp()) - first_pageview)

                first_pageview_timestamp_utc = datetime.datetime.utcfromtimestamp(float(first_pageview))
                first_pageview_timestamp = first_pageview_timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
            else:
                seconds_since_first_pageview = -1
                first_pageview_timestamp = None

            # The calculated data is appended to the request
            request['num_pageviews'] = num_pageviews_state.read()
            request['first_pageview_timestamp'] = first_pageview_timestamp
            request['seconds_since_first_pageview'] = seconds_since_first_pageview

            # The pageview counter is reset
            num_pageviews_state.clear()

            # The extended request is emitted
            yield (session_id, request)
        elif event_type == 'pageview':
            # This is a pageview
            pageview = event

            # Update the first pageview state
            first_pageview = first_pageview_state.read()
            if first_pageview is None:
                first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))
            elif first_pageview > int(pageview['timestamp_utc'].timestamp()):
                first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))

            # Increase the number of pageviews
            num_pageviews_state.add(1)
            # Nothing is emitted, pageviews are not processed further


# This DoFn logs the delay between the event time and the processing time
class LogTimeDelay(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        import datetime
        import logging
        timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
        seconds_delay = (datetime.datetime.utcnow() - timestamp_utc).total_seconds()
        logging.warning('Delayed by %s seconds', seconds_delay)
        yield element
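As a sanity check, here is a minimal smoke test of the stateful DoFn on the DirectRunner. This is my own sketch, not part of the original pipeline; the payloads are made up, and element ordering is not guaranteed in general, although a single small in-memory bundle is usually processed in the order given:

import datetime

import apache_beam as beam

# Three events for one session: two pageviews followed by a request.
events = [
    ('s1', ('pageview', {'session_id': 's1', 'timestamp_utc': datetime.datetime(2020, 1, 1, 12, 0, 0)})),
    ('s1', ('pageview', {'session_id': 's1', 'timestamp_utc': datetime.datetime(2020, 1, 1, 12, 0, 30)})),
    ('s1', ('request', {'session_id': 's1', 'timestamp_utc': datetime.datetime(2020, 1, 1, 12, 1, 0)})),
]

with beam.Pipeline() as p:
    (p
     | beam.Create(events)
     | beam.ParDo(CountPageviewsStateful())
     # Expect num_pageviews == 2 on the emitted request.
     | beam.Map(print))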
This seems to work and gives me an average delay of about 1-2 seconds on the DirectRunner. On Cloud Dataflow the average delay is about 0.5-1 seconds. So all in all, this seems to solve the problem definition.
Further considerations
There are still some open questions, though:
- I am using global windows, which means the internal state will be kept forever as far as I can tell. Maybe session windows are the right way to go: when there are no pageviews/requests for x seconds, the window is closed and the internal state is released. Alternatively, a timer inside the stateful DoFn could clear stale state; see the sketch after this list.
- The processing delay is a bit high, but maybe I need to tweak the Pub/Sub part a little.
- I do not know how much overhead or memory consumption this solution adds compared to standard Beam approaches. I also have not tested high workloads and parallelization.
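Regarding the first point, one possible direction (a sketch of my own, not something I have load-tested) is to keep the global window but let the stateful DoFn clear its own state with an event-time timer after a period of inactivity. The 10-minute timeout and the class name are assumptions:

import apache_beam as beam
from apache_beam.transforms import userstate
from apache_beam.transforms.timeutil import TimeDomain

SESSION_TIMEOUT = 10 * 60  # seconds of inactivity before state is released; assumed value

class CountPageviewsWithExpiry(beam.DoFn):
    NUM_PAGEVIEWS = userstate.CombiningValueStateSpec('num_pageviews', combine_fn=sum)
    FIRST_PAGEVIEW = userstate.ReadModifyWriteStateSpec('first_pageview', coder=beam.coders.VarIntCoder())
    # Event-time timer that fires once the watermark passes the expiry timestamp.
    EXPIRY_TIMER = userstate.TimerSpec('expiry', TimeDomain.WATERMARK)

    def process(self,
                element,
                timestamp=beam.DoFn.TimestampParam,
                num_pageviews_state=beam.DoFn.StateParam(NUM_PAGEVIEWS),
                first_pageview_state=beam.DoFn.StateParam(FIRST_PAGEVIEW),
                expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
        # Push the expiry further into the future on every event for this key.
        expiry_timer.set(timestamp + SESSION_TIMEOUT)
        # ... the same request/pageview handling as in CountPageviewsStateful above ...

    @userstate.on_timer(EXPIRY_TIMER)
    def expiry(self,
               num_pageviews_state=beam.DoFn.StateParam(NUM_PAGEVIEWS),
               first_pageview_state=beam.DoFn.StateParam(FIRST_PAGEVIEW)):
        # No events for this session within the timeout: release the state.
        num_pageviews_state.clear()
        first_pageview_state.clear()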