使用 Python 个 SDK 将视频上传到 Azure 媒体服务

Uploading a Video to Azure Media Services with Python SDKs

我目前正在寻找一种通过 Python SDK 将视频上传到 Azure 媒体服务 (AMS v3) 的方法。我已经按照它的指示操作,并且能够成功连接到AMS。

例子

credentials = AdalAuthentication(
    context.acquire_token_with_client_credentials,
    RESOURCE,
    CLIENT,
    KEY)

client = AzureMediaServices(credentials, SUBSCRIPTION_ID) # Successful

我也成功获取了通过其门户上传的所有视频的详细信息

for data in client.assets.list(RESOUCE_GROUP_NAME, ACCOUNT_NAME).get(0):
    print(f'Asset_name: {data.name}, file_name: {data.description}')

# Asset_name: 4f904060-d15c-4880-8c5a-xxxxxxxx, file_name: 夢想全紀錄.mp4
# Asset_name: 8f2e5e36-d043-4182-9634-xxxxxxxx, file_name: an552Qb_460svvp9.webm
# Asset_name: aef495c1-a3dd-49bb-8e3e-xxxxxxxx, file_name: world_war_2.webm
# Asset_name: b53d8152-6ecd-41a2-a59e-xxxxxxxx, file_name: an552Qb_460svvp9.webm - Media Encoder Standard encoded

然而,当我尝试使用以下方法时;它失败了。因为我不知道要解析什么 parameters - Link to Python SDKs

create_or_update(resource_group_name, account_name, asset_name,
                 parameters, custom_headers=None, raw=False, **operation_config)


因此,想请教如下问题(都是通过Python SDK完成的):

  1. 它期望什么样的参数?
  2. 视频可以直接上传到 AMS 还是应该先上传到 Blob 存储?
  3. 资产应该只包含一个视频还是多个文件?
  1. 该方法的 REST 版本的文档位于 https://docs.microsoft.com/en-us/rest/api/media/assets/createorupdate。这实际上与 Python 参数相同。
  2. 视频存储在用于媒体服务的 Azure 存储中。对于输入资产、编码的资产和任何流媒体内容都是如此。这一切都在存储中,但由媒体服务访问。您确实需要在创建存储容器的媒体服务中创建一个资产。一旦存储容器存在,您就可以通过存储 API 上传到该媒体服务创建的容器。
  3. 从技术上讲,多个文件没有问题,但这样做会出现许多您可能没有想到的问题。我建议使用 1 个输入视频 = 1 个媒体服务资产。在编码输出端,资产中会有多个文件。编码输出包含一个或多个视频、清单和元数据文件。

我找到了解决使用 Python SDK 和 REST 的方法;但是,我不太确定它是否合适。

  1. 通过 Python 包登录到 Azure 媒体服务和 Blob 存储

    import adal
    from msrestazure.azure_active_directory import AdalAuthentication
    from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
    from azure.mgmt.media import AzureMediaServices
    from azure.mgmt.media.models import MediaService
    
    from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
    
  2. 通过解析这些参数为原始文件和编码文件创建资产。原始文件资产创建示例。

    asset_name = 'asset-myvideo'
    asset_properties = {
        'properties': {
            'description': 'Original File Description',
            'storageAccountName': "storage-account-name"
        }
    }
    client.assets.create_or_update(RESOUCE_GROUP_NAME, ACCOUNT_NAME, asset_name, asset_properties)
    
  3. 将视频上传到源自创建的原始资产的 Blob 存储

    current_container = [data.container for data in client.assets.list(RESOUCE_GROUP_NAME, ACCOUNT_NAME).get(0) if data.name == asset_name][0] # Get Blob Storage location
    file_name = "myvideo.mp4"
    blob_client = blob_service_client.get_blob_client(container=current_container, blob=file_name)
    
    with open('original_video.mp4', 'rb') as data:
        blob_client.upload_blob(data)
    print(f'Video uploaded to {current_container}')
    

然后,我执行转换、作业和流定位器以成功获取视频流 Link。

我能够让它与更新的 python SDK 一起工作。 python 文档大部分缺失,所以我主要从 python SDK 源代码和 C# 示例构建它。

azure-storage-blob==12.3.1
azure-mgmt-media==2.1.0
azure-mgmt-resource==9.0.0
adal~=1.2.2
msrestazure~=0.6.3

0) 导入很多东西

from azure.mgmt.media.models import Asset, Transform, Job, 
    BuiltInStandardEncoderPreset, TransformOutput, \
    JobInputAsset, JobOutputAsset, AssetContainerSas, AssetContainerPermission

