ParDo function in Google Dataflow not producing any output
I am trying to create my first pipeline in Dataflow. The same code runs when I execute it with the interactive Beam runner, but on Dataflow I get all sorts of errors which don't make much sense to me.
Here is a sample of my input data:
{"timestamp":1589992571906,"lastPageVisited":"https://kickassdataprojects.com/simple-and-complete-tutorial-on-simple-linear-regression/","pageUrl":"https://kickassdataprojects.com/","pageTitle":"Helping%20companies%20and%20developers%20create%20awesome%20data%20projects%20%7C%20Data%20Engineering/%20Data%20Science%20Blog","eventType":"Pageview","landingPage":0,"referrer":"direct","uiud":"31af5f22-4cc4-48e0-9478-49787dd5a19f","sessionId":322371}
Here is my code:
from __future__ import absolute_import

import apache_beam as beam
#from apache_beam.runners.interactive import interactive_runner
#import apache_beam.runners.interactive.interactive_beam as ib
import google.auth
from datetime import timedelta
import json
from datetime import datetime
from apache_beam import window
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode, AfterCount
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
import argparse
import logging
from time import mktime


def setTimestamp(elem):
    from apache_beam import window
    return window.TimestampedValue(elem, elem['timestamp'])


def createTuples(elem):
    return (elem["sessionId"], elem)


def checkOutput(elem):
    print(elem)
    return elem


class WriteToBigQuery(beam.PTransform):
    """Generate, format, and write BigQuery table row information."""

    def __init__(self, table_name, dataset, schema, project):
        """Initializes the transform.
        Args:
          table_name: Name of the BigQuery table to use.
          dataset: Name of the dataset to use.
          schema: Dictionary in the format {'column_name': 'bigquery_type'}
          project: Name of the Cloud project containing BigQuery table.
        """
        # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
        #super(WriteToBigQuery, self).__init__()
        beam.PTransform.__init__(self)
        self.table_name = table_name
        self.dataset = dataset
        self.schema = schema
        self.project = project

    def get_schema(self):
        """Build the output table schema."""
        return ', '.join('%s:%s' % (col, self.schema[col]) for col in self.schema)

    def expand(self, pcoll):
        return (
            pcoll
            | 'ConvertToRow' >>
            beam.Map(lambda elem: {col: elem[col]
                                   for col in self.schema})
            | beam.io.WriteToBigQuery(
                self.table_name, self.dataset, self.project, self.get_schema()))


class ParseSessionEventFn(beam.DoFn):
    """Parses the raw game event info into a Python dictionary.
    Each event line has the following format:
      username,teamname,score,timestamp_in_ms,readable_time
    e.g.:
      user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
    The human-readable time string is not used here.
    """

    def __init__(self):
        # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
        #super(ParseSessionEventFn, self).__init__()
        beam.DoFn.__init__(self)

    def process(self, elem):
        #timestamp = mktime(datetime.strptime(elem["timestamp"], "%Y-%m-%d %H:%M:%S").utctimetuple())
        elem['sessionId'] = int(elem['sessionId'])
        elem['landingPage'] = int(elem['landingPage'])
        yield elem


class AnalyzeSessions(beam.DoFn):
    def __init__(self):
        # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
        #super(AnalyzeSessions, self).__init__()
        from apache_beam import window
        beam.DoFn.__init__(self)

    def process(self, elem, window=beam.DoFn.WindowParam):
        from apache_beam import window
        sessionId = elem[0]
        uiud = elem[1][0]["uiud"]
        count_of_events = 0
        pageUrl = []
        window_end = window.end.to_utc_datetime()
        window_start = window.start.to_utc_datetime()
        session_duration = window_end - window_start
        for rows in elem[1]:
            if rows["landingPage"] == 1:
                referrer = rows["refererr"]
            pageUrl.append(rows["pageUrl"])
        print({
            "pageUrl": pageUrl,
            "eventType": "pageview",
            "uiud": uiud,
            "sessionId": sessionId,
            "session_duration": session_duration,
            "window_start": window_start
        })
        yield {
            'pageUrl': pageUrl,
            'eventType': "pageview",
            'uiud': uiud,
            'sessionId': sessionId,
            'session_duration': session_duration,
            'window_start': window_start,
        }


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument('--topic', type=str, help='Pub/Sub topic to read from')
    parser.add_argument(
        '--subscription', type=str, help='Pub/Sub subscription to read from')
    parser.add_argument(
        '--dataset',
        type=str,
        required=True,
        help='BigQuery Dataset to write tables to. '
        'Must already exist.')
    parser.add_argument(
        '--table_name',
        type=str,
        default='game_stats',
        help='The BigQuery table name. Should not already exist.')
    parser.add_argument(
        '--fixed_window_duration',
        type=int,
        default=60,
        help='Numeric value of fixed window duration for user '
        'analysis, in minutes')
    parser.add_argument(
        '--session_gap',
        type=int,
        default=5,
        help='Numeric value of gap between user sessions, '
        'in minutes')
    parser.add_argument(
        '--user_activity_window_duration',
        type=int,
        default=30,
        help='Numeric value of fixed window for finding mean of '
        'user session duration, in minutes')
    args, pipeline_args = parser.parse_known_args(argv)
    session_gap = args.session_gap * 60

    options = PipelineOptions(pipeline_args)
    # Set the pipeline mode to stream the data from Pub/Sub.
    options.view_as(StandardOptions).streaming = True
    options.view_as(StandardOptions).runner = 'DataflowRunner'
    options.view_as(SetupOptions).save_main_session = save_main_session

    p = beam.Pipeline(options=options)
    lines = (p
             | beam.io.ReadFromPubSub(
                 subscription="projects/phrasal-bond-274216/subscriptions/rrrr")
             | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
             | beam.Map(lambda x: json.loads(x))
             | beam.ParDo(ParseSessionEventFn())
             )

    next = (lines
            | 'AddEventTimestamps' >> beam.Map(setTimestamp)
            | 'Create Tuples' >> beam.Map(createTuples)
            | 'Window' >> beam.WindowInto(window.Sessions(15))
            | 'group by key' >> beam.GroupByKey()
            | 'analyze sessions' >> beam.ParDo(AnalyzeSessions())
            | beam.Map(print)
            | 'WriteTeamScoreSums' >> WriteToBigQuery(
                args.table_name, args.dataset,
                {
                    "uiud": 'STRING',
                    "session_duration": 'INTEGER',
                    "window_start": 'TIMESTAMP'
                },
                options.view_as(GoogleCloudOptions).project)
            )

    result = p.run()
    # result.wait_until_finish()
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
The issue I am facing is with the AnalyzeSessions ParDo function: it does not produce any output, nor any errors. I have tried this code in the interactive Beam runner and it worked there.
Any idea why the pipeline is not working?
All the other steps, including the analyze sessions step, have an input.
This is the output section.
The steps after this do not work, and I don't see anything in the logs either; the print statements are not adding anything there.
Edit: In case it helps, this is how my data looks going into the WindowInto and GroupByKey steps.
Any ideas on what I should try?
I have only used Beam with the Java SDK, but there the process function generally does not return anything; it calls a callback provided by Beam to output results. That way you can take one input and emit any number of outputs.
In your example, AnalyzeSessions#process has a return statement. Looking at the Beam examples, I see yield statements in a DoFn's process function. Have you tried yield? Is that the Python version of the output callback?
https://beam.apache.org/get-started/wordcount-example/#specifying-explicit-dofns
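For illustration, here is a minimal, self-contained sketch (a hypothetical example, not the code from the question) of a Python DoFn whose process method yields its outputs; yielding zero or more values per input element (or returning an iterable) is the Python SDK's equivalent of the Java output callback:

import apache_beam as beam

class SplitWordsFn(beam.DoFn):
    # Illustrative DoFn: emits one output element per word in the input line.
    def process(self, line):
        for word in line.split():
            # In the Python SDK a DoFn emits outputs by yielding them;
            # there is no explicit ProcessContext.output() callback as in Java.
            yield word

with beam.Pipeline() as p:
    (p
     | beam.Create(['hello beam', 'pardo yields output'])
     | beam.ParDo(SplitWordsFn())
     | beam.Map(print))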
It looks like the problem here is upstream of your AnalyzeSessions ParDo; based on your screenshot, it is not receiving any input.
From your pipeline code, the step most likely to be delayed is the GroupByKey, which is also where the sessionization logic happens. Two things can cause delay there. First, windows (such as your sessions) only close once the input watermark passes the end of the window. If the pipeline cannot keep up with its input, the watermark will not advance. Does your pipeline show high data freshness and/or a large backlog in Pub/Sub? If so, try using more workers, or run against a subset of your data.
Second, if your sessions never end, you will never get any output. This can happen if, for every session, there is such a continuous stream of elements that the gap duration is never reached.
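As a small (hypothetical) illustration of the first point, the following streaming DirectRunner sketch only prints the grouped session once the watermark has moved past the end of the session window, i.e. past the last element's timestamp plus the gap; until then GroupByKey emits nothing:

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_stream import TestStream

opts = PipelineOptions()
opts.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=opts) as p:
    (p
     | TestStream()
       # One element at event time 0 for a session gap of 600 seconds.
       .add_elements([window.TimestampedValue(('user-1', 'pageview'), 0)])
       # No output yet: the session window [0, 600) is still open.
       .advance_watermark_to(601)       # watermark passes end of window -> fires
       .advance_watermark_to_infinity()
     | 'Sessionize' >> beam.WindowInto(window.Sessions(600))
     | beam.GroupByKey()
     | beam.Map(print))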
Edit:
It looks like the problem is in how you are setting timestamps. Generally, when using PubsubIO, if you want custom event timestamps you should put the timestamp into an attribute of the message and then set ReadFromPubSub's timestamp_attribute parameter to point at that attribute. This lets Dataflow know about the timestamps so it can generate a correct watermark for the pipeline.
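A hedged sketch of what that could look like (the attribute name 'ts' and the project/topic/subscription names are made up for illustration):

# Publisher side (illustrative): put the event time into a message attribute.
import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-topic')  # hypothetical names
event = {"sessionId": 322371, "timestamp": 1589992571906}
publisher.publish(topic_path,
                  json.dumps(event).encode('utf-8'),
                  ts=str(event["timestamp"]))  # ms since epoch, as a string attribute

# Pipeline side (illustrative): tell ReadFromPubSub to use that attribute as
# the event timestamp, so the watermark is derived from your event times.
# 'p' is the Pipeline object from the question.
import apache_beam as beam

lines = (p
         | beam.io.ReadFromPubSub(
             subscription='projects/my-project/subscriptions/my-subscription',
             timestamp_attribute='ts')
         | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
         | beam.Map(json.loads))

With this in place there is no need to re-assign timestamps inside the pipeline with beam.Map(setTimestamp).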
With your current approach, Dataflow's view of the event timestamps is the default one, i.e. the publish time of the messages. You then moved the event timestamps backwards, which made the elements "late". That caused them to be dropped at the GroupByKey.
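If you do need to keep assigning timestamps inside the pipeline, one possible stopgap (a sketch only, and not a substitute for a correct watermark) is to allow late data on the window so the GroupByKey does not silently drop those elements:

import apache_beam as beam
from apache_beam import window
from apache_beam.utils.timestamp import Duration

# Hypothetical workaround sketch: keep the default trigger but allow late data,
# so elements whose event timestamps lag behind the publish-time watermark are
# still grouped instead of dropped. The one-hour value is illustrative only.
windowed = (keyed_events  # the (sessionId, event) tuples from 'Create Tuples'
            | 'Window' >> beam.WindowInto(
                window.Sessions(15),
                allowed_lateness=Duration(seconds=3600)))

Fixing the timestamps at the source via timestamp_attribute is still the cleaner solution.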