在存储帐户 python 之间移动或复制文件 (blob)(天蓝色功能)?
move or copy files(blob) between storage accounts python (azure function)?
我想使用 python(在 azure 函数中)在两个存储帐户之间移动(或复制然后删除)files/blobs。我用过像 这样的方法。
然而,这适用于旧的 SDK,有人知道新 SDK 的方法吗?
类似于 但在两个存储帐户之间而不是在容器之间。
如果要跨Azure存储账户复制blob,请参考以下代码
from azure.storage.blob import ResourceTypes, AccountSasPermissions, generate_account_sas, BlobServiceClient
from datetime import datetime, timedelta
source_key = ''
des_key = ''
source_account_name = ''
des_account_name = '23storage23'
# genearte account sas token for source account
sas_token = generate_account_sas(account_name=source_account_name, account_key=source_key,
resource_types=ResourceTypes(
service=True, container=True, object=True),
permission=AccountSasPermissions(read=True),
expiry=datetime.utcnow() + timedelta(hours=1))
source_blob_service_client = BlobServiceClient(
account_url=f'https://{source_account_name}.blob.core.windows.net/', credential=source_key)
des_blob_service_client = BlobServiceClient(
account_url=f'https://{des_account_name}.blob.core.windows.net/', credential=des_key)
source_container_client = source_blob_service_client.get_container_client(
'copy')
source_blob = source_container_client.get_blob_client('Capture.PNG')
source_url = source_blob.url+'?'+sas_token
# copy
des_blob_service_client.get_blob_client(
'test', source_blob.blob_name).start_copy_from_url(source_url)
此外,如果源容器的访问级别是public,我们可以简化代码如下
from azure.storage.blob import BlobServiceClient
source_key = ''
des_key = ''
source_account_name = ''
des_account_name = '23storage23'
source_blob_service_client = BlobServiceClient(
account_url=f'https://{source_account_name}.blob.core.windows.net/', credential=source_key)
des_blob_service_client = BlobServiceClient(
account_url=f'https://{des_account_name}.blob.core.windows.net/', credential=des_key)
source_container_client = source_blob_service_client.get_container_client(
'input')
source_blob = source_container_client.get_blob_client('file.json')
source_url = source_blob.url
# copy
des_blob_service_client.get_blob_client(
'test', source_blob.blob_name).start_copy_from_url(source_url)
详情请参考here
我看这个回答也不错,就放在这里吧。
来自here
import json
import logging
import os
import azure.functions as func
from azure.storage.blob import BlobServiceClient, generate_blob_sas, AccessPolicy, BlobSasPermissions
from azure.core.exceptions import ResourceExistsError
from datetime import datetime, timedelta
def main(event: func.EventGridEvent):
result = json.dumps({
'id': event.id,
'data': event.get_json(),
'topic': event.topic,
'subject': event.subject,
'event_type': event.event_type,
})
logging.info('Python EventGrid trigger processed an event: %s', result)
blob_service_client = BlobServiceClient.from_connection_string(
os.environ.get('ARCHIVE_STORAGE_CONNECTION_STRING'))
# Get the URL and extract the name of the file and container
blob_url = event.get_json().get('url')
logging.info('blob URL: %s', blob_url)
blob_name = blob_url.split("/")[-1].split("?")[0]
container_name = blob_url.split("/")[-2].split("?")[0]
archived_container_name = container_name + '-' + os.environ.get('AZURE_STORAGE_ARCHIVE_CONTAINER')
blob_service_client_origin = BlobServiceClient.from_connection_string(os.environ.get('ORIGIN_STORAGE_CONNECTION_STRING'))
blob_to_copy = blob_service_client_origin.get_blob_client(container=container_name, blob=blob_name)
sas_token = generate_blob_sas(
blob_to_copy.account_name,
blob_to_copy.container_name,
blob_to_copy.blob_name,
account_key=blob_service_client_origin.credential.account_key,
permission=BlobSasPermissions(read=True),
start=datetime.utcnow() + timedelta(seconds=1),
expiry=datetime.utcnow() + timedelta(hours=1))
logging.info('sas token: %s',sas_token)
archived_container = blob_service_client.get_container_client(archived_container_name)
# Create new Container
try:
archived_container.create_container()
except ResourceExistsError:
pass
copied_blob = blob_service_client.get_blob_client(
archived_container_name, blob_name)
blob_to_copy_url = blob_url + '?' + sas_token
logging.info('blob url: ' + blob_to_copy_url)
# Start copy
copied_blob.start_copy_from_url(blob_to_copy_url)
我想使用 python(在 azure 函数中)在两个存储帐户之间移动(或复制然后删除)files/blobs。我用过像
类似于
如果要跨Azure存储账户复制blob,请参考以下代码
from azure.storage.blob import ResourceTypes, AccountSasPermissions, generate_account_sas, BlobServiceClient
from datetime import datetime, timedelta
source_key = ''
des_key = ''
source_account_name = ''
des_account_name = '23storage23'
# genearte account sas token for source account
sas_token = generate_account_sas(account_name=source_account_name, account_key=source_key,
resource_types=ResourceTypes(
service=True, container=True, object=True),
permission=AccountSasPermissions(read=True),
expiry=datetime.utcnow() + timedelta(hours=1))
source_blob_service_client = BlobServiceClient(
account_url=f'https://{source_account_name}.blob.core.windows.net/', credential=source_key)
des_blob_service_client = BlobServiceClient(
account_url=f'https://{des_account_name}.blob.core.windows.net/', credential=des_key)
source_container_client = source_blob_service_client.get_container_client(
'copy')
source_blob = source_container_client.get_blob_client('Capture.PNG')
source_url = source_blob.url+'?'+sas_token
# copy
des_blob_service_client.get_blob_client(
'test', source_blob.blob_name).start_copy_from_url(source_url)
此外,如果源容器的访问级别是public,我们可以简化代码如下
from azure.storage.blob import BlobServiceClient
source_key = ''
des_key = ''
source_account_name = ''
des_account_name = '23storage23'
source_blob_service_client = BlobServiceClient(
account_url=f'https://{source_account_name}.blob.core.windows.net/', credential=source_key)
des_blob_service_client = BlobServiceClient(
account_url=f'https://{des_account_name}.blob.core.windows.net/', credential=des_key)
source_container_client = source_blob_service_client.get_container_client(
'input')
source_blob = source_container_client.get_blob_client('file.json')
source_url = source_blob.url
# copy
des_blob_service_client.get_blob_client(
'test', source_blob.blob_name).start_copy_from_url(source_url)
我看这个回答也不错,就放在这里吧。 来自here
import json
import logging
import os
import azure.functions as func
from azure.storage.blob import BlobServiceClient, generate_blob_sas, AccessPolicy, BlobSasPermissions
from azure.core.exceptions import ResourceExistsError
from datetime import datetime, timedelta
def main(event: func.EventGridEvent):
result = json.dumps({
'id': event.id,
'data': event.get_json(),
'topic': event.topic,
'subject': event.subject,
'event_type': event.event_type,
})
logging.info('Python EventGrid trigger processed an event: %s', result)
blob_service_client = BlobServiceClient.from_connection_string(
os.environ.get('ARCHIVE_STORAGE_CONNECTION_STRING'))
# Get the URL and extract the name of the file and container
blob_url = event.get_json().get('url')
logging.info('blob URL: %s', blob_url)
blob_name = blob_url.split("/")[-1].split("?")[0]
container_name = blob_url.split("/")[-2].split("?")[0]
archived_container_name = container_name + '-' + os.environ.get('AZURE_STORAGE_ARCHIVE_CONTAINER')
blob_service_client_origin = BlobServiceClient.from_connection_string(os.environ.get('ORIGIN_STORAGE_CONNECTION_STRING'))
blob_to_copy = blob_service_client_origin.get_blob_client(container=container_name, blob=blob_name)
sas_token = generate_blob_sas(
blob_to_copy.account_name,
blob_to_copy.container_name,
blob_to_copy.blob_name,
account_key=blob_service_client_origin.credential.account_key,
permission=BlobSasPermissions(read=True),
start=datetime.utcnow() + timedelta(seconds=1),
expiry=datetime.utcnow() + timedelta(hours=1))
logging.info('sas token: %s',sas_token)
archived_container = blob_service_client.get_container_client(archived_container_name)
# Create new Container
try:
archived_container.create_container()
except ResourceExistsError:
pass
copied_blob = blob_service_client.get_blob_client(
archived_container_name, blob_name)
blob_to_copy_url = blob_url + '?' + sas_token
logging.info('blob url: ' + blob_to_copy_url)
# Start copy
copied_blob.start_copy_from_url(blob_to_copy_url)