使用 Python libtorrent 创建守护进程以获取 100k+ 种子的元数据
creating daemon using Python libtorrent for fetching meta data of 100k+ torrents
我正在尝试使用 python libtorrent 每天获取大约 10k+ 种子的元数据。
这是目前的代码流程
- 启动 libtorrent 会话。
- 获取过去 1 天内上传的我们需要元数据的种子总数。
- 从数据库中分块获取 torrent 哈希值
- 使用这些哈希值创建磁体 link,并通过为每个磁体 URI 创建句柄在会话中添加这些磁体 URI。
- 在获取元数据时休眠一秒钟,并继续检查是否找到元数据。
- 如果收到元数据,将其添加到数据库中,否则检查我们是否已经寻找元数据大约 10 分钟,如果是,则删除句柄,即暂时不再寻找元数据。
- 无限期地执行上述操作。并为将来保存会话状态。
到目前为止我已经试过了。
#!/usr/bin/env python
# this file will run as client or daemon and fetch torrent meta data i.e. torrent files from magnet uri
import libtorrent as lt # libtorrent library
import tempfile # for settings parameters while fetching metadata as temp dir
import sys #getting arguiments from shell or exit script
from time import sleep #sleep
import shutil # removing directory tree from temp directory
import os.path # for getting pwd and other things
from pprint import pprint # for debugging, showing object data
import MySQLdb # DB connectivity
import os
from datetime import date, timedelta
session = lt.session(lt.fingerprint("UT", 3, 4, 5, 0), flags=0)
session.listen_on(6881, 6891)
session.add_extension('ut_metadata')
session.add_extension('ut_pex')
session.add_extension('smart_ban')
session.add_extension('metadata_transfer')
session_save_filename = "/magnet2torrent/magnet_to_torrent_daemon.save_state"
if(os.path.isfile(session_save_filename)):
fileread = open(session_save_filename, 'rb')
session.load_state(lt.bdecode(fileread.read()))
fileread.close()
print('session loaded from file')
else:
print('new session started')
session.add_dht_router("router.utorrent.com", 6881)
session.add_dht_router("router.bittorrent.com", 6881)
session.add_dht_router("dht.transmissionbt.com", 6881)
session.add_dht_router("dht.aelitis.com", 6881)
session.start_dht()
session.start_lsd()
session.start_upnp()
session.start_natpmp()
alive = True
while alive:
db_conn = MySQLdb.connect( host = '', user = '', passwd = '', db = '', unix_socket='/mysql/mysql.sock') # Open database connection
#print('reconnecting')
#get all records where enabled = 0 and uploaded within yesterday
subset_count = 100 ;
yesterday = date.today() - timedelta(1)
yesterday = yesterday.strftime('%Y-%m-%d %H:%M:%S')
#print(yesterday)
total_count_query = ("SELECT COUNT(*) as total_count FROM content WHERE upload_date > '"+ yesterday +"' AND enabled = '0' ")
#print(total_count_query)
try:
total_count_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
total_count_cursor.execute(total_count_query) # Execute the SQL command
total_count_results = total_count_cursor.fetchone() # Fetch all the rows in a list of lists.
total_count = total_count_results[0]
print(total_count)
except:
print "Error: unable to select data"
total_pages = total_count/subset_count
#print(total_pages)
current_page = 1
while(current_page <= total_pages):
from_count = (current_page * subset_count) - subset_count
#print(current_page)
#print(from_count)
hashes = []
get_mysql_data_query = ("SELECT hash FROM content WHERE upload_date > '" + yesterday +"' AND enabled = '0' ORDER BY record_num DESC LIMIT "+ str(from_count) +" , " + str(subset_count) +" ")
#print(get_mysql_data_query)
try:
get_mysql_data_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
get_mysql_data_cursor.execute(get_mysql_data_query) # Execute the SQL command
get_mysql_data_results = get_mysql_data_cursor.fetchall() # Fetch all the rows in a list of lists.
for row in get_mysql_data_results:
hashes.append(row[0].upper())
except:
print "Error: unable to select data"
#print(hashes)
handles = []
for hash in hashes:
tempdir = tempfile.mkdtemp()
add_magnet_uri_params = {
'save_path': tempdir,
'duplicate_is_error': True,
'storage_mode': lt.storage_mode_t(2),
'paused': False,
'auto_managed': True,
'duplicate_is_error': True
}
magnet_uri = "magnet:?xt=urn:btih:" + hash.upper() + "&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Ftracker.publicbt.com%3A80&tr=udp%3A%2F%2Ftracker.ccc.de%3A80"
#print(magnet_uri)
handle = lt.add_magnet_uri(session, magnet_uri, add_magnet_uri_params)
handles.append(handle) #push handle in handles list
#print("handles length is :")
#print(len(handles))
while(len(handles) != 0):
for h in handles:
#print("inside handles for each loop")
if h.has_metadata():
torinfo = h.get_torrent_info()
final_info_hash = str(torinfo.info_hash())
final_info_hash = final_info_hash.upper()
torfile = lt.create_torrent(torinfo)
torcontent = lt.bencode(torfile.generate())
tfile_size = len(torcontent)
try:
insert_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
insert_cursor.execute("""INSERT INTO dht_tfiles (hash, tdata) VALUES (%s, %s)""", [final_info_hash , torcontent] )
db_conn.commit()
#print "data inserted in DB"
except MySQLdb.Error, e:
try:
print "MySQL Error [%d]: %s" % (e.args[0], e.args[1])
except IndexError:
print "MySQL Error: %s" % str(e)
shutil.rmtree(h.save_path()) # remove temp data directory
session.remove_torrent(h) # remove torrnt handle from session
handles.remove(h) #remove handle from list
else:
if(h.status().active_time > 600): # check if handle is more than 10 minutes old i.e. 600 seconds
#print('remove_torrent')
shutil.rmtree(h.save_path()) # remove temp data directory
session.remove_torrent(h) # remove torrnt handle from session
handles.remove(h) #remove handle from list
sleep(1)
#print('sleep1')
#print('sleep10')
#sleep(10)
current_page = current_page + 1
#save session state
filewrite = open(session_save_filename, "wb")
filewrite.write(lt.bencode(session.save_state()))
filewrite.close()
print('sleep60')
sleep(60)
#save session state
filewrite = open(session_save_filename, "wb")
filewrite.write(lt.bencode(session.save_state()))
filewrite.close()
我试着在脚本 运行ning 上过夜,发现在过夜的会话中只找到大约 1200 个 torrent 的元数据。
所以我正在寻找提高脚本性能的方法。
我什至尝试解码 save_state
文件并注意到我连接了 700 多个 DHT nodes
。所以它不像 DHT
不是 运行ning,
我打算做的是,keep the handles active
在不获取元数据的情况下无限期地在会话中。如果在 10 分钟内没有获取元数据,则不会在 10 分钟后删除句柄,就像我目前正在做的那样。
我对 lib-torrent python 绑定有几个问题。
- 我可以保留多少句柄 运行ning ? 运行ning 句柄有限制吗?
- 运行10k+ 或 100k 句柄会降低我的系统速度吗?还是吃掉资源?如果是那么哪些资源?我是说内存,网络?
- 我在防火墙后面,会不会是传入端口被阻止导致元数据获取速度慢?
- DHT 服务器可以像 router.bittorrent.com 或任何其他 BAN 我的 ip 地址发送太多请求吗?
- 如果其他同行发现我发出的请求太多只是为了获取元数据,他们可以禁止我的 IP 地址吗?
- 我可以 运行 这个脚本的多个实例吗?或者可能是多线程?它会提供更好的性能吗?
- 如果使用同一脚本的多个实例,每个脚本将根据我使用的 ip 和端口获得唯一的节点 ID,这是可行的解决方案吗?
有没有更好的方法?为了实现我正在尝试的目标?
我无法回答特定于 libtorrent API 的问题,但您的一些问题通常适用于 bittorrent。
will running 10k+ or 100k handles slow down my system ? or eat up resources ? if yes then which resources ? i mean RAM , NETWORK ?
元数据下载不应该使用太多资源,因为它们还不是完整的 torrent 下载,即它们不能分配实际文件或类似的东西。但是一旦他们抓住了元数据的第一块,他们将需要一些 ram/disk space 作为元数据本身。
I am behind firewall , can be a blocked incoming port causing the slow speed of metadata fetching ?
是的,通过减少可以建立连接的对等点数量,在对等点数量较少的群中获取元数据(或根本建立任何连接)变得更加困难。
NAT 可能导致同样的问题。
can DHT server like router.bittorrent.com or any other BAN my ip address for sending too many requests ?
router.bittorrent.com 是一个 bootstrap 节点,而不是服务器本身。查找不查询单个节点,它们查询许多不同的(数百万个)。但是,是的,个别节点可以禁止,或者更可能是速率限制,你。
这可以通过寻找随机分布的 ID 来在 DHT 密钥上分散负载来缓解space。
can i run multiple instances of this script ? or may be multi-threading ? will it give better performance ?
AIUI libtorrent 是非阻塞或多线程的,您可以一次安排许多种子。
我不知道 libtorrent 是否对传出 DHT 请求有速率限制。
if using multiple instances of the same script, each script will get unique node-id depending on the ip and port i am using , is this viable solution ?
如果您指的是 DHT 节点 ID,那么它们是从 IP(根据 BEP 42)而非端口派生的。虽然包含了一些随机元素,但每个 IP 可以获得的 ID 数量有限。
其中一些可能也适用于您的场景:http://blog.libtorrent.org/2012/01/seeding-a-million-torrents/
另一个选项是 my own DHT implementation,它包含一个用于批量获取种子的 CLI。
我正在尝试使用 python libtorrent 每天获取大约 10k+ 种子的元数据。
这是目前的代码流程
- 启动 libtorrent 会话。
- 获取过去 1 天内上传的我们需要元数据的种子总数。
- 从数据库中分块获取 torrent 哈希值
- 使用这些哈希值创建磁体 link,并通过为每个磁体 URI 创建句柄在会话中添加这些磁体 URI。
- 在获取元数据时休眠一秒钟,并继续检查是否找到元数据。
- 如果收到元数据,将其添加到数据库中,否则检查我们是否已经寻找元数据大约 10 分钟,如果是,则删除句柄,即暂时不再寻找元数据。
- 无限期地执行上述操作。并为将来保存会话状态。
到目前为止我已经试过了。
#!/usr/bin/env python
# this file will run as client or daemon and fetch torrent meta data i.e. torrent files from magnet uri
import libtorrent as lt # libtorrent library
import tempfile # for settings parameters while fetching metadata as temp dir
import sys #getting arguiments from shell or exit script
from time import sleep #sleep
import shutil # removing directory tree from temp directory
import os.path # for getting pwd and other things
from pprint import pprint # for debugging, showing object data
import MySQLdb # DB connectivity
import os
from datetime import date, timedelta
session = lt.session(lt.fingerprint("UT", 3, 4, 5, 0), flags=0)
session.listen_on(6881, 6891)
session.add_extension('ut_metadata')
session.add_extension('ut_pex')
session.add_extension('smart_ban')
session.add_extension('metadata_transfer')
session_save_filename = "/magnet2torrent/magnet_to_torrent_daemon.save_state"
if(os.path.isfile(session_save_filename)):
fileread = open(session_save_filename, 'rb')
session.load_state(lt.bdecode(fileread.read()))
fileread.close()
print('session loaded from file')
else:
print('new session started')
session.add_dht_router("router.utorrent.com", 6881)
session.add_dht_router("router.bittorrent.com", 6881)
session.add_dht_router("dht.transmissionbt.com", 6881)
session.add_dht_router("dht.aelitis.com", 6881)
session.start_dht()
session.start_lsd()
session.start_upnp()
session.start_natpmp()
alive = True
while alive:
db_conn = MySQLdb.connect( host = '', user = '', passwd = '', db = '', unix_socket='/mysql/mysql.sock') # Open database connection
#print('reconnecting')
#get all records where enabled = 0 and uploaded within yesterday
subset_count = 100 ;
yesterday = date.today() - timedelta(1)
yesterday = yesterday.strftime('%Y-%m-%d %H:%M:%S')
#print(yesterday)
total_count_query = ("SELECT COUNT(*) as total_count FROM content WHERE upload_date > '"+ yesterday +"' AND enabled = '0' ")
#print(total_count_query)
try:
total_count_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
total_count_cursor.execute(total_count_query) # Execute the SQL command
total_count_results = total_count_cursor.fetchone() # Fetch all the rows in a list of lists.
total_count = total_count_results[0]
print(total_count)
except:
print "Error: unable to select data"
total_pages = total_count/subset_count
#print(total_pages)
current_page = 1
while(current_page <= total_pages):
from_count = (current_page * subset_count) - subset_count
#print(current_page)
#print(from_count)
hashes = []
get_mysql_data_query = ("SELECT hash FROM content WHERE upload_date > '" + yesterday +"' AND enabled = '0' ORDER BY record_num DESC LIMIT "+ str(from_count) +" , " + str(subset_count) +" ")
#print(get_mysql_data_query)
try:
get_mysql_data_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
get_mysql_data_cursor.execute(get_mysql_data_query) # Execute the SQL command
get_mysql_data_results = get_mysql_data_cursor.fetchall() # Fetch all the rows in a list of lists.
for row in get_mysql_data_results:
hashes.append(row[0].upper())
except:
print "Error: unable to select data"
#print(hashes)
handles = []
for hash in hashes:
tempdir = tempfile.mkdtemp()
add_magnet_uri_params = {
'save_path': tempdir,
'duplicate_is_error': True,
'storage_mode': lt.storage_mode_t(2),
'paused': False,
'auto_managed': True,
'duplicate_is_error': True
}
magnet_uri = "magnet:?xt=urn:btih:" + hash.upper() + "&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Ftracker.publicbt.com%3A80&tr=udp%3A%2F%2Ftracker.ccc.de%3A80"
#print(magnet_uri)
handle = lt.add_magnet_uri(session, magnet_uri, add_magnet_uri_params)
handles.append(handle) #push handle in handles list
#print("handles length is :")
#print(len(handles))
while(len(handles) != 0):
for h in handles:
#print("inside handles for each loop")
if h.has_metadata():
torinfo = h.get_torrent_info()
final_info_hash = str(torinfo.info_hash())
final_info_hash = final_info_hash.upper()
torfile = lt.create_torrent(torinfo)
torcontent = lt.bencode(torfile.generate())
tfile_size = len(torcontent)
try:
insert_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
insert_cursor.execute("""INSERT INTO dht_tfiles (hash, tdata) VALUES (%s, %s)""", [final_info_hash , torcontent] )
db_conn.commit()
#print "data inserted in DB"
except MySQLdb.Error, e:
try:
print "MySQL Error [%d]: %s" % (e.args[0], e.args[1])
except IndexError:
print "MySQL Error: %s" % str(e)
shutil.rmtree(h.save_path()) # remove temp data directory
session.remove_torrent(h) # remove torrnt handle from session
handles.remove(h) #remove handle from list
else:
if(h.status().active_time > 600): # check if handle is more than 10 minutes old i.e. 600 seconds
#print('remove_torrent')
shutil.rmtree(h.save_path()) # remove temp data directory
session.remove_torrent(h) # remove torrnt handle from session
handles.remove(h) #remove handle from list
sleep(1)
#print('sleep1')
#print('sleep10')
#sleep(10)
current_page = current_page + 1
#save session state
filewrite = open(session_save_filename, "wb")
filewrite.write(lt.bencode(session.save_state()))
filewrite.close()
print('sleep60')
sleep(60)
#save session state
filewrite = open(session_save_filename, "wb")
filewrite.write(lt.bencode(session.save_state()))
filewrite.close()
我试着在脚本 运行ning 上过夜,发现在过夜的会话中只找到大约 1200 个 torrent 的元数据。 所以我正在寻找提高脚本性能的方法。
我什至尝试解码 save_state
文件并注意到我连接了 700 多个 DHT nodes
。所以它不像 DHT
不是 运行ning,
我打算做的是,keep the handles active
在不获取元数据的情况下无限期地在会话中。如果在 10 分钟内没有获取元数据,则不会在 10 分钟后删除句柄,就像我目前正在做的那样。
我对 lib-torrent python 绑定有几个问题。
- 我可以保留多少句柄 运行ning ? 运行ning 句柄有限制吗?
- 运行10k+ 或 100k 句柄会降低我的系统速度吗?还是吃掉资源?如果是那么哪些资源?我是说内存,网络?
- 我在防火墙后面,会不会是传入端口被阻止导致元数据获取速度慢?
- DHT 服务器可以像 router.bittorrent.com 或任何其他 BAN 我的 ip 地址发送太多请求吗?
- 如果其他同行发现我发出的请求太多只是为了获取元数据,他们可以禁止我的 IP 地址吗?
- 我可以 运行 这个脚本的多个实例吗?或者可能是多线程?它会提供更好的性能吗?
- 如果使用同一脚本的多个实例,每个脚本将根据我使用的 ip 和端口获得唯一的节点 ID,这是可行的解决方案吗?
有没有更好的方法?为了实现我正在尝试的目标?
我无法回答特定于 libtorrent API 的问题,但您的一些问题通常适用于 bittorrent。
will running 10k+ or 100k handles slow down my system ? or eat up resources ? if yes then which resources ? i mean RAM , NETWORK ?
元数据下载不应该使用太多资源,因为它们还不是完整的 torrent 下载,即它们不能分配实际文件或类似的东西。但是一旦他们抓住了元数据的第一块,他们将需要一些 ram/disk space 作为元数据本身。
I am behind firewall , can be a blocked incoming port causing the slow speed of metadata fetching ?
是的,通过减少可以建立连接的对等点数量,在对等点数量较少的群中获取元数据(或根本建立任何连接)变得更加困难。
NAT 可能导致同样的问题。
can DHT server like router.bittorrent.com or any other BAN my ip address for sending too many requests ?
router.bittorrent.com 是一个 bootstrap 节点,而不是服务器本身。查找不查询单个节点,它们查询许多不同的(数百万个)。但是,是的,个别节点可以禁止,或者更可能是速率限制,你。
这可以通过寻找随机分布的 ID 来在 DHT 密钥上分散负载来缓解space。
can i run multiple instances of this script ? or may be multi-threading ? will it give better performance ?
AIUI libtorrent 是非阻塞或多线程的,您可以一次安排许多种子。
我不知道 libtorrent 是否对传出 DHT 请求有速率限制。
if using multiple instances of the same script, each script will get unique node-id depending on the ip and port i am using , is this viable solution ?
如果您指的是 DHT 节点 ID,那么它们是从 IP(根据 BEP 42)而非端口派生的。虽然包含了一些随机元素,但每个 IP 可以获得的 ID 数量有限。
其中一些可能也适用于您的场景:http://blog.libtorrent.org/2012/01/seeding-a-million-torrents/
另一个选项是 my own DHT implementation,它包含一个用于批量获取种子的 CLI。