如何在使用 Python API 创建后立即从 Google Vault export 下载文件?
How to download files from Google Vault export immediately after creating it with Python API?
使用 Python API,我创建了一个导出。如何使用相同的授权服务下载导出中的 .zip 文件?创建导出时,我可以看到 cloudStorageSink 的 bucketName 和 objectNames,但是我找不到任何关于如何使用创建导出的现有服务将它们下载到我的主机的文档
#!/usr/bin/env python
from __future__ import print_function
import datetime
import json
import time
from googleapiclient.discovery import build
from httplib2 import Http
from oauth2client import file, client, tools
# If modifying these scopes, delete the file token.json.
SCOPES = 'https://www.googleapis.com/auth/ediscovery'
def list_exports(service, matter_id):
return service.matters().exports().list(matterId=matter_id).execute()
def get_export_by_id(service, matter_id, export_id):
return service.matters().exports().get(matterId=matter_id, exportId=export_id).execute()
def get_service():
'''
Look for an active credential token, if one does not exist, use credentials.json
and ask user for permission to access. Store new token, return the service object
'''
store = file.Storage('token.json')
creds = store.get()
if not creds or creds.invalid:
flow = client.flow_from_clientsecrets('credentials.json', SCOPES)
creds = tools.run_flow(flow, store)
service = build('vault', 'v1', http=creds.authorize(Http()))
return service
def create_drive_export(service, matter_id, export_name, num_days):
"""
once we have a matter_id , we can create an export under it with the relevant files we are looking for.
"""
# set times for beginning and end of query:
today = datetime.datetime.now()
print("creating a drive export at {}".format(today))
start_time = today - datetime.timedelta(days=num_days)
drive_query_options = {'includeTeamDrives': True}
user_list = ['me@gmail.com']
drive_query = {
'corpus': 'DRIVE',
'dataScope': 'ALL_DATA',
'searchMethod': 'ACCOUNT',
'accountInfo': {
'emails': user_list
},
'driveOptions': drive_query_options,
# end time is more recent date, start time is older date
'endTime': '{}-{}-{}T00:00:00Z'.format(today.year, today.month, today.day),
'startTime': '{}-{}-{}T00:00:00Z'.format(start_time.year, start_time.month, start_time.day),
'timeZone': 'Etc/GMT'
}
wanted_export = {
'name': export_name,
'query': drive_query,
'exportOptions': {
'driveOptions': {}
}
}
return service.matters().exports().create(matterId=matter_id, body=wanted_export).execute()
def get_export(service, matter_id, export_id):
return service.matters().exports().get(matterId=matter_id, exportId=export_id).execute()
def main():
service = get_service()
matter_id = '<known_matter_id>'
timestamp = datetime.datetime.now().strftime("%Y%m%d.%H%M%s")
export = create_drive_export(service, matter_id, "code_gen_export.{}".format(timestamp), 1)
# check every 5 seconds until export is done being created:
while export['status'] == 'IN_PROGRESS':
export = get_export(service, matter_id, export['id'])
print('...')
time.sleep(5)
# print(json.dumps(export, indent=2))
print(json.dumps(export['cloudStorageSink']['files'], indent=2))
if __name__ == '__main__':
main()
和 运行 上面的代码产生:
creating a drive export at 2018-09-20 17:12:38.026402
...
...
...
...
...
...
[
{
"md5Hash": "hash_value",
"bucketName": "bucket_string",
"objectName": "object1_string/code_gen_export.20180920.17121537481558-custodian-docid.csv",
"size": "1684"
},
{
"md5Hash": "hash_value",
"bucketName": "bucket_string",
"objectName": "object2_string/code_gen_export.20180920.17121537481558-metadata.xml",
"size": "10600"
},
{
"md5Hash": "hash_value",
"bucketName": "bucket_string",
"objectName": "object3_string/code_gen_export.20180920.17121537481558_0.zip",
"size": "21599222"
}
]
我可以使用我在 get_service() 中创建的服务对象下载 .zip 文件吗?
经过与上述问题的长期斗争,我在 Google 的一位 API 支持人员的帮助下找到了正确的方法。
请注意,您需要使用以下方法创建新服务:
build('storage', 'v1', credentials=credentials)
其中 cradintials 是:
service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES,
subject='user@domain.com'
)
(可能与您用于凭据的参数相同:http=creds.authorize(Http())
将正常工作 - 我没有尝试过)
此外,您还需要使用字节流库,例如:io
并导入 googleapiclient.http
。
完整代码:
import io
from google.oauth2 import service_account
from googleapiclient.discovery import build
import googleapiclient.http
SCOPES = ['https://www.googleapis.com/auth/devstorage.full_control']
SERVICE_ACCOUNT_FILE = 'yourServiceAccountFile.json'
bucket_name = 'yourBucketName'
object_name = 'yourObjectName.zip'
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES,
subject='user@domain.com'
)
service = build('storage', 'v1', credentials=credentials)
req = service.objects().get_media(bucket=bucket_name, object=object_name)
out_file = io.BytesIO()
downloader = googleapiclient.http.MediaIoBaseDownload(out_file, req)
done = False
while done is False:
status, done = downloader.next_chunk()
print("Download {}%.".format(int(status.progress() * 100)))
file_name = '/Users/myUser/Downloads/new_file.zip'
open(file_name, "w").write(out_file.getvalue())
上面的答案很好,但会导致大文件出现问题,因为 BytesIO 将数据保存在内存中。在低 RAM 环境中,2GB 可以杀死您的下载。建议改用 FileIO。
更改以下代码:
out_file = io.BytesIO()
downloader = googleapiclient.http.MediaIoBaseDownload(out_file, req)
done = False
while done is False:
status, done = downloader.next_chunk()
print("Download {}%.".format(int(status.progress() * 100)))
file_name = '/Users/myUser/Downloads/new_file.zip'
open(file_name, "w").write(out_file.getvalue())
收件人:
file_name = '/myfilepath/myfilename.ext'
with io.FileIO(file_name, mode='wb') as out_file:
downloader = googleapiclient.http.MediaIoBaseDownload(out_file, req)
done = False
while not done:
status, done = downloader.next_chunk()
使用 Python API,我创建了一个导出。如何使用相同的授权服务下载导出中的 .zip 文件?创建导出时,我可以看到 cloudStorageSink 的 bucketName 和 objectNames,但是我找不到任何关于如何使用创建导出的现有服务将它们下载到我的主机的文档
#!/usr/bin/env python
from __future__ import print_function
import datetime
import json
import time
from googleapiclient.discovery import build
from httplib2 import Http
from oauth2client import file, client, tools
# If modifying these scopes, delete the file token.json.
SCOPES = 'https://www.googleapis.com/auth/ediscovery'
def list_exports(service, matter_id):
return service.matters().exports().list(matterId=matter_id).execute()
def get_export_by_id(service, matter_id, export_id):
return service.matters().exports().get(matterId=matter_id, exportId=export_id).execute()
def get_service():
'''
Look for an active credential token, if one does not exist, use credentials.json
and ask user for permission to access. Store new token, return the service object
'''
store = file.Storage('token.json')
creds = store.get()
if not creds or creds.invalid:
flow = client.flow_from_clientsecrets('credentials.json', SCOPES)
creds = tools.run_flow(flow, store)
service = build('vault', 'v1', http=creds.authorize(Http()))
return service
def create_drive_export(service, matter_id, export_name, num_days):
"""
once we have a matter_id , we can create an export under it with the relevant files we are looking for.
"""
# set times for beginning and end of query:
today = datetime.datetime.now()
print("creating a drive export at {}".format(today))
start_time = today - datetime.timedelta(days=num_days)
drive_query_options = {'includeTeamDrives': True}
user_list = ['me@gmail.com']
drive_query = {
'corpus': 'DRIVE',
'dataScope': 'ALL_DATA',
'searchMethod': 'ACCOUNT',
'accountInfo': {
'emails': user_list
},
'driveOptions': drive_query_options,
# end time is more recent date, start time is older date
'endTime': '{}-{}-{}T00:00:00Z'.format(today.year, today.month, today.day),
'startTime': '{}-{}-{}T00:00:00Z'.format(start_time.year, start_time.month, start_time.day),
'timeZone': 'Etc/GMT'
}
wanted_export = {
'name': export_name,
'query': drive_query,
'exportOptions': {
'driveOptions': {}
}
}
return service.matters().exports().create(matterId=matter_id, body=wanted_export).execute()
def get_export(service, matter_id, export_id):
return service.matters().exports().get(matterId=matter_id, exportId=export_id).execute()
def main():
service = get_service()
matter_id = '<known_matter_id>'
timestamp = datetime.datetime.now().strftime("%Y%m%d.%H%M%s")
export = create_drive_export(service, matter_id, "code_gen_export.{}".format(timestamp), 1)
# check every 5 seconds until export is done being created:
while export['status'] == 'IN_PROGRESS':
export = get_export(service, matter_id, export['id'])
print('...')
time.sleep(5)
# print(json.dumps(export, indent=2))
print(json.dumps(export['cloudStorageSink']['files'], indent=2))
if __name__ == '__main__':
main()
和 运行 上面的代码产生:
creating a drive export at 2018-09-20 17:12:38.026402
...
...
...
...
...
...
[
{
"md5Hash": "hash_value",
"bucketName": "bucket_string",
"objectName": "object1_string/code_gen_export.20180920.17121537481558-custodian-docid.csv",
"size": "1684"
},
{
"md5Hash": "hash_value",
"bucketName": "bucket_string",
"objectName": "object2_string/code_gen_export.20180920.17121537481558-metadata.xml",
"size": "10600"
},
{
"md5Hash": "hash_value",
"bucketName": "bucket_string",
"objectName": "object3_string/code_gen_export.20180920.17121537481558_0.zip",
"size": "21599222"
}
]
我可以使用我在 get_service() 中创建的服务对象下载 .zip 文件吗?
经过与上述问题的长期斗争,我在 Google 的一位 API 支持人员的帮助下找到了正确的方法。
请注意,您需要使用以下方法创建新服务:
build('storage', 'v1', credentials=credentials)
其中 cradintials 是:
service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES,
subject='user@domain.com'
)
(可能与您用于凭据的参数相同:http=creds.authorize(Http())
将正常工作 - 我没有尝试过)
此外,您还需要使用字节流库,例如:io
并导入 googleapiclient.http
。
完整代码:
import io
from google.oauth2 import service_account
from googleapiclient.discovery import build
import googleapiclient.http
SCOPES = ['https://www.googleapis.com/auth/devstorage.full_control']
SERVICE_ACCOUNT_FILE = 'yourServiceAccountFile.json'
bucket_name = 'yourBucketName'
object_name = 'yourObjectName.zip'
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES,
subject='user@domain.com'
)
service = build('storage', 'v1', credentials=credentials)
req = service.objects().get_media(bucket=bucket_name, object=object_name)
out_file = io.BytesIO()
downloader = googleapiclient.http.MediaIoBaseDownload(out_file, req)
done = False
while done is False:
status, done = downloader.next_chunk()
print("Download {}%.".format(int(status.progress() * 100)))
file_name = '/Users/myUser/Downloads/new_file.zip'
open(file_name, "w").write(out_file.getvalue())
上面的答案很好,但会导致大文件出现问题,因为 BytesIO 将数据保存在内存中。在低 RAM 环境中,2GB 可以杀死您的下载。建议改用 FileIO。
更改以下代码:
out_file = io.BytesIO()
downloader = googleapiclient.http.MediaIoBaseDownload(out_file, req)
done = False
while done is False:
status, done = downloader.next_chunk()
print("Download {}%.".format(int(status.progress() * 100)))
file_name = '/Users/myUser/Downloads/new_file.zip'
open(file_name, "w").write(out_file.getvalue())
收件人:
file_name = '/myfilepath/myfilename.ext'
with io.FileIO(file_name, mode='wb') as out_file:
downloader = googleapiclient.http.MediaIoBaseDownload(out_file, req)
done = False
while not done:
status, done = downloader.next_chunk()