如何从数据流工作者签署 gcs blob

How to sign gcs blob from the dataflow worker

我的 Beam 数据流作业在本地成功(DirectRunner)并在云端失败(DataflowRunner

此代码段中定位的问题:

class SomeDoFn(beam.DoFn):
  ...
  def process(self, gcs_blob_path):
    gcs_client = storage.Client()
    bucket = gcs_client.get_bucket(BUCKET_NAME)
    blob = Blob(gcs_blob_path, bucket)

    # NEXT LINE IS CAUSING ISSUES! (when run remotely)
    url = blob.generate_signed_url(datetime.timedelta(seconds=300), method='GET')

并且数据流指向错误:"AttributeError: you need a private key to sign credentials.the credentials you are currently using just contains a token."

我的数据流作业使用服务帐户(PipelineOptions 中提供了适当的 service_account_email),但是我不知道如何传递 .json 凭据文件该服务帐户到数据流作业。我怀疑我的工作在本地成功运行是因为我设置了环境变量 GOOGLE_APPLICATION_CREDENTIALS=<path to local file with service account credentials>,但是我如何为远程数据流工作人员类似地设置它?或者也许还有另一种解决方案,如果有人可以提供帮助

您将需要提供服务帐户 JSON 密钥,这与您在本地使用环境变量 GOOGLE_APPLICATION_CREDENTIALS 所做的类似。

为此,您可以按照此 . Such as passing it using PipelineOptions

的答案中提到的一些方法进行操作

但是,请记住,最安全的方法是将 JSON 密钥存储在 GCP 存储桶中,然后从那里获取文件。

简单但不安全的解决方法是获取密钥,打开它,然后在您的代码中创建一个基于它的 json 对象以供稍后传递。

您可以查看有关如何向 Beam 管道添加自定义选项的示例 here。有了这个,我们可以创建一个 --key_file 参数,它将指向存储在 GCS 中的凭据:

parser.add_argument('--key_file',
                  dest='key_file',
                  required=True,
                  help='Path to service account credentials JSON.')

这将允许您在 运行 作业时添加 --key_file gs://PATH/TO/CREDENTIALS.json 标志。

然后,您可以从作业中读取它并将其作为辅助输入传递给需要签署 blob 的 DoFn。从示例 开始,我们创建一个 credentials PCollection 来保存 JSON 文件:

credentials = (p 
  | 'Read Credentials from GCS' >> ReadFromText(known_args.key_file))

然后我们将它广播给所有处理 SignFileFn 函数的工作人员:

(p
  | 'Read File from GCS' >> beam.Create([known_args.input]) \
  | 'Sign File' >> beam.ParDo(SignFileFn(), pvalue.AsList(credentials)))

ParDo 内部,我们构建 JSON 对象来初始化客户端(使用方法 here)并签署文件:

class SignFileFn(beam.DoFn):
  """Signs GCS file with GCS-stored credentials"""
  def process(self, gcs_blob_path, creds):
    from google.cloud import storage
    from google.oauth2 import service_account

    credentials_json=json.loads('\n'.join(creds))
    credentials = service_account.Credentials.from_service_account_info(credentials_json)

    gcs_client = storage.Client(credentials=credentials)

    bucket = gcs_client.get_bucket(gcs_blob_path.split('/')[2])
    blob = bucket.blob('/'.join(gcs_blob_path.split('/')[3:]))

    url = blob.generate_signed_url(datetime.timedelta(seconds=300), method='GET')
    logging.info(url)
    yield url

查看完整代码here