
Move or copy files (blobs) between storage accounts with Python (Azure Function)?

I want to move (or copy and then delete) files/blobs between two storage accounts using Python (in an Azure Function). I have used an approach like this one, but that applies to the old SDK; does anyone know how to do it with the new SDK?

Something similar to this, but between two storage accounts rather than between containers.

If you want to copy blobs across Azure storage accounts, please refer to the following code:

from azure.storage.blob import ResourceTypes, AccountSasPermissions, generate_account_sas, BlobServiceClient
from datetime import datetime, timedelta
source_key = ''
des_key = ''
source_account_name = ''
des_account_name = '23storage23'
# generate an account SAS token for the source account
sas_token = generate_account_sas(account_name=source_account_name, account_key=source_key,
                                 resource_types=ResourceTypes(
                                     service=True, container=True, object=True),
                                 permission=AccountSasPermissions(read=True),
                                 expiry=datetime.utcnow() + timedelta(hours=1))
source_blob_service_client = BlobServiceClient(
    account_url=f'https://{source_account_name}.blob.core.windows.net/', credential=source_key)
des_blob_service_client = BlobServiceClient(
    account_url=f'https://{des_account_name}.blob.core.windows.net/', credential=des_key)

source_container_client = source_blob_service_client.get_container_client(
    'copy')

source_blob = source_container_client.get_blob_client('Capture.PNG')
source_url = source_blob.url+'?'+sas_token
# start a server-side copy in the destination account, authorizing the read with the SAS URL
des_blob_service_client.get_blob_client(
    'test', source_blob.blob_name).start_copy_from_url(source_url)

In addition, if the source container's access level is public, we can simplify the code as follows:

from azure.storage.blob import BlobServiceClient
source_key = ''
des_key = ''
source_account_name = ''
des_account_name = '23storage23'
source_blob_service_client = BlobServiceClient(
    account_url=f'https://{source_account_name}.blob.core.windows.net/', credential=source_key)
des_blob_service_client = BlobServiceClient(
    account_url=f'https://{des_account_name}.blob.core.windows.net/', credential=des_key)

source_container_client = source_blob_service_client.get_container_client(
    'input')

source_blob = source_container_client.get_blob_client('file.json')
source_url = source_blob.url
# start a server-side copy using the public blob URL
des_blob_service_client.get_blob_client(
    'test', source_blob.blob_name).start_copy_from_url(source_url)

For more details, please refer to here.
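Since the question asks about moving (copying and then deleting) blobs, the copy above can be turned into a move by deleting the source blob once the server-side copy has finished. A minimal sketch, assuming the source_blob and des_blob_service_client objects from the snippets above:

import time

# start_copy_from_url only schedules an asynchronous server-side copy,
# so poll the destination blob's copy status before touching the source
copied_blob = des_blob_service_client.get_blob_client('test', source_blob.blob_name)
props = copied_blob.get_blob_properties()
while props.copy.status == 'pending':
    time.sleep(1)
    props = copied_blob.get_blob_properties()

if props.copy.status == 'success':
    # delete the source blob so the copy behaves like a move
    source_blob.delete_blob()

For small blobs the copy usually completes almost immediately, but polling keeps the delete safe for larger blobs.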

I think this answer is also good, so I'll put it here as well. From here:

import json
import logging
import os

import azure.functions as func
from azure.storage.blob import BlobServiceClient, generate_blob_sas, AccessPolicy, BlobSasPermissions
from azure.core.exceptions import ResourceExistsError
from datetime import datetime, timedelta

def main(event: func.EventGridEvent):
    result = json.dumps({
        'id': event.id,
        'data': event.get_json(),
        'topic': event.topic,
        'subject': event.subject,
        'event_type': event.event_type,
    })

    logging.info('Python EventGrid trigger processed an event: %s', result)

    blob_service_client = BlobServiceClient.from_connection_string(
        os.environ.get('ARCHIVE_STORAGE_CONNECTION_STRING'))

    # Get the URL and extract the name of the file and container
    blob_url = event.get_json().get('url')
    logging.info('blob URL: %s', blob_url)
    blob_name = blob_url.split("/")[-1].split("?")[0]
    container_name = blob_url.split("/")[-2].split("?")[0]
    archived_container_name = container_name + '-' + os.environ.get('AZURE_STORAGE_ARCHIVE_CONTAINER')

    blob_service_client_origin = BlobServiceClient.from_connection_string(os.environ.get('ORIGIN_STORAGE_CONNECTION_STRING'))

    blob_to_copy = blob_service_client_origin.get_blob_client(container=container_name, blob=blob_name)

    sas_token = generate_blob_sas(
        blob_to_copy.account_name,
        blob_to_copy.container_name,
        blob_to_copy.blob_name,
        account_key=blob_service_client_origin.credential.account_key,
        permission=BlobSasPermissions(read=True),
        start=datetime.utcnow() + timedelta(seconds=1),
        expiry=datetime.utcnow() + timedelta(hours=1))

    logging.info('sas token: %s', sas_token)

    archived_container = blob_service_client.get_container_client(archived_container_name)

    # Create the archive container if it does not already exist
    try:
        archived_container.create_container()
    except ResourceExistsError:
        pass

    copied_blob = blob_service_client.get_blob_client(
        archived_container_name, blob_name)

    blob_to_copy_url = blob_url + '?' + sas_token

    logging.info('blob url: ' + blob_to_copy_url)

    # Start copy
    copied_blob.start_copy_from_url(blob_to_copy_url)
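start_copy_from_url in the function above likewise only schedules an asynchronous copy, so it can be worth verifying the copy before treating the archive as complete (and before deleting the original, if a move is intended). A minimal sketch; the wait_for_copy helper and the 60-second timeout are illustrative and not part of either answer:

import time

def wait_for_copy(blob_client, timeout_seconds=60):
    # Poll an asynchronous server-side copy and abort it if it does not
    # finish within the timeout; blob_client is the destination blob
    # (e.g. copied_blob from the function above).
    props = blob_client.get_blob_properties()
    deadline = time.time() + timeout_seconds
    while props.copy.status == 'pending' and time.time() < deadline:
        time.sleep(2)
        props = blob_client.get_blob_properties()
    if props.copy.status == 'pending':
        # give up on a stuck copy rather than leaving it pending indefinitely
        blob_client.abort_copy(props.copy.id)
        raise TimeoutError(f'copy still pending after {timeout_seconds} seconds')
    return props.copy.status

Calling wait_for_copy(copied_blob) at the end of main() would confirm the archive copy succeeded before, for example, deleting the blob in the origin account.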