Start Dataflow job from Cloud Function - ModuleNotFoundError: No module named 'google.cloud.functions'

For learning purposes, I am trying to run a simple batch ETL process on Dataflow. This is the flow I am implementing:

Cloud Storage > PubSub > Cloud Function > DataFlow > Cloud Storage

Whenever a new file is uploaded to the bucket, a message is published to a Pub/Sub topic. A Cloud Function listens on a subscription to that topic and starts a Dataflow job that reads the file, processes the data and saves the result to a new file in the same bucket.

I have all of this logic working, but I am struggling to start the Dataflow job from the Cloud Function instance. The function launches the job without any problem, but after about a minute the workers show the following error message:

Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 514, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 311, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ModuleNotFoundError: No module named 'google.cloud.functions'

The important part of the error is:

ModuleNotFoundError: No module named 'google.cloud.functions'

My Cloud Function directory looks like this:

/
  requirements.txt
  main.py
  pipeline.py

requirements.txt

# Function dependencies, for example:
# package>=version
apache-beam[gcp]

main.py

import base64
import json
from pipeline import run

def start_job(event, context):
  message = base64.b64decode(event['data']).decode('utf-8')
  message = json.loads(message)
  bucket = message['bucket']
  filename = message['name']

  if filename.startswith('raw/'):
    run(bucket, filename)
    print('Job sent to Dataflow')
  else:
    print('File uploaded to unknown directory: {}'.format(filename))

pipeline.py

import apache_beam as beam
from datetime import datetime
import argparse

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "dpto-bigdata"
google_cloud_options.region = "europe-west1"
google_cloud_options.job_name = "pipeline-test"
google_cloud_options.staging_location = "gs://services-files/staging/"
google_cloud_options.temp_location = "gs://services-files/temp/"
#options.view_as(StandardOptions).runner = "DirectRunner"  # use this for debugging
options.view_as(StandardOptions).runner = "DataFlowRunner"
options.view_as(SetupOptions).save_main_session = True

output_suffix = '.csv'
output_header = 'Name,Total,HP,Attack,Defence,Sp_attack,Sp_defence,Speed,Average'

def run(bucket, filename):
  source_file = 'gs://{}/{}'.format(bucket, filename)
  now = datetime.now().strftime('%Y%m%d-%H%M%S')
  output_prefix = 'gs://{}/processed/{}'.format(bucket, now)

  with beam.Pipeline(options=options) as p:
    raw_values = (
      p
      | "Read from Cloud Storage" >> beam.io.ReadFromText(source_file, skip_header_lines=1)
      | "Split columns" >> beam.Map(lambda x: x.split(','))
      | "Cleanup entries" >> beam.ParDo(ElementCleanup())
      | "Calculate average stats" >> beam.Map(calculate_average)
      | "Format output" >> beam.Map(format_output)
      | "Write to Cloud Storage" >> beam.io.WriteToText(file_path_prefix=output_prefix, file_name_suffix=output_suffix, header=output_header)
    )

class ElementCleanup(beam.DoFn):
  def __init__(self):
    self.transforms = self.map_transforms()

  def map_transforms(self):
    return [
      [self.trim, self.to_lowercase],   # Name
      [self.trim, self.to_float],       # Total
      [self.trim, self.to_float],       # HP
      [self.trim, self.to_float],       # Attack
      [self.trim, self.to_float],       # Defence
      [self.trim, self.to_float],       # Sp_attack
      [self.trim, self.to_float],       # Sp_defence
      [self.trim, self.to_float]        # Speed
    ]

  def process(self, row):
    return [self.clean_row(row, self.transforms)]

  def clean_row(self, row, transforms):
    cleaned = []
    for idx, col in enumerate(row):
      for func in transforms[idx]:
        col = func(col)
      cleaned.append(col)
    return cleaned

  def to_lowercase(self, col:str):
    return col.lower()

  def trim(self, col:str):
    return col.strip()

  def to_float(self, col:str):
    return (float(col) if col is not None else None)

def calculate_average(row):
    average = round(sum(row[2:]) / len(row[2:]), 2)
    row.append(average)
    return row

def format_output(row):
    row = [str(col) for col in row]
    return ','.join(row)

if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
      "--bucket",
      help="Bucket to read from."
  )
  parser.add_argument(
      "--filename",
      help="File to read from."
  )
  args = parser.parse_args()

  run(args.bucket, args.filename)

I have been reading about this topic for a while. Before this error I had a similar one saying ModuleNotFoundError: No module named 'main'. I was able to solve that one by adding the pipeline option options.view_as(SetupOptions).save_main_session = True, but I have not found anything that solves the error I am currently facing.

I expected the Dataflow workers not to depend on the Cloud Function once I had started the pipeline job, but it seems they are still trying to communicate with it somehow.

Don't do that.

The reasoning:

  • Cloud Functions (scale without limit) - good for handling events that don't happen very often.
  • Cloud Run - similar to Cloud Functions, but it can handle several events concurrently - good for bursts of simultaneous events (and cheaper than Cloud Functions in that case).
  • Dataflow reading from PubSub -> for processing frequent/steady batches of data (files) or streams of data (PubSub/Kafka).

Triggering a Dataflow job for every single file really is inefficient in both time and cost (minutes and dollars).

If you need Dataflow to respond continuously to file notifications (finalize, delete, etc.), you should send the storage notifications to a Pub/Sub topic and read them from a subscription in Dataflow. Note that this only works for streaming.
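
To wire that up, here is a one-off setup sketch (assuming the google-cloud-storage client and an already-created Pub/Sub topic; the topic name below is illustrative):

    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket('services-files')   # the bucket being watched
    notification = bucket.notification(
        topic_name='new-files',                # illustrative topic name
        event_types=['OBJECT_FINALIZE'],       # fire only when objects are created/overwritten
        payload_format='JSON_API_V1',          # message body = the object resource as JSON
    )
    notification.create()

After this, every upload publishes a message carrying the bucket and name of the new object.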

If you read the storage notifications with ReadFromPubsub:

    with beam.Pipeline(options=pipeline_options) as pipeline:

        pubsub_msgs = pipeline | (
            'Read PubSub Messages' >> beam.io.gcp.pubsub.ReadFromPubSub(subscription=global_vars.input_subscription) ) 
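
Building on the pubsub_msgs collection above, a minimal sketch of the rest of such a pipeline, assuming JSON_API_V1 payloads (the message body is the object resource, which carries the bucket and name fields):

    import json

    import apache_beam as beam

    def to_gcs_path(message):
        # JSON_API_V1 payload: the object resource as JSON, with 'bucket' and 'name' fields.
        body = json.loads(message.decode('utf-8'))
        return 'gs://{}/{}'.format(body['bucket'], body['name'])

    file_lines = (
        pubsub_msgs
        | 'To GCS path' >> beam.Map(to_gcs_path)
        | 'Read new files' >> beam.io.ReadAllFromText(skip_header_lines=1))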

I think the best approach here is to use templates, since you are not changing the code, only the paths. Once you have the template, you only need to call an API to launch it. It is certainly less hassle to set up and probably more resilient, since you don't depend on Cloud Functions as much.
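
For example, a sketch of launching a classic template from the function, assuming google-api-python-client is added to requirements.txt and that a template with an input parameter has already been staged at a hypothetical path:

    import base64
    import json

    from googleapiclient.discovery import build

    def launch_template(event, context):
        message = json.loads(base64.b64decode(event['data']).decode('utf-8'))

        # Discovery-based client for the Dataflow REST API, using the default
        # credentials of the function's service account.
        dataflow = build('dataflow', 'v1b3', cache_discovery=False)
        request = dataflow.projects().locations().templates().launch(
            projectId='dpto-bigdata',
            location='europe-west1',
            gcsPath='gs://services-files/templates/etl-template',  # hypothetical staged template
            body={
                'jobName': 'pipeline-test',
                'parameters': {
                    # 'input' stands for a ValueProvider parameter defined by the template.
                    'input': 'gs://{}/{}'.format(message['bucket'], message['name']),
                },
            },
        )
        return request.execute()

This way the function only makes a lightweight API call, so nothing from its runtime needs to be pickled into the job.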

I think there is another approach that would be even better and does not need Cloud Functions at all: building a new PTransform with something like MatchAll.continuously from the Java SDK. If you need/want Python, there is no counterpart for it yet, but I took the liberty of creating a version that does the same thing and sending a Pull Request.

The idea is that every X seconds you check for new files and process them with your pipeline.

If you don't want to wait for the Pull Request to be merged (if it ever is), you can just copy the PTransform and DoFn:

import logging

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.io.fileio import MatchAll
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.utils.timestamp import MAX_TIMESTAMP, Timestamp

_LOGGER = logging.getLogger(__name__)

class MatchContinuously(beam.PTransform):
  def __init__(
      self,
      file_pattern,
      interval=360.0,
      has_deduplication=True,
      start_timestamp=Timestamp.now(),
      stop_timestamp=MAX_TIMESTAMP):

    self.file_pattern = file_pattern
    self.interval = interval
    self.has_deduplication = has_deduplication
    self.start_ts = start_timestamp
    self.stop_ts = stop_timestamp

  def expand(self, pcol):
    impulse = pcol | PeriodicImpulse(
        start_timestamp=self.start_ts,
        stop_timestamp=self.stop_ts,
        fire_interval=self.interval)

    match_files = (
        impulse
        | beam.Map(lambda x: self.file_pattern)
        | MatchAll())

    if self.has_deduplication:
      match_files = (
          match_files
          # Making a Key Value so each file has its own state.
          | "To KV" >> beam.Map(lambda x: (x.path, x))
          | "Remove Already Read" >> beam.ParDo(_RemoveDuplicates()))

    return match_files

class _RemoveDuplicates(beam.DoFn):

  FILES_STATE = BagStateSpec('files', StrUtf8Coder())

  def process(self, element, file_state=beam.DoFn.StateParam(FILES_STATE)):
    path = element[0]
    file_metadata = element[1]
    bag_content = [x for x in file_state.read()]

    if not bag_content:
      file_state.add(path)
      _LOGGER.info("Generated entry for file %s", path)
      yield file_metadata
    else:
      _LOGGER.info("File %s was already read", path)

An example pipeline:

(p | MatchContinuously("gs://apache-beam-samples/shakespeare/*", 180)
   | Map(lambda x: x.path)
   | ReadAllFromText()
   | Map(lambda x: logging.info(x))
)

A third approach would be to keep using the GCS notifications and combine PubSub + MatchAll. The pipeline would look like:

(p | ReadFromPubSub(topic)
   | MatchAll())
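
Note that MatchAll expects file patterns as input, so in practice you would turn each notification into a path first. A rough sketch (p and topic as above, reusing the to_gcs_path helper from the earlier snippet):

    import apache_beam as beam
    from apache_beam.io.fileio import MatchAll, ReadMatches

    lines = (
        p
        | 'Read notifications' >> beam.io.ReadFromPubSub(topic=topic)
        | 'To GCS path' >> beam.Map(to_gcs_path)   # notification JSON -> 'gs://bucket/object'
        | 'Match files' >> MatchAll()              # emits a FileMetadata per existing path
        | 'Open files' >> ReadMatches()            # emits ReadableFile objects
        | 'Read contents' >> beam.Map(lambda f: f.read_utf8()))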

Depending on how often new files arrive and whether or not you want to use notifications, you can choose between these three approaches.