使用 Python libtorrent 创建守护进程以获取 100k+ 种子的元数据

creating daemon using Python libtorrent for fetching meta data of 100k+ torrents

我正在尝试使用 python libtorrent 每天获取大约 10k+ 种子的元数据。

这是目前的代码流程

  1. 启动 libtorrent 会话。
  2. 获取过去 1 天内上传的我们需要元数据的种子总数。
  3. 从数据库中分块获取 torrent 哈希值
  4. 使用这些哈希值创建磁体 link,并通过为每个磁体 URI 创建句柄在会话中添加这些磁体 URI。
  5. 在获取元数据时休眠一秒钟,并继续检查是否找到元数据。
  6. 如果收到元数据,将其添加到数据库中,否则检查我们是否已经寻找元数据大约 10 分钟,如果是,则删除句柄,即暂时不再寻找元数据。
  7. 无限期地执行上述操作。并为将来保存会话状态。

到目前为止我已经试过了。

#!/usr/bin/env python
# this file will run as client or daemon and fetch torrent meta data i.e. torrent files from magnet uri

import libtorrent as lt # libtorrent library
import tempfile # for settings parameters while fetching metadata as temp dir
import sys #getting arguiments from shell or exit script
from time import sleep #sleep
import shutil # removing directory tree from temp directory 
import os.path # for getting pwd and other things
from pprint import pprint # for debugging, showing object data
import MySQLdb # DB connectivity 
import os
from datetime import date, timedelta

session = lt.session(lt.fingerprint("UT", 3, 4, 5, 0), flags=0)
session.listen_on(6881, 6891)
session.add_extension('ut_metadata')
session.add_extension('ut_pex')
session.add_extension('smart_ban')
session.add_extension('metadata_transfer')

session_save_filename = "/magnet2torrent/magnet_to_torrent_daemon.save_state"

if(os.path.isfile(session_save_filename)):

    fileread = open(session_save_filename, 'rb')
    session.load_state(lt.bdecode(fileread.read()))
    fileread.close()
    print('session loaded from file')
else:
    print('new session started')

session.add_dht_router("router.utorrent.com", 6881)
session.add_dht_router("router.bittorrent.com", 6881)
session.add_dht_router("dht.transmissionbt.com", 6881)
session.add_dht_router("dht.aelitis.com", 6881)

session.start_dht()
session.start_lsd()
session.start_upnp()
session.start_natpmp()

alive = True
while alive:

    db_conn = MySQLdb.connect(  host = '',  user = '',  passwd = '',    db = '',    unix_socket='/mysql/mysql.sock') # Open database connection
    #print('reconnecting')
    #get all records where enabled = 0 and uploaded within yesterday 
    subset_count = 100 ;

    yesterday = date.today() - timedelta(1)
    yesterday = yesterday.strftime('%Y-%m-%d %H:%M:%S')
    #print(yesterday)

    total_count_query = ("SELECT COUNT(*) as total_count FROM content WHERE upload_date > '"+ yesterday +"' AND enabled = '0' ")
    #print(total_count_query)
    try:
        total_count_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
        total_count_cursor.execute(total_count_query) # Execute the SQL command
        total_count_results = total_count_cursor.fetchone() # Fetch all the rows in a list of lists.
        total_count = total_count_results[0]
        print(total_count)
    except:
            print "Error: unable to select data"

    total_pages = total_count/subset_count
    #print(total_pages)

    current_page = 1
    while(current_page <= total_pages):
        from_count = (current_page * subset_count) - subset_count

        #print(current_page)
        #print(from_count)

        hashes = []

        get_mysql_data_query = ("SELECT hash FROM content WHERE upload_date > '" + yesterday +"' AND enabled = '0' ORDER BY record_num DESC LIMIT "+ str(from_count) +" , " + str(subset_count) +" ")
        #print(get_mysql_data_query)
        try:
            get_mysql_data_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
            get_mysql_data_cursor.execute(get_mysql_data_query) # Execute the SQL command
            get_mysql_data_results = get_mysql_data_cursor.fetchall() # Fetch all the rows in a list of lists.
            for row in get_mysql_data_results:
                hashes.append(row[0].upper())
        except:
            print "Error: unable to select data"

        #print(hashes)

        handles = []

        for hash in hashes:
            tempdir = tempfile.mkdtemp()
            add_magnet_uri_params = {
                'save_path': tempdir,
                'duplicate_is_error': True,
                'storage_mode': lt.storage_mode_t(2),
                'paused': False,
                'auto_managed': True,
                'duplicate_is_error': True
            }
            magnet_uri = "magnet:?xt=urn:btih:" + hash.upper() + "&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Ftracker.publicbt.com%3A80&tr=udp%3A%2F%2Ftracker.ccc.de%3A80"
            #print(magnet_uri)
            handle = lt.add_magnet_uri(session, magnet_uri, add_magnet_uri_params)
            handles.append(handle) #push handle in handles list

        #print("handles length is :")
        #print(len(handles))

        while(len(handles) != 0):
            for h in handles:
                #print("inside handles for each loop")
                if h.has_metadata():
                    torinfo = h.get_torrent_info()
                    final_info_hash = str(torinfo.info_hash())
                    final_info_hash = final_info_hash.upper()
                    torfile = lt.create_torrent(torinfo)
                    torcontent = lt.bencode(torfile.generate())
                    tfile_size = len(torcontent)
                    try:
                        insert_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
                        insert_cursor.execute("""INSERT INTO dht_tfiles (hash, tdata) VALUES (%s, %s)""",  [final_info_hash , torcontent] )
                        db_conn.commit()
                        #print "data inserted in DB"
                    except MySQLdb.Error, e:
                        try:
                            print "MySQL Error [%d]: %s" % (e.args[0], e.args[1])
                        except IndexError:
                            print "MySQL Error: %s" % str(e)    


                    shutil.rmtree(h.save_path())    #   remove temp data directory
                    session.remove_torrent(h) # remove torrnt handle from session   
                    handles.remove(h) #remove handle from list

                else:
                    if(h.status().active_time > 600):   # check if handle is more than 10 minutes old i.e. 600 seconds
                        #print('remove_torrent')
                        shutil.rmtree(h.save_path())    #   remove temp data directory
                        session.remove_torrent(h) # remove torrnt handle from session   
                        handles.remove(h) #remove handle from list
                sleep(1)        
                #print('sleep1')

        #print('sleep10')
        #sleep(10)
        current_page = current_page + 1

        #save session state
        filewrite = open(session_save_filename, "wb")
        filewrite.write(lt.bencode(session.save_state()))
        filewrite.close()


    print('sleep60')
    sleep(60)

    #save session state
    filewrite = open(session_save_filename, "wb")
    filewrite.write(lt.bencode(session.save_state()))
    filewrite.close()

