尝试将 CSV 文件从 Dataflow 管道写入 Google Cloud Storage 时出错

Error trying to write CSV file to Google Cloud Storage from Dataflow pipeline

我正在构建一个 Dataflow 管道,它从我的 Cloud Storage 存储桶中读取一个 CSV 文件(包含 250,000 行),修改每行的值,然后将修改后的内容写入同一个存储桶中的新 CSV .使用下面的代码,我能够读取和修改原始文件的内容,但是当我尝试在 GCS 中写入新文件的内容时,出现以下错误:

google.api_core.exceptions.TooManyRequests: 429 POST https://storage.googleapis.com/upload/storage/v1/b/my-bucket/o?uploadType=multipart: {
  "error": {
    "code": 429,
    "message": "The rate of change requests to the object my-bucket/product-codes/URL_test_codes.csv exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
    "errors": [
      {
        "message": "The rate of change requests to the object my-bucket/product-codes/URL_test_codes.csv exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
        "domain": "usageLimits",
        "reason": "rateLimitExceeded"
      }
    ]
  }
}
: ('Request failed with status code', 429, 'Expected one of', <HTTPStatus.OK: 200>) [while running 'Store Output File']

我在 Dataflow 中的代码:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import traceback
import sys
import pandas as pd
from cryptography.fernet import Fernet
import google.auth
from google.cloud import storage

fernet_secret = 'aD4t9MlsHLdHyuFKhoyhy9_eLKDfe8eyVSD3tu8KzoP='
bucket = 'my-bucket'
inputFile = f'gs://{bucket}/product-codes/test_codes.csv'
outputFile = 'product-codes/URL_test_codes.csv'

#Pipeline Logic
def product_codes_pipeline(project, env, region='us-central1'):
    options = PipelineOptions(
        streaming=False,
        project=project,
        region=region,
        staging_location="gs://my-bucket-dataflows/Templates/staging",
        temp_location="gs://my-bucket-dataflows/Templates/temp",
        template_location="gs://my-bucket-dataflows/Templates/Generate_Product_Codes.py",
        subnetwork='https://www.googleapis.com/compute/v1/projects/{}/regions/us-central1/subnetworks/{}-private'.format(project, env)
    )
    
    # Transform function
    def genURLs(code):
        f = Fernet(fernet_secret)
        encoded = code.encode()
        encrypted = f.encrypt(encoded)
        decrypted = f.decrypt(encrypted.decode().encode())
        decoded = decrypted.decode()
        if code != decoded:
            print(f'Error: Code {code} and decoded code {decoded} do not match')
            sys.exit(1)
        url = 'https://some-url.com/redeem/product-code=' + encrypted.decode()
        return url
    
    class WriteCSVFIle(beam.DoFn):
        def __init__(self, bucket_name):
            self.bucket_name = bucket_name

        def start_bundle(self):
            self.client = storage.Client()

        def process(self, urls):
            df = pd.DataFrame([urls], columns=['URL'])

            bucket = self.client.get_bucket(self.bucket_name)
            bucket.blob(f'{outputFile}').upload_from_string(df.to_csv(index=False), 'text/csv')
    
    
    # End function
    p = beam.Pipeline(options=options)
    (p | 'Read Input CSV' >> beam.io.ReadFromText(inputFile, skip_header_lines=1)
       | 'Map Codes' >> beam.Map(genURLs)
       | 'Store Output File' >> beam.ParDo(WriteCSVFIle(bucket)))

    p.run()

代码在我的存储桶中生成 URL_test_codes.csv,但文件仅包含一行(不包括 'URL' header),这告诉我我的代码是 writing/overwriting 文件处理每一行。有没有办法批量写入整个文件的内容,而不是发出一系列更新文件的请求?我是 Python/Dataflow 的新手,非常感谢任何帮助。

让我们指出问题:明显的问题是 GCS 方面的配额问题,由“429”错误代码反映出来。但正如您所指出的,这是由固有问题引起的,它与您尝试将数据写入 blob 的方式更为相关。

由于 Beam Pipeline 生成元素的并行集合,当您将元素添加到 PCollection 时,将为每个元素执行每个管道步骤,换句话说,您的 ParDo 函数将尝试将一些内容写入您的输出文件PCollection 中的每个元素一次。

因此,您的 WriteCSVFIle 函数存在一些问题。例如,为了将您的 PCollection 写入 GCS,最好使用一个单独的管道任务专注于写入整个 PCollection,例如:

首先,您可以导入已包含在 Apache Beam 中的此函数:

from apache_beam.io import WriteToText

然后,您在管道的末尾使用它:

| 'Write PCollection to Bucket' >> WriteToText('gs://{0}/{1}'.format(bucket_name, outputFile))

使用此选项,您无需创建存储客户端或引用 blob,该函数只需要接收写入最终结果的 GCS URI,您可以根据您的参数进行调整可以在documentation.

中找到

有了这个,您只需要解决在 WriteCSVFIle 函数中创建的 Dataframe。每个管道步骤都会创建一个新的 PCollection,因此如果 Dataframe-creator 函数应该从 URL 的 PCollection 接收一个元素,那么根据您当前的逻辑,Dataframe 函数产生的新 PCollection 元素每个 url 将有 1 个数据帧,但由于考虑到 'URL' 是数据框中唯一的列,您似乎只想从 genURLs 中写入结果,也许直接从 genURLs 到 WriteToText 可以输出您要查找的内容。

无论哪种方式,您都可以相应地调整您的管道,但至少使用 WriteToText 转换它会负责将您的整个最终 PCollection 写入您的 Cloud Storage 存储桶。