YouTube Data API:从频道获取最新视频 ID(不包括直播)

Youtube Data API: Get latest video ID from channel excluding live streams

在我的 python 脚本中,我正在获取最新视频的视频 ID。

这是代码,playlistId 是包含我所有视频的频道播放列表 ID:

def get_latest_video_id(youtube, playlistId):
    """Return the video ID of the first item of a playlist.

    Args:
        youtube: YouTube Data API client object exposing
            ``playlistItems().list(...).execute()``.
        playlistId: ID of the playlist to query.

    Returns:
        The ``videoId`` of the first item in the response, or ``None``
        when the response carries no items (e.g. an empty playlist).
    """
    response = youtube.playlistItems().list(
        part='snippet',
        playlistId=playlistId
    ).execute()

    # An empty playlist yields no 'items'; report "no video" instead of
    # failing with IndexError/KeyError.
    items = response.get('items') or []
    if not items:
        return None

    return items[0]['snippet']['resourceId']['videoId']

现在的问题是,我的直播也会被保存到这个播放列表中。我没有找到一个包含我所有上传内容、但不包含已保存直播的播放列表。

我想到的解决方法是获取我所有直播的列表,并将它们的 ID 与我通过上述方法获得的 ID 进行比较。

我的问题是,没有更好的方法吗?是否有一个 API 调用可以满足我的需求,而且配额成本不高?

您必须重复调用 PlaylistItems.list API 端点(使用分页)以手动过滤掉直播视频。

def get_non_livestream_videos(youtube, video_ids):
    """Return the IDs from `video_ids` that do not belong to live streams.

    A video is considered a live stream when the resource returned by
    Videos.list carries a non-empty ``liveStreamingDetails`` property.

    Args:
        youtube: YouTube Data API client object exposing
            ``videos().list(...).execute()``.
        video_ids: iterable of at most 50 video ID strings (the limit is
            enforced by an assertion).

    Returns:
        A list (on both Python 2 and Python 3) of the non-live-stream IDs,
        in the same relative order as they appear in `video_ids`. IDs that
        the API does not return are left out.
    """
    # Materialize first: the caller may pass an iterator (e.g. the result
    # of Python 3's map/filter), which has no len() and is always truthy.
    video_ids = list(video_ids)
    assert len(video_ids) <= 50
    if not video_ids:
        return []

    response = youtube.videos().list(
        fields='items(id,liveStreamingDetails)',
        part='id,liveStreamingDetails',
        maxResults=len(video_ids),
        id=','.join(video_ids),
    ).execute()

    items = response.get('items', [])
    assert len(items) <= len(video_ids)

    not_live_ids = set(
        video['id'] for video in items
        if not video.get('liveStreamingDetails'))

    # The API may return the items in a different order than requested,
    # so walk the requested IDs rather than the response.
    return [video_id for video_id in video_ids if video_id in not_live_ids]

def get_latest_video_id(youtube, playlistId):
    """Return the ID of the first playlist video that is not a live stream.

    Walks the playlist page by page (50 items per page) and, for each
    page, asks `get_non_livestream_videos` which of the page's videos are
    not live streams.

    Args:
        youtube: YouTube Data API client object exposing
            ``playlistItems().list(...)`` and ``playlistItems().list_next``.
        playlistId: ID of the playlist to scan.

    Returns:
        The first video ID reported by `get_non_livestream_videos`, or
        ``None`` when the whole playlist contains no such video.
    """
    request = youtube.playlistItems().list(
        fields='nextPageToken,items/snippet/resourceId',
        playlistId=playlistId,
        maxResults=50,
        part='snippet'
    )

    while request:
        response = request.execute()

        items = response.get('items', [])
        assert len(items) <= 50

        # Real lists (not map/filter objects) so that the emptiness test
        # and the indexing below behave the same on Python 2 and Python 3.
        candidates = [
            item['snippet']['resourceId']['videoId']
            for item in items
            if item['snippet']['resourceId']['kind'] == 'youtube#video'
        ]
        if candidates:
            videos = list(get_non_livestream_videos(youtube, candidates))
            if videos:
                return videos[0]

        # list_next yields a falsy value after the last page, which
        # terminates the loop.
        request = youtube.playlistItems().list_next(
            request, response)

    return None

请注意,上面我使用 fields 请求参数从 API 中仅获取实际需要的信息。