我试着在脚本 运行ning 上过夜,发现在过夜的会话中只找到大约 1200 个 torrent 的元数据。 所以我正在寻找提高脚本性能的方法。

我什至尝试解码 save_state 文件并注意到我连接了 700 多个 DHT nodes。所以它不像 DHT 不是 运行ning,

我打算做的是,keep the handles active 在不获取元数据的情况下无限期地在会话中。如果在 10 分钟内没有获取元数据,则不会在 10 分钟后删除句柄,就像我目前正在做的那样。

我对 lib-torrent python 绑定有几个问题。

  1. 我可以保留多少句柄 运行ning ? 运行ning 句柄有限制吗?
  2. 运行10k+ 或 100k 句柄会降低我的系统速度吗?还是吃掉资源?如果是那么哪些资源?我是说内存,网络?
  3. 我在防火墙后面,会不会是传入端口被阻止导致元数据获取速度慢?
  4. DHT 服务器可以像 router.bittorrent.com 或任何其他 BAN 我的 ip 地址发送太多请求吗?
  5. 如果其他同行发现我发出的请求太多只是为了获取元数据,他们可以禁止我的 IP 地址吗?
  6. 我可以 运行 这个脚本的多个实例吗?或者可能是多线程?它会提供更好的性能吗?
  7. 如果使用同一脚本的多个实例,每个脚本将根据我使用的 ip 和端口获得唯一的节点 ID,这是可行的解决方案吗?

有没有更好的方法?为了实现我正在尝试的目标?

我无法回答特定于 libtorrent API 的问题,但您的一些问题通常适用于 bittorrent。

will running 10k+ or 100k handles slow down my system ? or eat up resources ? if yes then which resources ? i mean RAM , NETWORK ?

元数据下载不应该使用太多资源,因为它们还不是完整的 torrent 下载,即它们不能分配实际文件或类似的东西。但是一旦他们抓住了元数据的第一块,他们将需要一些 ram/disk space 作为元数据本身。

I am behind firewall , can be a blocked incoming port causing the slow speed of metadata fetching ?

是的,通过减少可以建立连接的对等点数量,在对等点数量较少的群中获取元数据(或根本建立任何连接)变得更加困难。

NAT 可能导致同样的问题。

can DHT server like router.bittorrent.com or any other BAN my ip address for sending too many requests ?

router.bittorrent.com 是一个 bootstrap 节点,而不是服务器本身。查找不查询单个节点,它们查询许多不同的(数百万个)。但是,是的,个别节点可以禁止,或者更可能是速率限制,你。

这可以通过寻找随机分布的 ID 来在 DHT 密钥上分散负载来缓解space。

can i run multiple instances of this script ? or may be multi-threading ? will it give better performance ?

AIUI libtorrent 是非阻塞或多线程的,您可以一次安排许多种子。

我不知道 libtorrent 是否对传出 DHT 请求有速率限制。

if using multiple instances of the same script, each script will get unique node-id depending on the ip and port i am using , is this viable solution ?

如果您指的是 DHT 节点 ID,那么它们是从 IP(根据 BEP 42)而非端口派生的。虽然包含了一些随机元素,但每个 IP 可以获得的 ID 数量有限。

其中一些可能也适用于您的场景:http://blog.libtorrent.org/2012/01/seeding-a-million-torrents/

另一个选项是 my own DHT implementation,它包含一个用于批量获取种子的 CLI。