如何从数据流工作者签署 gcs blob
How to sign gcs blob from the dataflow worker
我的 Beam 数据流作业在本地成功(DirectRunner
)并在云端失败(DataflowRunner
)
此代码段中定位的问题:
class SomeDoFn(beam.DoFn):
...
def process(self, gcs_blob_path):
gcs_client = storage.Client()
bucket = gcs_client.get_bucket(BUCKET_NAME)
blob = Blob(gcs_blob_path, bucket)
# NEXT LINE IS CAUSING ISSUES! (when run remotely)
url = blob.generate_signed_url(datetime.timedelta(seconds=300), method='GET')
并且数据流指向错误:"AttributeError: you need a private key to sign credentials.the credentials you are currently using just contains a token."
我的数据流作业使用服务帐户(PipelineOptions
中提供了适当的 service_account_email
),但是我不知道如何传递 .json 凭据文件该服务帐户到数据流作业。我怀疑我的工作在本地成功运行是因为我设置了环境变量 GOOGLE_APPLICATION_CREDENTIALS=<path to local file with service account credentials>
,但是我如何为远程数据流工作人员类似地设置它?或者也许还有另一种解决方案,如果有人可以提供帮助
您将需要提供服务帐户 JSON 密钥,这与您在本地使用环境变量 GOOGLE_APPLICATION_CREDENTIALS 所做的类似。
为此,您可以按照此 . Such as passing it using PipelineOptions
的答案中提到的一些方法进行操作
但是,请记住,最安全的方法是将 JSON 密钥存储在 GCP 存储桶中,然后从那里获取文件。
简单但不安全的解决方法是获取密钥,打开它,然后在您的代码中创建一个基于它的 json 对象以供稍后传递。
您可以查看有关如何向 Beam 管道添加自定义选项的示例 here。有了这个,我们可以创建一个 --key_file
参数,它将指向存储在 GCS 中的凭据:
parser.add_argument('--key_file',
dest='key_file',
required=True,
help='Path to service account credentials JSON.')
这将允许您在 运行 作业时添加 --key_file gs://PATH/TO/CREDENTIALS.json
标志。
然后,您可以从作业中读取它并将其作为辅助输入传递给需要签署 blob 的 DoFn
。从示例 开始,我们创建一个 credentials
PCollection 来保存 JSON 文件:
credentials = (p
| 'Read Credentials from GCS' >> ReadFromText(known_args.key_file))
然后我们将它广播给所有处理 SignFileFn
函数的工作人员:
(p
| 'Read File from GCS' >> beam.Create([known_args.input]) \
| 'Sign File' >> beam.ParDo(SignFileFn(), pvalue.AsList(credentials)))
在 ParDo
内部,我们构建 JSON 对象来初始化客户端(使用方法 here)并签署文件:
class SignFileFn(beam.DoFn):
"""Signs GCS file with GCS-stored credentials"""
def process(self, gcs_blob_path, creds):
from google.cloud import storage
from google.oauth2 import service_account
credentials_json=json.loads('\n'.join(creds))
credentials = service_account.Credentials.from_service_account_info(credentials_json)
gcs_client = storage.Client(credentials=credentials)
bucket = gcs_client.get_bucket(gcs_blob_path.split('/')[2])
blob = bucket.blob('/'.join(gcs_blob_path.split('/')[3:]))
url = blob.generate_signed_url(datetime.timedelta(seconds=300), method='GET')
logging.info(url)
yield url
查看完整代码here
我的 Beam 数据流作业在本地成功(DirectRunner
)并在云端失败(DataflowRunner
)
此代码段中定位的问题:
class SomeDoFn(beam.DoFn):
...
def process(self, gcs_blob_path):
gcs_client = storage.Client()
bucket = gcs_client.get_bucket(BUCKET_NAME)
blob = Blob(gcs_blob_path, bucket)
# NEXT LINE IS CAUSING ISSUES! (when run remotely)
url = blob.generate_signed_url(datetime.timedelta(seconds=300), method='GET')
并且数据流指向错误:"AttributeError: you need a private key to sign credentials.the credentials you are currently using just contains a token."
我的数据流作业使用服务帐户(PipelineOptions
中提供了适当的 service_account_email
),但是我不知道如何传递 .json 凭据文件该服务帐户到数据流作业。我怀疑我的工作在本地成功运行是因为我设置了环境变量 GOOGLE_APPLICATION_CREDENTIALS=<path to local file with service account credentials>
,但是我如何为远程数据流工作人员类似地设置它?或者也许还有另一种解决方案,如果有人可以提供帮助
您将需要提供服务帐户 JSON 密钥,这与您在本地使用环境变量 GOOGLE_APPLICATION_CREDENTIALS 所做的类似。
为此,您可以按照此
但是,请记住,最安全的方法是将 JSON 密钥存储在 GCP 存储桶中,然后从那里获取文件。
简单但不安全的解决方法是获取密钥,打开它,然后在您的代码中创建一个基于它的 json 对象以供稍后传递。
您可以查看有关如何向 Beam 管道添加自定义选项的示例 here。有了这个,我们可以创建一个 --key_file
参数,它将指向存储在 GCS 中的凭据:
parser.add_argument('--key_file',
dest='key_file',
required=True,
help='Path to service account credentials JSON.')
这将允许您在 运行 作业时添加 --key_file gs://PATH/TO/CREDENTIALS.json
标志。
然后,您可以从作业中读取它并将其作为辅助输入传递给需要签署 blob 的 DoFn
。从示例 credentials
PCollection 来保存 JSON 文件:
credentials = (p
| 'Read Credentials from GCS' >> ReadFromText(known_args.key_file))
然后我们将它广播给所有处理 SignFileFn
函数的工作人员:
(p
| 'Read File from GCS' >> beam.Create([known_args.input]) \
| 'Sign File' >> beam.ParDo(SignFileFn(), pvalue.AsList(credentials)))
在 ParDo
内部,我们构建 JSON 对象来初始化客户端(使用方法 here)并签署文件:
class SignFileFn(beam.DoFn):
"""Signs GCS file with GCS-stored credentials"""
def process(self, gcs_blob_path, creds):
from google.cloud import storage
from google.oauth2 import service_account
credentials_json=json.loads('\n'.join(creds))
credentials = service_account.Credentials.from_service_account_info(credentials_json)
gcs_client = storage.Client(credentials=credentials)
bucket = gcs_client.get_bucket(gcs_blob_path.split('/')[2])
blob = bucket.blob('/'.join(gcs_blob_path.split('/')[3:]))
url = blob.generate_signed_url(datetime.timedelta(seconds=300), method='GET')
logging.info(url)
yield url
查看完整代码here