youtube api - 等待视频
youtube api - waiting for video
我制作了一个机器人:它监控多个 YouTube 频道,并把新发布的视频发送到 Telegram。这是代码:
"""Watch several YouTube channels and forward new uploads to Telegram.

The script records the video count of every configured channel at start-up,
then polls the counts every ``sleepTime`` seconds.  When a channel's count
grows, the first item of its uploads playlist is posted to every configured
Telegram chat.
"""
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import config
import time
import telebot

# Each entry is a (channel_id, video_count) tuple; video_count is an int.
ChannelsAndVideos = []
Channels = config.channels_id
bot_token = config.bot_token
api_key = config.api_key
sleepTime = config.sleepTime
messageText = config.messageText
telegram_channels_id = config.telegram_channels_id
youtube = build('youtube', 'v3', developerKey=api_key)
bot = telebot.TeleBot(bot_token)
# IDs of the videos that have already been announced.
sended = []

# Register every configured channel together with its current video count.
for ChannelId in Channels:
    try:
        request = youtube.channels().list(
            part='statistics',
            id=ChannelId
        )
        response = request.execute()
        if response['pageInfo']['totalResults'] != 1:
            print("Error! Не удалось добавить " + ChannelId)
        else:
            video_count = response['items'][0]['statistics']['videoCount']
            print(ChannelId + " был добавлен (" + video_count + " видео)")
            # videoCount arrives as a string; keep it as an int so that later
            # comparisons are numeric ("100" > "99" is False for strings).
            ChannelsAndVideos.append((ChannelId, int(video_count)))
    except HttpError as err:
        # A failed lookup only skips this channel; the others are still added.
        print(err)

while True:
    time.sleep(sleepTime)
    for i, (channel_id, known_count) in enumerate(ChannelsAndVideos):
        try:
            request = youtube.channels().list(
                part='statistics',
                id=channel_id
            )
            response = request.execute()
            if response['pageInfo']['totalResults'] != 1:
                print("Error!")
                continue
            current_count = int(response['items'][0]['statistics']['videoCount'])
            if current_count <= known_count:
                # Remember a lowered count (e.g. after a deletion) so that the
                # next upload is still seen as an increase.
                ChannelsAndVideos[i] = (channel_id, current_count)
                continue
            request = youtube.channels().list(
                part='contentDetails',
                id=channel_id
            )
            response = request.execute()
            uploads_id = response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
            request = youtube.playlistItems().list(
                part=['contentDetails', 'snippet', 'status'],
                playlistId=uploads_id
            )
            response = request.execute()
        except HttpError as err:
            # Leave the stored count untouched so the check is retried on the
            # next polling round instead of crashing the bot.
            print(err)
            continue
        # Only store the new count once the upload list was fetched successfully.
        ChannelsAndVideos[i] = (channel_id, current_count)
        items = response.get('items')
        if not items:
            continue
        # NOTE(review): assumes the uploads playlist lists the newest video
        # first - confirm against the API behaviour.
        video_id = items[0]['snippet']['resourceId']['videoId']
        if video_id in sended:
            continue
        sended.append(video_id)
        for chat_id in telegram_channels_id:
            try:
                bot.send_message(chat_id, messageText + "https://www.youtube.com/watch?v=" + video_id)
            except Exception:
                # Best effort: a failure for one chat must not block the others.
                print("Не удалось отправить сообщение в " + str(chat_id))
它的实现有一个问题:每 30 秒它就会请求每个频道的最新视频,这会占用内存,还会消耗本可用于其他用途的请求配额(YouTube API 的请求数量是有限的)。我该如何优化这个系统?
减少内存使用量
sended = []
...
if not (response['items'][0]['snippet']['resourceId']['videoId'] in sended):
sended.append(response['items'][0]['snippet']['resourceId']['videoId'])
请使用集合(set)代替列表来记录已发送的视频:集合不会保存重复项,并且会把 `videoId in sended` 成员检查的平均时间复杂度从 O(n) 降低到 O(1)。
# Track announced video IDs in a set so membership checks are O(1) on average.
sended = set()
...
video_id = response['items'][0]['snippet']['resourceId']['videoId']
if video_id not in sended:
    sended.add(video_id)
