Start Dataflow job from Cloud Function - ModuleNotFoundError: No module named 'google.cloud.functions'
For learning purposes, I'm trying to launch a simple batch ETL process on Dataflow. This is the logic I'm following:
Cloud Storage > PubSub > Cloud Function > DataFlow > Cloud Storage
Whenever a new file is uploaded to the bucket, a PubSub topic publishes a message. A Cloud Function then listens to a subscription on that topic and starts a Dataflow job that reads the file, processes the data, and saves the result to a new file in the same bucket.
I have managed to implement all of this logic, but I'm struggling to launch the Dataflow job from the Cloud Function instance. The function starts the job without any problem, but after a minute the workers show the following error message:
Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 514, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 311, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ModuleNotFoundError: No module named 'google.cloud.functions'
The important part of the error is:
ModuleNotFoundError: No module named 'google.cloud.functions'
My Cloud Function directory looks like this:
/
requirements.txt
main.py
pipeline.py
requirements.txt
# Function dependencies, for example:
# package>=version
apache-beam[gcp]
main.py
import base64
import json

from pipeline import run


def start_job(event, context):
    message = base64.b64decode(event['data']).decode('utf-8')
    message = json.loads(message)

    bucket = message['bucket']
    filename = message['name']

    if filename.startswith('raw/'):
        run(bucket, filename)
        print('Job sent to Dataflow')
    else:
        print('File uploaded to unknown directory: {}'.format(filename))
pipeline.py
import apache_beam as beam
from datetime import datetime
import argparse

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions

options = PipelineOptions()

google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "dpto-bigdata"
google_cloud_options.region = "europe-west1"
google_cloud_options.job_name = "pipeline-test"
google_cloud_options.staging_location = "gs://services-files/staging/"
google_cloud_options.temp_location = "gs://services-files/temp/"

#options.view_as(StandardOptions).runner = "DirectRunner" # use this for debugging
options.view_as(StandardOptions).runner = "DataFlowRunner"
options.view_as(SetupOptions).save_main_session = True

output_suffix = '.csv'
output_header = 'Name,Total,HP,Attack,Defence,Sp_attack,Sp_defence,Speed,Average'


def run(bucket, filename):
    source_file = 'gs://{}/{}'.format(bucket, filename)
    now = datetime.now().strftime('%Y%m%d-%H%M%S')
    output_prefix = 'gs://{}/processed/{}'.format(bucket, now)

    with beam.Pipeline(options=options) as p:
        raw_values = (
            p
            | "Read from Cloud Storage" >> beam.io.ReadFromText(source_file, skip_header_lines=1)
            | "Split columns" >> beam.Map(lambda x: x.split(','))
            | "Cleanup entries" >> beam.ParDo(ElementCleanup())
            | "Calculate average stats" >> beam.Map(calculate_average)
            | "Format output" >> beam.Map(format_output)
            | "Write to Cloud Storage" >> beam.io.WriteToText(file_path_prefix=output_prefix, file_name_suffix=output_suffix, header=output_header)
        )


class ElementCleanup(beam.DoFn):
    def __init__(self):
        self.transforms = self.map_transforms()

    def map_transforms(self):
        return [
            [self.trim, self.to_lowercase],  # Name
            [self.trim, self.to_float],      # Total
            [self.trim, self.to_float],      # HP
            [self.trim, self.to_float],      # Attack
            [self.trim, self.to_float],      # Defence
            [self.trim, self.to_float],      # Sp_attack
            [self.trim, self.to_float],      # Sp_defence
            [self.trim, self.to_float]       # Speed
        ]

    def process(self, row):
        return [self.clean_row(row, self.transforms)]

    def clean_row(self, row, transforms):
        cleaned = []
        for idx, col in enumerate(row):
            for func in transforms[idx]:
                col = func(col)
            cleaned.append(col)
        return cleaned

    def to_lowercase(self, col: str):
        return col.lower()

    def trim(self, col: str):
        return col.strip()

    def to_float(self, col: str):
        return (float(col) if col != None else None)


def calculate_average(row):
    average = round(sum(row[2:]) / len(row[2:]), 2)
    row.append(average)
    return row


def format_output(row):
    row = [str(col) for col in row]
    return ','.join(row)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--bucket",
        help="Bucket to read from."
    )
    parser.add_argument(
        "--filename",
        help="File to read from."
    )
    args = parser.parse_args()
    run(args.bucket, args.filename)
I've been reading about this for a while now. Before this error I had a similar one saying ModuleNotFoundError: No module named 'main'. I was able to solve it by adding the pipeline option options.view_as(SetupOptions).save_main_session = True, but I haven't found anything that solves the error I'm currently facing.
I expected the Dataflow workers not to depend on the Cloud Function once the pipeline job was started, but it seems they are still trying to communicate with it somehow.
Don't do that.
The reasoning:
- Cloud Functions (scale without limit) - good for handling infrequent events.
- Cloud Run - similar to Cloud Functions, but it can handle several events concurrently - good for bursts of simultaneous events (and cheaper than Cloud Functions in that case).
- Dataflow reading from PubSub -> for frequent/steady batches of data (files) or data streams (PubSub/Kafka).
Triggering a Dataflow job for every single file is really inefficient in terms of both time and cost (minutes and dollars).
If you need Dataflow to react continuously to file notifications (finalize, delete, etc.), you should send the storage notifications to a PubSub topic and read them from a subscription in Dataflow. Note that this only works for streaming.
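Wiring the bucket notifications to a topic is a one-time setup step. A minimal sketch with the google-cloud-storage client (the bucket and topic names here are placeholders, not taken from the question):

from google.cloud import storage

client = storage.Client()
bucket = client.bucket('your-upload-bucket')   # placeholder bucket name
notification = bucket.notification(
    topic_name='storage-events',               # placeholder topic name
    payload_format='JSON_API_V1',              # JSON body with 'bucket' and 'name' fields
    event_types=['OBJECT_FINALIZE'],           # fire only when an object is created/overwritten
)
notification.create()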
If you read the storage notifications with ReadFromPubSub:
with beam.Pipeline(options=pipeline_options) as pipeline:
    pubsub_msgs = pipeline | (
        'Read PubSub Messages' >> beam.io.gcp.pubsub.ReadFromPubSub(subscription=global_vars.input_subscription))
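Those messages carry the same JSON payload the Cloud Function already decodes (with 'bucket' and 'name' fields), so downstream of ReadFromPubSub each notification can be turned into a gs:// path and the file read from there. A self-contained sketch of that idea (the helper name is mine, and pipeline_options / input_subscription mirror the snippet above):

import json
import apache_beam as beam

def to_gcs_uri(message):
    # JSON_API_V1 notifications carry the object metadata, including the
    # same 'bucket' and 'name' fields that main.py above reads.
    data = json.loads(message.decode('utf-8'))
    return 'gs://{}/{}'.format(data['bucket'], data['name'])

with beam.Pipeline(options=pipeline_options) as pipeline:
    file_lines = (
        pipeline
        | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(subscription=input_subscription)
        | 'To GCS URI' >> beam.Map(to_gcs_uri)
        | 'Read each new file' >> beam.io.ReadAllFromText(skip_header_lines=1))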
I think the best approach here would be to use templates, since you are not changing the code, only the paths. Once you have the template, you just call an API to launch it. It would definitely be less of a hassle to set up and probably more resilient, since you wouldn't depend on Cloud Functions as much.
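For example, once a classic template has been staged, launching it from the function could look roughly like this (a sketch only: the gcsPath and the parameter names are hypothetical, the template itself would need to accept them, e.g. as ValueProviders, and google-api-python-client would have to be added to requirements.txt):

from googleapiclient.discovery import build

def launch_template(project, region, bucket, filename):
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().locations().templates().launch(
        projectId=project,
        location=region,
        gcsPath='gs://services-files/templates/etl-template',  # hypothetical staging path
        body={
            'jobName': 'pipeline-test',
            'parameters': {
                'bucket': bucket,        # hypothetical template parameters
                'filename': filename,
            },
        },
    )
    return request.execute()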
I think there is another approach that would be better and doesn't need Cloud Functions: something like MatchAll.continuously from the Java SDK. There is no Python counterpart for it yet, but I took the liberty of creating a version that does the same thing and sending a Pull Request for a new PTransform.
The idea is that every X seconds you check for new files and process them depending on your pipeline.
If you don't want to wait for the Pull Request to be merged (or in case it isn't), you can just copy the DoFn/PTransform:
# Imports needed if you copy this transform into your own module.
import logging

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.io.fileio import MatchAll
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.utils.timestamp import MAX_TIMESTAMP, Timestamp

_LOGGER = logging.getLogger(__name__)


class MatchContinuously(beam.PTransform):
    def __init__(
            self,
            file_pattern,
            interval=360.0,
            has_deduplication=True,
            start_timestamp=Timestamp.now(),
            stop_timestamp=MAX_TIMESTAMP):
        self.file_pattern = file_pattern
        self.interval = interval
        self.has_deduplication = has_deduplication
        self.start_ts = start_timestamp
        self.stop_ts = stop_timestamp

    def expand(self, pcol):
        # Fire an element every `interval` seconds between start and stop.
        impulse = pcol | PeriodicImpulse(
            start_timestamp=self.start_ts,
            stop_timestamp=self.stop_ts,
            fire_interval=self.interval)

        match_files = (
            impulse
            | beam.Map(lambda x: self.file_pattern)
            | MatchAll())

        if self.has_deduplication:
            match_files = (
                match_files
                # Making a Key Value so each file has its own state.
                | "To KV" >> beam.Map(lambda x: (x.path, x))
                | "Remove Already Read" >> beam.ParDo(_RemoveDuplicates()))

        return match_files


class _RemoveDuplicates(beam.DoFn):
    FILES_STATE = BagStateSpec('files', StrUtf8Coder())

    def process(self, element, file_state=beam.DoFn.StateParam(FILES_STATE)):
        path = element[0]
        file_metadata = element[1]
        bag_content = [x for x in file_state.read()]

        if not bag_content:
            file_state.add(path)
            _LOGGER.info("Generated entry for file %s", path)
            yield file_metadata
        else:
            _LOGGER.info("File %s was already read", path)
Pipeline example:
(p | MatchContinuously("gs://apache-beam-samples/shakespeare/*", 180)
| Map(lambda x: x.path)
| ReadAllFromText()
| Map(lambda x: logging.info(x))
)
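Note that PeriodicImpulse makes this an unbounded pipeline, so on Dataflow it would presumably need to run as a streaming job. A minimal sketch of wrapping the example above (option values are illustrative, and MatchContinuously is the transform copied above):

import logging

import apache_beam as beam
from apache_beam.io import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

opts = PipelineOptions()
opts.view_as(StandardOptions).streaming = True  # unbounded source -> streaming mode

with beam.Pipeline(options=opts) as p:
    (p
     | MatchContinuously("gs://apache-beam-samples/shakespeare/*", 180)
     | beam.Map(lambda x: x.path)
     | ReadAllFromText()
     | beam.Map(logging.info))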
A third approach could be to keep using the GCS notifications and combine PubSub + MatchAll. The pipeline would look like:
(p | ReadFromPubSub(topic)
   # in practice, map each notification payload to a gs:// path or pattern first (see the sketch above)
   | MatchAll())
Depending on how frequently new files arrive and whether you want to use notifications, you can choose between these three approaches.