另请注意,您可能需要进一步完善函数 get_non_livestream_videos,因为以逗号分隔的视频 ID 列表作为 id 参数查询 Videos.list API 端点时,其返回项目的顺序可能与 video_ids 中 ID 的给定顺序不一致。

还有一个重要提示:如果您在 Python 3 下运行上面的代码(您的问题没有提到这一点),请确保在脚本顶部插入以下配置代码:

import sys

# On Python 3 the built-in map returns an iterator; the code relying on this
# shim expects the Python 2 behaviour (a list), so shadow map accordingly.
if sys.version_info[0] >= 3:
    from builtins import map as builtin_map

    def map(*args):
        """List-returning replacement for the built-in map."""
        return list(builtin_map(*args))

这是必需的,因为在 Python 3 下,内置函数 map 返回的是迭代器,而在 Python 2 下,map 返回的是列表。


下面的代码解决了我上面提到的问题,即 Videos.list 返回项目的顺序可能与函数 get_non_livestream_videos 的参数 video_ids 给出的 ID 顺序不一致:

import sys

# Python 3's built-in map is lazy; shadow it with a version that returns a
# list, as Python 2's map does.
if sys.version_info[0] >= 3:
    from builtins import map as builtin_map

    def map(*args):
        """List-returning replacement for the built-in map."""
        return list(builtin_map(*args))

class MergeVideoListsError(Exception):
    """Raised by merge_video_lists for a video resource with an unexpected ID."""

def merge_video_lists(video_ids, video_res):
    """Reorder video resources so they follow the order of `video_ids`.

    Args:
        video_ids: iterable of video ID strings giving the wanted order.
        video_res: iterable of video resource dicts, each with an 'id'
            key, in any order. It is not modified.

    Returns:
        A new list holding the resources of `video_res`, ordered by the
        position of their ID in `video_ids`. IDs in `video_ids` that have
        no resource are simply skipped.

    Raises:
        MergeVideoListsError: a resource's ID has no (remaining) match in
            `video_ids`.
    """
    # (original position, id) pairs sorted by ID; the sort is stable, so
    # duplicate IDs keep their relative positions.
    wanted = sorted(enumerate(video_ids), key=lambda pair: pair[1])
    # Sorted copy: leaves the caller's sequence untouched and accepts any
    # iterable (e.g. the result of Python 3's filter).
    found = sorted(video_res, key=lambda video: video['id'])

    def error(video_id):
        raise MergeVideoListsError(
            "unexpected video resource of ID '%s'" % video_id)

    N = len(wanted)
    R = len(found)
    assert R <= N

    # Classic merge of two ID-sorted sequences.
    merged = []
    i, j = 0, 0
    while i < N and j < R:
        position, wanted_id = wanted[i]
        resource = found[j]
        found_id = resource['id']
        if wanted_id == found_id:
            merged.append((position, resource))
            i += 1
            j += 1
        elif wanted_id < found_id:
            # No resource was returned for this wanted ID.
            i += 1
        else:
            # The resource's ID sorts before every remaining wanted ID,
            # hence it cannot match any of them.
            error(found_id)

    if j < R:
        error(found[j]['id'])

    # Restore the order given by video_ids.
    merged.sort(key=lambda pair: pair[0])
    return [resource for _, resource in merged]

def println(*args):
    """Write str() of every argument to stdout, back to back, then a newline."""
    write = sys.stdout.write
    for value in args:
        write(str(value))
    write('\n')

def test_merge_video_lists(ids, res, val):
    """Run merge_video_lists on one case and print a report.

    Args:
        ids: list of video IDs passed as the first argument.
        res: list of video resources passed as the second argument.
        val: expected outcome, either the merged list or, for a failing
            case, the message of the expected MergeVideoListsError.
    """
    println("ids:   ", ids)
    println("res:   ", res)
    try:
        r = merge_video_lists(ids, res)
    except MergeVideoListsError as e:
        println("error: ", e)
        r = str(e)
    else:
        println("merge: ", r)

    # Reached only when `r` is bound; any other exception propagates
    # unchanged instead of being masked by a NameError on `r`.
    println("test:  ", "OK" if val == r else "failed")