速率限制
while(True):
time.sleep(sleepTime)
for i in range(len(ChannelsAndVideos)):
request = youtube.channels().list(
不要在每轮遍历频道之前一次性 sleep 一大段时间,而是在每次向 API 发送请求之前 sleep。为了便于使用,请创建一个辅助函数 `execute(request)`,它确保相邻两次请求之间至少间隔 `sleepTime` 秒。
# Monotonic timestamp of the most recent API request; starting at "now" makes
# the very first request wait one full interval as well.
last_request_time = time.monotonic()


def execute(request, min_interval=None):
    """Run ``request.execute()`` no sooner than ``min_interval`` after the previous call.

    Args:
        request: Any object exposing an ``execute()`` method.
        min_interval: Minimum number of seconds between two requests.
            Defaults to the module-level ``sleepTime``.

    Returns:
        Whatever ``request.execute()`` returns.
    """
    global last_request_time
    if min_interval is None:
        min_interval = sleepTime
    # A monotonic clock keeps the spacing correct even if the system time is adjusted.
    wait = last_request_time + min_interval - time.monotonic()
    if wait > 0:
        time.sleep(wait)
    # Stamp before executing so a failing request still counts towards the limit.
    last_request_time = time.monotonic()
    return request.execute()
现在将所有 `request.execute()` 替换为 `execute(request)`。根据 YouTube API 的使用限制调整 `sleepTime` 的值,并删除所有 `time.sleep(sleepTime)`。
结果
"""Watch several YouTube channels and forward new uploads to Telegram.

Every YouTube API call goes through ``execute()``, which keeps consecutive
requests at least ``sleepTime`` seconds apart.
"""
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import config
import time
import telebot

# Each entry is a (channel_id, video_count) tuple; video_count is an int.
ChannelsAndVideos = []
Channels = config.channels_id
bot_token = config.bot_token
api_key = config.api_key
sleepTime = config.sleepTime
messageText = config.messageText
telegram_channels_id = config.telegram_channels_id
youtube = build('youtube', 'v3', developerKey=api_key)
bot = telebot.TeleBot(bot_token)
# IDs of the videos that have already been announced.
sended = set()
# Monotonic timestamp of the most recent API request; None until the first one,
# so the first request is sent without waiting.
last_request_time = None


def execute(request, min_interval=None):
    """Run ``request.execute()`` no sooner than ``min_interval`` after the previous call.

    Args:
        request: Any object exposing an ``execute()`` method.
        min_interval: Minimum number of seconds between two requests.
            Defaults to the module-level ``sleepTime``.

    Returns:
        Whatever ``request.execute()`` returns.
    """
    global last_request_time
    if min_interval is None:
        min_interval = sleepTime
    if last_request_time is not None:
        # A monotonic clock keeps the spacing correct even if the system time is adjusted.
        wait = last_request_time + min_interval - time.monotonic()
        if wait > 0:
            time.sleep(wait)
    # Stamp before executing so a failing request still counts towards the limit.
    last_request_time = time.monotonic()
    return request.execute()


# Register every configured channel together with its current video count.
for ChannelId in Channels:
    try:
        request = youtube.channels().list(
            part='statistics',
            id=ChannelId
        )
        response = execute(request)
        if response['pageInfo']['totalResults'] != 1:
            print("Error! Failed to add " + ChannelId)
        else:
            video_count = response['items'][0]['statistics']['videoCount']
            print(ChannelId + " has been added (" + video_count + " video)")
            # videoCount arrives as a string; keep it as an int so that later
            # comparisons are numeric ("100" > "99" is False for strings).
            ChannelsAndVideos.append((ChannelId, int(video_count)))
    except HttpError as err:
        # A failed lookup only skips this channel; the others are still added.
        print(err)

if not ChannelsAndVideos:
    # Without channels the polling loop below would spin without ever
    # sleeping, because only execute() throttles it.
    raise SystemExit("No channel could be added; nothing to watch.")

while True:
    for i, (channel_id, known_count) in enumerate(ChannelsAndVideos):
        try:
            request = youtube.channels().list(
                part='statistics',
                id=channel_id
            )
            response = execute(request)
            if response['pageInfo']['totalResults'] != 1:
                print("Error!")
                continue
            current_count = int(response['items'][0]['statistics']['videoCount'])
            if current_count <= known_count:
                # Remember a lowered count (e.g. after a deletion) so that the
                # next upload is still seen as an increase.
                ChannelsAndVideos[i] = (channel_id, current_count)
                continue
            request = youtube.channels().list(
                part='contentDetails',
                id=channel_id
            )
            response = execute(request)
            uploads_id = response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
            request = youtube.playlistItems().list(
                part=['contentDetails', 'snippet', 'status'],
                playlistId=uploads_id
            )
            response = execute(request)
        except HttpError as err:
            # Leave the stored count untouched so the check is retried on the
            # next polling round instead of crashing the bot.
            print(err)
            continue
        # Only store the new count once the upload list was fetched successfully.
        ChannelsAndVideos[i] = (channel_id, current_count)
        items = response.get('items')
        if not items:
            continue
        # NOTE(review): assumes the uploads playlist lists the newest video
        # first - confirm against the API behaviour.
        video_id = items[0]['snippet']['resourceId']['videoId']
        if video_id in sended:
            continue
        sended.add(video_id)
        for chat_id in telegram_channels_id:
            try:
                bot.send_message(chat_id, messageText + "https://www.youtube.com/watch?v=" + video_id)
            except Exception:
                # Best effort: a failure for one chat must not block the others.
                print("Failed to send message to " + str(chat_id))
我制作了一个机器人:它监控多个 YouTube 频道,并把新发布的视频发送到 Telegram。这是代码:
"""Watch several YouTube channels and forward new uploads to Telegram.

The script records the video count of every configured channel at start-up,
then polls the counts every ``sleepTime`` seconds.  When a channel's count
grows, the first item of its uploads playlist is posted to every configured
Telegram chat.
"""
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import config
import time
import telebot

# Each entry is a (channel_id, video_count) tuple; video_count is an int.
ChannelsAndVideos = []
Channels = config.channels_id
bot_token = config.bot_token
api_key = config.api_key
sleepTime = config.sleepTime
messageText = config.messageText
telegram_channels_id = config.telegram_channels_id
youtube = build('youtube', 'v3', developerKey=api_key)
bot = telebot.TeleBot(bot_token)
# IDs of the videos that have already been announced.
sended = []

# Register every configured channel together with its current video count.
for ChannelId in Channels:
    try:
        request = youtube.channels().list(
            part='statistics',
            id=ChannelId
        )
        response = request.execute()
        if response['pageInfo']['totalResults'] != 1:
            print("Error! Не удалось добавить " + ChannelId)
        else:
            video_count = response['items'][0]['statistics']['videoCount']
            print(ChannelId + " был добавлен (" + video_count + " видео)")
            # videoCount arrives as a string; keep it as an int so that later
            # comparisons are numeric ("100" > "99" is False for strings).
            ChannelsAndVideos.append((ChannelId, int(video_count)))
    except HttpError as err:
        # A failed lookup only skips this channel; the others are still added.
        print(err)

while True:
    time.sleep(sleepTime)
    for i, (channel_id, known_count) in enumerate(ChannelsAndVideos):
        try:
            request = youtube.channels().list(
                part='statistics',
                id=channel_id
            )
            response = request.execute()
            if response['pageInfo']['totalResults'] != 1:
                print("Error!")
                continue
            current_count = int(response['items'][0]['statistics']['videoCount'])
            if current_count <= known_count:
                # Remember a lowered count (e.g. after a deletion) so that the
                # next upload is still seen as an increase.
                ChannelsAndVideos[i] = (channel_id, current_count)
                continue
            request = youtube.channels().list(
                part='contentDetails',
                id=channel_id
            )
            response = request.execute()
            uploads_id = response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
            request = youtube.playlistItems().list(
                part=['contentDetails', 'snippet', 'status'],
                playlistId=uploads_id
            )
            response = request.execute()
        except HttpError as err:
            # Leave the stored count untouched so the check is retried on the
            # next polling round instead of crashing the bot.
            print(err)
            continue
        # Only store the new count once the upload list was fetched successfully.
        ChannelsAndVideos[i] = (channel_id, current_count)
        items = response.get('items')
        if not items:
            continue
        # NOTE(review): assumes the uploads playlist lists the newest video
        # first - confirm against the API behaviour.
        video_id = items[0]['snippet']['resourceId']['videoId']
        if video_id in sended:
            continue
        sended.append(video_id)
        for chat_id in telegram_channels_id:
            try:
                bot.send_message(chat_id, messageText + "https://www.youtube.com/watch?v=" + video_id)
            except Exception:
                # Best effort: a failure for one chat must not block the others.
                print("Не удалось отправить сообщение в " + str(chat_id))
它的实现有一个问题:每 30 秒它就会请求每个频道的最新视频,这会占用内存,还会消耗本可用于其他用途的请求配额(YouTube API 的请求数量是有限的)。我该如何优化这个系统?
减少内存使用量
sended = [] ... if not (response['items'][0]['snippet']['resourceId']['videoId'] in sended): sended.append(response['items'][0]['snippet']['resourceId']['videoId'])
请使用集合(set)代替列表来记录已发送的视频:集合不会保存重复项,并且会把 `videoId in sended` 成员检查的平均时间复杂度从 O(n) 降低到 O(1)。
# Track announced video IDs in a set so membership checks are O(1) on average.
sended = set()
...
video_id = response['items'][0]['snippet']['resourceId']['videoId']
if video_id not in sended:
    sended.add(video_id)
速率限制
while(True): time.sleep(sleepTime) for i in range(len(ChannelsAndVideos)): request = youtube.channels().list(
不要在每轮遍历频道之前一次性 sleep 一大段时间,而是在每次向 API 发送请求之前 sleep。为了便于使用,请创建一个辅助函数 `execute(request)`,它确保相邻两次请求之间至少间隔 `sleepTime` 秒。
# Monotonic timestamp of the most recent API request; starting at "now" makes
# the very first request wait one full interval as well.
last_request_time = time.monotonic()


def execute(request, min_interval=None):
    """Run ``request.execute()`` no sooner than ``min_interval`` after the previous call.

    Args:
        request: Any object exposing an ``execute()`` method.
        min_interval: Minimum number of seconds between two requests.
            Defaults to the module-level ``sleepTime``.

    Returns:
        Whatever ``request.execute()`` returns.
    """
    global last_request_time
    if min_interval is None:
        min_interval = sleepTime
    # A monotonic clock keeps the spacing correct even if the system time is adjusted.
    wait = last_request_time + min_interval - time.monotonic()
    if wait > 0:
        time.sleep(wait)
    # Stamp before executing so a failing request still counts towards the limit.
    last_request_time = time.monotonic()
    return request.execute()
现在将所有 `request.execute()` 替换为 `execute(request)`。根据 YouTube API 的使用限制调整 `sleepTime` 的值,并删除所有 `time.sleep(sleepTime)`。
结果
"""Watch several YouTube channels and forward new uploads to Telegram.

Every YouTube API call goes through ``execute()``, which keeps consecutive
requests at least ``sleepTime`` seconds apart.
"""
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import config
import time
import telebot

# Each entry is a (channel_id, video_count) tuple; video_count is an int.
ChannelsAndVideos = []
Channels = config.channels_id
bot_token = config.bot_token
api_key = config.api_key
sleepTime = config.sleepTime
messageText = config.messageText
telegram_channels_id = config.telegram_channels_id
youtube = build('youtube', 'v3', developerKey=api_key)
bot = telebot.TeleBot(bot_token)
# IDs of the videos that have already been announced.
sended = set()
# Monotonic timestamp of the most recent API request; None until the first one,
# so the first request is sent without waiting.
last_request_time = None


def execute(request, min_interval=None):
    """Run ``request.execute()`` no sooner than ``min_interval`` after the previous call.

    Args:
        request: Any object exposing an ``execute()`` method.
        min_interval: Minimum number of seconds between two requests.
            Defaults to the module-level ``sleepTime``.

    Returns:
        Whatever ``request.execute()`` returns.
    """
    global last_request_time
    if min_interval is None:
        min_interval = sleepTime
    if last_request_time is not None:
        # A monotonic clock keeps the spacing correct even if the system time is adjusted.
        wait = last_request_time + min_interval - time.monotonic()
        if wait > 0:
            time.sleep(wait)
    # Stamp before executing so a failing request still counts towards the limit.
    last_request_time = time.monotonic()
    return request.execute()


# Register every configured channel together with its current video count.
for ChannelId in Channels:
    try:
        request = youtube.channels().list(
            part='statistics',
            id=ChannelId
        )
        response = execute(request)
        if response['pageInfo']['totalResults'] != 1:
            print("Error! Failed to add " + ChannelId)
        else:
            video_count = response['items'][0]['statistics']['videoCount']
            print(ChannelId + " has been added (" + video_count + " video)")
            # videoCount arrives as a string; keep it as an int so that later
            # comparisons are numeric ("100" > "99" is False for strings).
            ChannelsAndVideos.append((ChannelId, int(video_count)))
    except HttpError as err:
        # A failed lookup only skips this channel; the others are still added.
        print(err)

if not ChannelsAndVideos:
    # Without channels the polling loop below would spin without ever
    # sleeping, because only execute() throttles it.
    raise SystemExit("No channel could be added; nothing to watch.")

while True:
    for i, (channel_id, known_count) in enumerate(ChannelsAndVideos):
        try:
            request = youtube.channels().list(
                part='statistics',
                id=channel_id
            )
            response = execute(request)
            if response['pageInfo']['totalResults'] != 1:
                print("Error!")
                continue
            current_count = int(response['items'][0]['statistics']['videoCount'])
            if current_count <= known_count:
                # Remember a lowered count (e.g. after a deletion) so that the
                # next upload is still seen as an increase.
                ChannelsAndVideos[i] = (channel_id, current_count)
                continue
            request = youtube.channels().list(
                part='contentDetails',
                id=channel_id
            )
            response = execute(request)
            uploads_id = response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
            request = youtube.playlistItems().list(
                part=['contentDetails', 'snippet', 'status'],
                playlistId=uploads_id
            )
            response = execute(request)
        except HttpError as err:
            # Leave the stored count untouched so the check is retried on the
            # next polling round instead of crashing the bot.
            print(err)
            continue
        # Only store the new count once the upload list was fetched successfully.
        ChannelsAndVideos[i] = (channel_id, current_count)
        items = response.get('items')
        if not items:
            continue
        # NOTE(review): assumes the uploads playlist lists the newest video
        # first - confirm against the API behaviour.
        video_id = items[0]['snippet']['resourceId']['videoId']
        if video_id in sended:
            continue
        sended.add(video_id)
        for chat_id in telegram_channels_id:
            try:
                bot.send_message(chat_id, messageText + "https://www.youtube.com/watch?v=" + video_id)
            except Exception:
                # Best effort: a failure for one chat must not block the others.
                print("Failed to send message to " + str(chat_id))