将文件下载和上传到 Apache Beam DoFn 中的 GCP 存储桶(Google 数据流)
downloading and uploading file to GCP bucket in Apache Beam DoFn(Google Dataflow)
我正在尝试从 GCP 存储桶下载加密文件和密钥,然后解密文件并将其加载回存储桶。所以我构建了这个 DataFlow 管道,如下所示:
class downloadFile(beam.DoFn):
def __init__(self):
self.bucket_name = 'bucket_name'
self.source_blob_name = 'test.csv.gpg'
self.destination_file_name = "/tmp/test.csv.gpg"
def process(self, element):
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.bucket(self.bucket_name)
blob = bucket.blob(self.source_blob_name)
blob.download_to_filename(self.destination_file_name)
这里我使用了self.destination_file_name = "/tmp/test.csv.gpg"
,因为我从其他人那里了解到DataFlow作业将在Linux VM上运行,所以将文件下载到这个/tmp/路径是完全安全的.
class downloadKey(beam.DoFn):
def __init__(self):
self.bucket_name = 'bucket_name'
self.source_blob_name = 'privateKey.txt'
self.destination_file_name = "/tmp/privateKey.txt"
def process(self, element):
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.bucket(self.bucket_name)
blob = bucket.blob(self.source_blob_name)
blob.download_to_filename(self.destination_file_name)
基本上,两个下载DoFns 具有相同的结构。下载文件和密钥后,密钥将导入 DataFlow 运行ning VM:
class importKey(beam.DoFn):
def process(self, element):
import subprocess
subprocess.call(['gpg', '--import','/tmp/privateKey.txt'])
然后解密DoFn:
class decryption(beam.DoFn):
def process(self, element, *args, **kwargs):
import subprocess
subprocess.call(['gpg', '-d', '/tmp/test.csv.gpg > test.csv'])
# load file back to bucket
bucket_name = 'bucket_name'
source_file_name = '/tmp/test.csv'
destination_blob_name = "clearText.csv"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
因此,此解密 DoFn 将调用带有子进程的 gpg 命令来解密文件。
最后是流水线本身:
dummyMessage = {"projectID":"fakeProjectID",
"bucketID":"fakeBucketID"}
setp= (
p
| 'Create Sample'
>> beam.Create([dummyMessage["projectID"]])
|"testDecrypt" >> beam.ParDo(downloadLookupFile())
|"testDecrypt2" >> beam.ParDo(downloadKey())
|"testDecrypt3" >> beam.ParDo(importKey())
|"testDecrypt4" >> beam.ParDo(decryption())
)
这里我只是创建一个虚拟消息来调用管道,稍后将替换为真实消息。
当我 运行 管道时,它看起来一切正常,我可以看到作业已在 DataFlow 中创建,并且显示作业状态为成功。但是在存储桶中我看不到解密的文件。
我在代码中添加了几个打印语句进行调试,在downloadFile()和downloadKey()方法中似乎没有到达process(),这意味着没有文件被处理过。谁能分享一些关于如何在 DoFn 中访问 GCS 存储桶的知识?我不确定代码的哪一部分有问题,我觉得都不错。
任何帮助将不胜感激。
欢迎 Alex 使用 Whosebug。
首先,关于日志(打印语句),如果您没有看到它们,可能是因为您看错了地方。事实上,如果您将它们放在 DoFns 的 process
内(如 decryption
class),您需要查看 WORKER LOGS
而不是 JOB LOGS
也不是你的终端。在下面的屏幕截图中,我展示了如何访问工作日志。作业日志或驱动程序日志是显示您在管道创建级别添加的打印/日志的日志(beam.Create
...)如果您有 运行 作业,您可以在终端中看到它们来自它。
那么,恕我直言,数据流不是适合这种需求的处理平台。它用于并行分布式处理大文件或大数据块(比方说> 2GB)。这意味着在幕后,您有一个部分在一个工作节点(幕后的 GCE VM 实例)上处理,而另一部分文件在另一个工作节点上处理。在你的情况下,如果你有超过 1 个工作人员,你可能会在一个节点下载加密文件,在另一个节点下载密钥,在第三个节点下载解密。所以使用 /tmp
会过时。
最后一个变通解决方案是使用例如 Cloud Function (CF),它将 运行 以单线程方式进行,并允许您重用内部代码不同的 process
方法:
- CF 由将加密文件上传到 GCS 存储桶触发。这里有一些文档如何设置这样的触发器:https://cloud.google.com/functions/docs/calling/storage(python 中有示例)
- 您的 CF 代码将下载加密密钥、解密并将解密后的文件上传回另一个 GCS 存储桶。对于云功能,您可以将内存设置为 8GB,并使用
/tmp
,它使用后台内存。
顺便说一下,安全方面,我认为将加密密钥存储在 GCS 中不是一个好的做法,尝试看看 https://cloud.google.com/secret-manager
我正在尝试从 GCP 存储桶下载加密文件和密钥,然后解密文件并将其加载回存储桶。所以我构建了这个 DataFlow 管道,如下所示:
class downloadFile(beam.DoFn):
def __init__(self):
self.bucket_name = 'bucket_name'
self.source_blob_name = 'test.csv.gpg'
self.destination_file_name = "/tmp/test.csv.gpg"
def process(self, element):
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.bucket(self.bucket_name)
blob = bucket.blob(self.source_blob_name)
blob.download_to_filename(self.destination_file_name)
这里我使用了self.destination_file_name = "/tmp/test.csv.gpg"
,因为我从其他人那里了解到DataFlow作业将在Linux VM上运行,所以将文件下载到这个/tmp/路径是完全安全的.
class downloadKey(beam.DoFn):
def __init__(self):
self.bucket_name = 'bucket_name'
self.source_blob_name = 'privateKey.txt'
self.destination_file_name = "/tmp/privateKey.txt"
def process(self, element):
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.bucket(self.bucket_name)
blob = bucket.blob(self.source_blob_name)
blob.download_to_filename(self.destination_file_name)
基本上,两个下载DoFns 具有相同的结构。下载文件和密钥后,密钥将导入 DataFlow 运行ning VM:
class importKey(beam.DoFn):
def process(self, element):
import subprocess
subprocess.call(['gpg', '--import','/tmp/privateKey.txt'])
然后解密DoFn:
class decryption(beam.DoFn):
def process(self, element, *args, **kwargs):
import subprocess
subprocess.call(['gpg', '-d', '/tmp/test.csv.gpg > test.csv'])
# load file back to bucket
bucket_name = 'bucket_name'
source_file_name = '/tmp/test.csv'
destination_blob_name = "clearText.csv"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
因此,此解密 DoFn 将调用带有子进程的 gpg 命令来解密文件。
最后是流水线本身:
dummyMessage = {"projectID":"fakeProjectID",
"bucketID":"fakeBucketID"}
setp= (
p
| 'Create Sample'
>> beam.Create([dummyMessage["projectID"]])
|"testDecrypt" >> beam.ParDo(downloadLookupFile())
|"testDecrypt2" >> beam.ParDo(downloadKey())
|"testDecrypt3" >> beam.ParDo(importKey())
|"testDecrypt4" >> beam.ParDo(decryption())
)
这里我只是创建一个虚拟消息来调用管道,稍后将替换为真实消息。
当我 运行 管道时,它看起来一切正常,我可以看到作业已在 DataFlow 中创建,并且显示作业状态为成功。但是在存储桶中我看不到解密的文件。
我在代码中添加了几个打印语句进行调试,在downloadFile()和downloadKey()方法中似乎没有到达process(),这意味着没有文件被处理过。谁能分享一些关于如何在 DoFn 中访问 GCS 存储桶的知识?我不确定代码的哪一部分有问题,我觉得都不错。
任何帮助将不胜感激。
欢迎 Alex 使用 Whosebug。
首先,关于日志(打印语句),如果您没有看到它们,可能是因为您看错了地方。事实上,如果您将它们放在 DoFns 的
process
内(如decryption
class),您需要查看WORKER LOGS
而不是JOB LOGS
也不是你的终端。在下面的屏幕截图中,我展示了如何访问工作日志。作业日志或驱动程序日志是显示您在管道创建级别添加的打印/日志的日志(beam.Create
...)如果您有 运行 作业,您可以在终端中看到它们来自它。那么,恕我直言,数据流不是适合这种需求的处理平台。它用于并行分布式处理大文件或大数据块(比方说> 2GB)。这意味着在幕后,您有一个部分在一个工作节点(幕后的 GCE VM 实例)上处理,而另一部分文件在另一个工作节点上处理。在你的情况下,如果你有超过 1 个工作人员,你可能会在一个节点下载加密文件,在另一个节点下载密钥,在第三个节点下载解密。所以使用
/tmp
会过时。最后一个变通解决方案是使用例如 Cloud Function (CF),它将 运行 以单线程方式进行,并允许您重用内部代码不同的
process
方法:
- CF 由将加密文件上传到 GCS 存储桶触发。这里有一些文档如何设置这样的触发器:https://cloud.google.com/functions/docs/calling/storage(python 中有示例)
- 您的 CF 代码将下载加密密钥、解密并将解密后的文件上传回另一个 GCS 存储桶。对于云功能,您可以将内存设置为 8GB,并使用
/tmp
,它使用后台内存。
顺便说一下,安全方面,我认为将加密密钥存储在 GCS 中不是一个好的做法,尝试看看 https://cloud.google.com/secret-manager