import adal
from msrestazure.azure_active_directory import AdalAuthentication
from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
from azure.mgmt.media import AzureMediaServices
from azure.storage.blob import BlobServiceClient, ContainerClient
import datetime as dt
import time


LOGIN_ENDPOINT = AZURE_PUBLIC_CLOUD.endpoints.active_directory
RESOURCE = AZURE_PUBLIC_CLOUD.endpoints.active_directory_resource_id

# AzureSettings is a custom NamedTuple

1) 登录 AMS:

def get_ams_client(settings: AzureSettings) -> AzureMediaServices:
    context = adal.AuthenticationContext(LOGIN_ENDPOINT + '/' + 
        settings.AZURE_MEDIA_TENANT_ID)
    credentials = AdalAuthentication(
        context.acquire_token_with_client_credentials,
        RESOURCE,
        settings.AZURE_MEDIA_CLIENT_ID,
        settings.AZURE_MEDIA_SECRET
    )
    return AzureMediaServices(credentials, settings.AZURE_SUBSCRIPTION_ID)

2) 创建输入和输出资产

input_asset = create_or_update_asset(
    input_asset_name, "My Input Asset", client, azure_settings)
input_asset = create_or_update_asset(
    output_asset_name, "My Output Asset", client, azure_settings)

3) 获取容器名称。 (大多数文档都提到 BlockBlobService,它似乎已从 SDK 中删除)

def get_container_name(client: AzureMediaServices, asset_name: str, settings: AzureSettings):
    expiry_time = dt.datetime.now(dt.timezone.utc) + dt.timedelta(hours=4)
    container_list: AssetContainerSas = client.assets.list_container_sas(
        resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
        account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
        asset_name=asset_name,
        permissions = AssetContainerPermission.read_write,
        expiry_time=expiry_time
    )
    sas_uri: str = container_list.asset_container_sas_urls[0]
    container_client: ContainerClient = ContainerClient.from_container_url(sas_uri)
    return container_client.container_name

4) 将文件上传到输入资产容器:

def upload_file_to_asset_container(
    container: str, local_file, uploaded_file_name, settings: AzureSettings):
    blob_service_client = BlobServiceClient.from_connection_string(settings.AZURE_MEDIA_STORAGE_CONNECTION_STRING))
    blob_client = blob_service_client.get_blob_client(container=container, blob=uploaded_file_name)

    with open(local_file, 'rb') as data:
            blob_client.upload_blob(data)

5) 创建转换(在我的例子中,使用自适应流预设):

def get_or_create_transform(
    client: AzureMediaServices,
    transform_name: str,
    settings: AzureSettings):
    transform_output = TransformOutput(preset=BuiltInStandardEncoderPreset(preset_name="AdaptiveStreaming"))
    transform: Transform = client.transforms.create_or_update(
        resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
        account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
        transform_name=transform_name,
        outputs=[transform_output]
    )
    return transform

5) 提交作业

def submit_job(
    client: AzureMediaServices,
    settings: AzureSettings,
    input_asset: Asset,
    output_asset: Asset,
    transform_name: str,
    correlation_data: dict) -> Job:

    job_input = JobInputAsset(asset_name=input_asset.name)
    job_outputs = [JobOutputAsset(asset_name=output_asset.name)]
    job: Job = client.jobs.create(
        resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
        account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
        job_name=f"test_job_{UNIQUENESS}",
        transform_name=transform_name,
        parameters=Job(input=job_input,
            outputs=job_outputs,
            correlation_data=correlation_data)
    )
    return job

6) 然后在事件网格告诉我工作完成后我得到了 URL:

# side-effect warning: this starts the streaming endpoint $$$
def get_urls(client: AzureMediaServices, output_asset_name: str
    locator_name: str):

    try:
        locator: StreamingLocator = client.streaming_locators.create(
            resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
            account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
            streaming_locator_name=locator_name,
            parameters=StreamingLocator(
                asset_name=output_asset_name,
                streaming_policy_name="Predefined_ClearStreamingOnly"
            )
        )
    except Exception as ex:
        print("ignoring existing")

    streaming_endpoint: StreamingEndpoint = client.streaming_endpoints.get(
        resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
        account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
        streaming_endpoint_name="default")

    if streaming_endpoint:
        if streaming_endpoint.resource_state != "Running":
            client.streaming_endpoints.start(
                resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
                account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
                streaming_endpoint_name="default"
            )

    paths = client.streaming_locators.list_paths(
        resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
        account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
        streaming_locator_name=locator_name
    )
    return [f"https://{streaming_endpoint.host_name}{path.paths[0]}" for path in paths.streaming_paths]