使用 Python 个 SDK 将视频上传到 Azure 媒体服务
Uploading a Video to Azure Media Services with Python SDKs
我目前正在寻找一种通过 Python SDK 将视频上传到 Azure 媒体服务 (AMS v3) 的方法。我已经按照它的指示操作,并且能够成功连接到AMS。
例子
credentials = AdalAuthentication(
context.acquire_token_with_client_credentials,
RESOURCE,
CLIENT,
KEY)
client = AzureMediaServices(credentials, SUBSCRIPTION_ID) # Successful
我也成功获取了通过其门户上传的所有视频的详细信息
for data in client.assets.list(RESOUCE_GROUP_NAME, ACCOUNT_NAME).get(0):
print(f'Asset_name: {data.name}, file_name: {data.description}')
# Asset_name: 4f904060-d15c-4880-8c5a-xxxxxxxx, file_name: 夢想全紀錄.mp4
# Asset_name: 8f2e5e36-d043-4182-9634-xxxxxxxx, file_name: an552Qb_460svvp9.webm
# Asset_name: aef495c1-a3dd-49bb-8e3e-xxxxxxxx, file_name: world_war_2.webm
# Asset_name: b53d8152-6ecd-41a2-a59e-xxxxxxxx, file_name: an552Qb_460svvp9.webm - Media Encoder Standard encoded
然而,当我尝试使用以下方法时;它失败了。因为我不知道要解析什么 parameters - Link to Python SDKs
create_or_update(resource_group_name, account_name, asset_name,
parameters, custom_headers=None, raw=False, **operation_config)
因此,想请教如下问题(都是通过Python SDK完成的):
- 它期望什么样的参数?
- 视频可以直接上传到 AMS 还是应该先上传到 Blob 存储?
- 资产应该只包含一个视频还是多个文件?
- 该方法的 REST 版本的文档位于 https://docs.microsoft.com/en-us/rest/api/media/assets/createorupdate。这实际上与 Python 参数相同。
- 视频存储在用于媒体服务的 Azure 存储中。对于输入资产、编码的资产和任何流媒体内容都是如此。这一切都在存储中,但由媒体服务访问。您确实需要在创建存储容器的媒体服务中创建一个资产。一旦存储容器存在,您就可以通过存储 API 上传到该媒体服务创建的容器。
- 从技术上讲,多个文件没有问题,但这样做会出现许多您可能没有想到的问题。我建议使用 1 个输入视频 = 1 个媒体服务资产。在编码输出端,资产中会有多个文件。编码输出包含一个或多个视频、清单和元数据文件。
我找到了解决使用 Python SDK 和 REST 的方法;但是,我不太确定它是否合适。
通过 Python 包登录到 Azure 媒体服务和 Blob 存储
import adal
from msrestazure.azure_active_directory import AdalAuthentication
from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
from azure.mgmt.media import AzureMediaServices
from azure.mgmt.media.models import MediaService
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
通过解析这些参数为原始文件和编码文件创建资产。原始文件资产创建示例。
asset_name = 'asset-myvideo'
asset_properties = {
'properties': {
'description': 'Original File Description',
'storageAccountName': "storage-account-name"
}
}
client.assets.create_or_update(RESOUCE_GROUP_NAME, ACCOUNT_NAME, asset_name, asset_properties)
将视频上传到源自创建的原始资产的 Blob 存储
current_container = [data.container for data in client.assets.list(RESOUCE_GROUP_NAME, ACCOUNT_NAME).get(0) if data.name == asset_name][0] # Get Blob Storage location
file_name = "myvideo.mp4"
blob_client = blob_service_client.get_blob_client(container=current_container, blob=file_name)
with open('original_video.mp4', 'rb') as data:
blob_client.upload_blob(data)
print(f'Video uploaded to {current_container}')
然后,我执行转换、作业和流定位器以成功获取视频流 Link。
我能够让它与更新的 python SDK 一起工作。 python 文档大部分缺失,所以我主要从 python SDK 源代码和 C# 示例构建它。
azure-storage-blob==12.3.1
azure-mgmt-media==2.1.0
azure-mgmt-resource==9.0.0
adal~=1.2.2
msrestazure~=0.6.3
0) 导入很多东西
from azure.mgmt.media.models import Asset, Transform, Job,
BuiltInStandardEncoderPreset, TransformOutput, \
JobInputAsset, JobOutputAsset, AssetContainerSas, AssetContainerPermission
import adal
from msrestazure.azure_active_directory import AdalAuthentication
from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
from azure.mgmt.media import AzureMediaServices
from azure.storage.blob import BlobServiceClient, ContainerClient
import datetime as dt
import time
LOGIN_ENDPOINT = AZURE_PUBLIC_CLOUD.endpoints.active_directory
RESOURCE = AZURE_PUBLIC_CLOUD.endpoints.active_directory_resource_id
# AzureSettings is a custom NamedTuple
1) 登录 AMS:
def get_ams_client(settings: AzureSettings) -> AzureMediaServices:
context = adal.AuthenticationContext(LOGIN_ENDPOINT + '/' +
settings.AZURE_MEDIA_TENANT_ID)
credentials = AdalAuthentication(
context.acquire_token_with_client_credentials,
RESOURCE,
settings.AZURE_MEDIA_CLIENT_ID,
settings.AZURE_MEDIA_SECRET
)
return AzureMediaServices(credentials, settings.AZURE_SUBSCRIPTION_ID)
2) 创建输入和输出资产
input_asset = create_or_update_asset(
input_asset_name, "My Input Asset", client, azure_settings)
input_asset = create_or_update_asset(
output_asset_name, "My Output Asset", client, azure_settings)
3) 获取容器名称。 (大多数文档都提到 BlockBlobService,它似乎已从 SDK 中删除)
def get_container_name(client: AzureMediaServices, asset_name: str, settings: AzureSettings):
expiry_time = dt.datetime.now(dt.timezone.utc) + dt.timedelta(hours=4)
container_list: AssetContainerSas = client.assets.list_container_sas(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
asset_name=asset_name,
permissions = AssetContainerPermission.read_write,
expiry_time=expiry_time
)
sas_uri: str = container_list.asset_container_sas_urls[0]
container_client: ContainerClient = ContainerClient.from_container_url(sas_uri)
return container_client.container_name
4) 将文件上传到输入资产容器:
def upload_file_to_asset_container(
container: str, local_file, uploaded_file_name, settings: AzureSettings):
blob_service_client = BlobServiceClient.from_connection_string(settings.AZURE_MEDIA_STORAGE_CONNECTION_STRING))
blob_client = blob_service_client.get_blob_client(container=container, blob=uploaded_file_name)
with open(local_file, 'rb') as data:
blob_client.upload_blob(data)
5) 创建转换(在我的例子中,使用自适应流预设):
def get_or_create_transform(
client: AzureMediaServices,
transform_name: str,
settings: AzureSettings):
transform_output = TransformOutput(preset=BuiltInStandardEncoderPreset(preset_name="AdaptiveStreaming"))
transform: Transform = client.transforms.create_or_update(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
transform_name=transform_name,
outputs=[transform_output]
)
return transform
5) 提交作业
def submit_job(
client: AzureMediaServices,
settings: AzureSettings,
input_asset: Asset,
output_asset: Asset,
transform_name: str,
correlation_data: dict) -> Job:
job_input = JobInputAsset(asset_name=input_asset.name)
job_outputs = [JobOutputAsset(asset_name=output_asset.name)]
job: Job = client.jobs.create(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
job_name=f"test_job_{UNIQUENESS}",
transform_name=transform_name,
parameters=Job(input=job_input,
outputs=job_outputs,
correlation_data=correlation_data)
)
return job
6) 然后在事件网格告诉我工作完成后我得到了 URL:
# side-effect warning: this starts the streaming endpoint $$$
def get_urls(client: AzureMediaServices, output_asset_name: str
locator_name: str):
try:
locator: StreamingLocator = client.streaming_locators.create(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
streaming_locator_name=locator_name,
parameters=StreamingLocator(
asset_name=output_asset_name,
streaming_policy_name="Predefined_ClearStreamingOnly"
)
)
except Exception as ex:
print("ignoring existing")
streaming_endpoint: StreamingEndpoint = client.streaming_endpoints.get(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
streaming_endpoint_name="default")
if streaming_endpoint:
if streaming_endpoint.resource_state != "Running":
client.streaming_endpoints.start(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
streaming_endpoint_name="default"
)
paths = client.streaming_locators.list_paths(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
streaming_locator_name=locator_name
)
return [f"https://{streaming_endpoint.host_name}{path.paths[0]}" for path in paths.streaming_paths]
我目前正在寻找一种通过 Python SDK 将视频上传到 Azure 媒体服务 (AMS v3) 的方法。我已经按照它的指示操作,并且能够成功连接到AMS。
例子
credentials = AdalAuthentication(
context.acquire_token_with_client_credentials,
RESOURCE,
CLIENT,
KEY)
client = AzureMediaServices(credentials, SUBSCRIPTION_ID) # Successful
我也成功获取了通过其门户上传的所有视频的详细信息
for data in client.assets.list(RESOUCE_GROUP_NAME, ACCOUNT_NAME).get(0):
print(f'Asset_name: {data.name}, file_name: {data.description}')
# Asset_name: 4f904060-d15c-4880-8c5a-xxxxxxxx, file_name: 夢想全紀錄.mp4
# Asset_name: 8f2e5e36-d043-4182-9634-xxxxxxxx, file_name: an552Qb_460svvp9.webm
# Asset_name: aef495c1-a3dd-49bb-8e3e-xxxxxxxx, file_name: world_war_2.webm
# Asset_name: b53d8152-6ecd-41a2-a59e-xxxxxxxx, file_name: an552Qb_460svvp9.webm - Media Encoder Standard encoded
然而,当我尝试使用以下方法时;它失败了。因为我不知道要解析什么 parameters - Link to Python SDKs
create_or_update(resource_group_name, account_name, asset_name,
parameters, custom_headers=None, raw=False, **operation_config)
因此,想请教如下问题(都是通过Python SDK完成的):
- 它期望什么样的参数?
- 视频可以直接上传到 AMS 还是应该先上传到 Blob 存储?
- 资产应该只包含一个视频还是多个文件?
- 该方法的 REST 版本的文档位于 https://docs.microsoft.com/en-us/rest/api/media/assets/createorupdate。这实际上与 Python 参数相同。
- 视频存储在用于媒体服务的 Azure 存储中。对于输入资产、编码的资产和任何流媒体内容都是如此。这一切都在存储中,但由媒体服务访问。您确实需要在创建存储容器的媒体服务中创建一个资产。一旦存储容器存在,您就可以通过存储 API 上传到该媒体服务创建的容器。
- 从技术上讲,多个文件没有问题,但这样做会出现许多您可能没有想到的问题。我建议使用 1 个输入视频 = 1 个媒体服务资产。在编码输出端,资产中会有多个文件。编码输出包含一个或多个视频、清单和元数据文件。
我找到了解决使用 Python SDK 和 REST 的方法;但是,我不太确定它是否合适。
通过 Python 包登录到 Azure 媒体服务和 Blob 存储
import adal from msrestazure.azure_active_directory import AdalAuthentication from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD from azure.mgmt.media import AzureMediaServices from azure.mgmt.media.models import MediaService from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
通过解析这些参数为原始文件和编码文件创建资产。原始文件资产创建示例。
asset_name = 'asset-myvideo' asset_properties = { 'properties': { 'description': 'Original File Description', 'storageAccountName': "storage-account-name" } } client.assets.create_or_update(RESOUCE_GROUP_NAME, ACCOUNT_NAME, asset_name, asset_properties)
将视频上传到源自创建的原始资产的 Blob 存储
current_container = [data.container for data in client.assets.list(RESOUCE_GROUP_NAME, ACCOUNT_NAME).get(0) if data.name == asset_name][0] # Get Blob Storage location file_name = "myvideo.mp4" blob_client = blob_service_client.get_blob_client(container=current_container, blob=file_name) with open('original_video.mp4', 'rb') as data: blob_client.upload_blob(data) print(f'Video uploaded to {current_container}')
然后,我执行转换、作业和流定位器以成功获取视频流 Link。
我能够让它与更新的 python SDK 一起工作。 python 文档大部分缺失,所以我主要从 python SDK 源代码和 C# 示例构建它。
azure-storage-blob==12.3.1
azure-mgmt-media==2.1.0
azure-mgmt-resource==9.0.0
adal~=1.2.2
msrestazure~=0.6.3
0) 导入很多东西
from azure.mgmt.media.models import Asset, Transform, Job,
BuiltInStandardEncoderPreset, TransformOutput, \
JobInputAsset, JobOutputAsset, AssetContainerSas, AssetContainerPermission
import adal
from msrestazure.azure_active_directory import AdalAuthentication
from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD
from azure.mgmt.media import AzureMediaServices
from azure.storage.blob import BlobServiceClient, ContainerClient
import datetime as dt
import time
LOGIN_ENDPOINT = AZURE_PUBLIC_CLOUD.endpoints.active_directory
RESOURCE = AZURE_PUBLIC_CLOUD.endpoints.active_directory_resource_id
# AzureSettings is a custom NamedTuple
1) 登录 AMS:
def get_ams_client(settings: AzureSettings) -> AzureMediaServices:
context = adal.AuthenticationContext(LOGIN_ENDPOINT + '/' +
settings.AZURE_MEDIA_TENANT_ID)
credentials = AdalAuthentication(
context.acquire_token_with_client_credentials,
RESOURCE,
settings.AZURE_MEDIA_CLIENT_ID,
settings.AZURE_MEDIA_SECRET
)
return AzureMediaServices(credentials, settings.AZURE_SUBSCRIPTION_ID)
2) 创建输入和输出资产
input_asset = create_or_update_asset(
input_asset_name, "My Input Asset", client, azure_settings)
input_asset = create_or_update_asset(
output_asset_name, "My Output Asset", client, azure_settings)
3) 获取容器名称。 (大多数文档都提到 BlockBlobService,它似乎已从 SDK 中删除)
def get_container_name(client: AzureMediaServices, asset_name: str, settings: AzureSettings):
expiry_time = dt.datetime.now(dt.timezone.utc) + dt.timedelta(hours=4)
container_list: AssetContainerSas = client.assets.list_container_sas(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
asset_name=asset_name,
permissions = AssetContainerPermission.read_write,
expiry_time=expiry_time
)
sas_uri: str = container_list.asset_container_sas_urls[0]
container_client: ContainerClient = ContainerClient.from_container_url(sas_uri)
return container_client.container_name
4) 将文件上传到输入资产容器:
def upload_file_to_asset_container(
container: str, local_file, uploaded_file_name, settings: AzureSettings):
blob_service_client = BlobServiceClient.from_connection_string(settings.AZURE_MEDIA_STORAGE_CONNECTION_STRING))
blob_client = blob_service_client.get_blob_client(container=container, blob=uploaded_file_name)
with open(local_file, 'rb') as data:
blob_client.upload_blob(data)
5) 创建转换(在我的例子中,使用自适应流预设):
def get_or_create_transform(
client: AzureMediaServices,
transform_name: str,
settings: AzureSettings):
transform_output = TransformOutput(preset=BuiltInStandardEncoderPreset(preset_name="AdaptiveStreaming"))
transform: Transform = client.transforms.create_or_update(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
transform_name=transform_name,
outputs=[transform_output]
)
return transform
5) 提交作业
def submit_job(
client: AzureMediaServices,
settings: AzureSettings,
input_asset: Asset,
output_asset: Asset,
transform_name: str,
correlation_data: dict) -> Job:
job_input = JobInputAsset(asset_name=input_asset.name)
job_outputs = [JobOutputAsset(asset_name=output_asset.name)]
job: Job = client.jobs.create(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
job_name=f"test_job_{UNIQUENESS}",
transform_name=transform_name,
parameters=Job(input=job_input,
outputs=job_outputs,
correlation_data=correlation_data)
)
return job
6) 然后在事件网格告诉我工作完成后我得到了 URL:
# side-effect warning: this starts the streaming endpoint $$$
def get_urls(client: AzureMediaServices, output_asset_name: str
locator_name: str):
try:
locator: StreamingLocator = client.streaming_locators.create(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
streaming_locator_name=locator_name,
parameters=StreamingLocator(
asset_name=output_asset_name,
streaming_policy_name="Predefined_ClearStreamingOnly"
)
)
except Exception as ex:
print("ignoring existing")
streaming_endpoint: StreamingEndpoint = client.streaming_endpoints.get(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
streaming_endpoint_name="default")
if streaming_endpoint:
if streaming_endpoint.resource_state != "Running":
client.streaming_endpoints.start(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
streaming_endpoint_name="default"
)
paths = client.streaming_locators.list_paths(
resource_group_name=settings.AZURE_MEDIA_RESOURCE_GROUP_NAME,
account_name=settings.AZURE_MEDIA_ACCOUNT_NAME,
streaming_locator_name=locator_name
)
return [f"https://{streaming_endpoint.host_name}{path.paths[0]}" for path in paths.streaming_paths]