# Test cases for merge_video_lists. Each entry is a triple
# (video_ids, video_res, expected), where `expected` is either the merged
# list or the message of the expected MergeVideoListsError.
TESTS = (
    # every ID has a resource
    (
        ['c', 'b', 'a'],
        [{'id': 'c'}, {'id': 'a'}, {'id': 'b'}],
        [{'id': 'c'}, {'id': 'b'}, {'id': 'a'}],
    ),
    # one ID has no resource
    (
        ['c', 'b', 'a'],
        [{'id': 'b'}, {'id': 'c'}],
        [{'id': 'c'}, {'id': 'b'}],
    ),
    (
        ['c', 'b', 'a'],
        [{'id': 'a'}, {'id': 'c'}],
        [{'id': 'c'}, {'id': 'a'}],
    ),
    (
        ['c', 'b', 'a'],
        [{'id': 'a'}, {'id': 'b'}],
        [{'id': 'b'}, {'id': 'a'}],
    ),
    # a resource whose ID is not among the IDs
    (
        ['c', 'b', 'a'],
        [{'id': 'z'}, {'id': 'b'}, {'id': 'c'}],
        "unexpected video resource of ID 'z'",
    ),
    (
        ['c', 'b', 'a'],
        [{'id': 'a'}, {'id': 'z'}, {'id': 'c'}],
        "unexpected video resource of ID 'z'",
    ),
    (
        ['c', 'b', 'a'],
        [{'id': 'a'}, {'id': 'b'}, {'id': 'z'}],
        "unexpected video resource of ID 'z'",
    ),
)

def main():
    """Run every case in TESTS, separating consecutive reports by a blank line."""
    for index, case in enumerate(TESTS):
        if index > 0:
            println()
        test_merge_video_lists(*case)


# Run the cases only when executed as a script.
if __name__ == '__main__':
    main()

# $ python merge-video-lists.py
# ids:   ['c', 'b', 'a']
# res:   [{'id': 'c'}, {'id': 'a'}, {'id': 'b'}]
# merge: [{'id': 'c'}, {'id': 'b'}, {'id': 'a'}]
# test:  OK
# 
# ids:   ['c', 'b', 'a']
# res:   [{'id': 'b'}, {'id': 'c'}]
# merge: [{'id': 'c'}, {'id': 'b'}]
# test:  OK
# 
# ids:   ['c', 'b', 'a']
# res:   [{'id': 'a'}, {'id': 'c'}]
# merge: [{'id': 'c'}, {'id': 'a'}]
# test:  OK
# 
# ids:   ['c', 'b', 'a']
# res:   [{'id': 'a'}, {'id': 'b'}]
# merge: [{'id': 'b'}, {'id': 'a'}]
# test:  OK
# 
# ids:   ['c', 'b', 'a']
# res:   [{'id': 'z'}, {'id': 'b'}, {'id': 'c'}]
# error: unexpected video resource of ID 'z'
# test:  OK
# 
# ids:   ['c', 'b', 'a']
# res:   [{'id': 'a'}, {'id': 'z'}, {'id': 'c'}]
# error: unexpected video resource of ID 'z'
# test:  OK
# 
# ids:   ['c', 'b', 'a']
# res:   [{'id': 'a'}, {'id': 'b'}, {'id': 'z'}]
# error: unexpected video resource of ID 'z'
# test:  OK

上面的代码是一个独立程序(可在 Python 2 和 Python 3 下运行),它实现了合并函数 merge_video_lists。

您必须在函数 get_non_livestream_videos 中使用此函数,方法是替换行:

return map(video_id, filter(not_live, items))

与:

return map(video_id, merge_video_lists(
    video_ids, filter(not_live, items)))

以上适用于 Python 2。对于 Python 3,替换为:

return map(video_id, merge_video_lists(
    video_ids, list(filter(not_live, items))))

或者,不替换 return 语句,而只是在该语句之前添加以下语句:

items = merge_video_lists(video_ids, items)

后一种变体更好,因为它还会验证 API 返回的视频 ID:如果其中有不在 video_ids 中的 ID,merge_video_lists 就会抛出 MergeVideoListsError 异常,并指出有问题的 ID。


要获取恰好 N 天前的所有视频(不包括直播),请使用以下函数:

def get_days_old_video_ids(youtube, playlistId, days=7):
    """Return the IDs of the non-live-stream videos published exactly `days` days ago.

    The playlist is read page by page (50 items per page). Reading stops
    at the first page containing a video published before the target
    date, which is only correct when the playlist lists its items newest
    first.

    Args:
        youtube: YouTube Data API client object exposing
            ``playlistItems().list(...)`` and ``playlistItems().list_next``.
        playlistId: ID of the playlist to scan.
        days: age, in days, of the videos to select (default 7).

    Returns:
        A list of video IDs, as filtered by `get_non_livestream_videos`.
    """
    from datetime import date, datetime, timedelta

    # NOTE(review): date.today() is the local date, whereas the trailing
    # 'Z' of 'videoPublishedAt' suggests UTC; confirm which day boundary
    # is intended.
    n_days = date.today() - timedelta(days=days)

    request = youtube.playlistItems().list(
        fields='nextPageToken,items(snippet/resourceId,contentDetails/videoPublishedAt)',
        part='snippet,contentDetails',
        playlistId=playlistId,
        maxResults=50
    )

    def parse_published_at(item):
        # Replace the 'videoPublishedAt' string by its date, in place.
        details = item['contentDetails']
        details['videoPublishedAt'] = datetime.strptime(
            details['videoPublishedAt'],
            '%Y-%m-%dT%H:%M:%SZ').date()
        return item

    def n_days_eq(item):
        return item['contentDetails']['videoPublishedAt'] == n_days

    def n_days_lt(item):
        return item['contentDetails']['videoPublishedAt'] < n_days

    def is_video(item):
        return item['snippet']['resourceId']['kind'] == 'youtube#video'

    def video_id(item):
        return item['snippet']['resourceId']['videoId']

    videos = []

    while request:
        response = request.execute()

        items = response.get('items', [])
        assert len(items) <= 50

        # Drop the non-video entries and parse each remaining item's
        # 'videoPublishedAt'. A real list is required because 'items' is
        # traversed twice below (a Python 3 map/filter object would be
        # exhausted after the first pass).
        items = [parse_published_at(item) for item in items if is_video(item)]

        # terminate the loop once a 'videoPublishedAt' < n_days was seen
        done = any(n_days_lt(item) for item in items)

        # retain only the items with 'videoPublishedAt' == n_days
        items = [item for item in items if n_days_eq(item)]

        # add to 'videos' the IDs of videos in 'items' that are not live streams
        videos.extend(get_non_livestream_videos(
            youtube, [video_id(item) for item in items]))

        if done:
            break

        request = youtube.playlistItems().list_next(
            request, response)

    return videos

上面的函数 get_days_old_video_ids 需要 filter 和 map 都返回列表,因此上面的配置代码必须更新为:

import sys

# On Python 3 the built-in map and filter return iterators; the code relying
# on this shim expects the Python 2 behaviour (lists), so shadow both.
if sys.version_info[0] >= 3:
    from builtins import map as builtin_map
    from builtins import filter as builtin_filter

    def map(*args):
        """List-returning replacement for the built-in map."""
        return list(builtin_map(*args))

    def filter(*args):
        """List-returning replacement for the built-in filter."""
        return list(builtin_filter(*args))

请注意,get_days_old_video_ids 依赖于 PlaylistItems.list 所生成结果集的以下未公开(undocumented)特性:对于频道的上传播放列表,PlaylistItems.list 返回的项目按 contentDetails.videoPublishedAt 的逆时间顺序排列(最新的在前)。

因此,您必须确保传给 get_days_old_video_ids 的参数 playlistId 是您频道的上传播放列表的 ID。通常,频道 ID 与其对应的上传播放列表 ID 之间通过 s/^UC([0-9a-zA-Z_-]{22})$/UU\1/ 相关联(即把前缀 UC 换成 UU)。

另请注意,get_days_old_video_ids 返回的是恰好 days 天前发布的视频的 ID。如果需要获取最多 days 天前发布的视频的 ID,则定义:

    # True for an item whose 'videoPublishedAt' is on or after `n_days`
    # (the comparison expects both sides to be of comparable types, e.g.
    # date objects).
    n_days_ge = lambda item: \
        item['contentDetails']['videoPublishedAt'] >= n_days

并将 n_days_eq 替换为 n_days_ge

还有一点要注意:在上面函数 get_non_livestream_videos 的顶部,我添加了语句:

    if not video_ids: return []

这样可以避免处理一个空的 video_ids 列表。