Создание демона с использованием Python libtorrent для выборки метаданных 100k+ торрентов

Я пытаюсь получить метаданные около 10 000+ торрентов в день, используя Python libtorrent.

Это текущий поток кода

  1. Запустите сеанс libtorrent.
  2. Получите общее количество торрентов, которые нам нужны для загрузки метаданных за последние 1 день.
  3. получить торрент-хэши из БД кусками
  4. создайте ссылку на магнит с помощью этих хешей и добавьте эти URI магнитов в сеанс, создав дескриптор для каждого URI магнита.
  5. спите секунду, пока метаданные извлекаются, и продолжайте проверять, найдены ли метаданные или нет.
  6. Если метаданные получены, добавьте их в БД, иначе проверьте, искали ли мы метаданные в течение 10 минут, если да, то удалите дескриптор, т.е. пока метаданные больше не ищите.
  7. делай выше бесконечно. и сохранить состояние сеанса на будущее.

до сих пор я пробовал это.

#!/usr/bin/env python
# this file will run as client or daemon and fetch torrent meta data i.e. torrent files from magnet uri

import libtorrent as lt # libtorrent library
import tempfile # for settings parameters while fetching metadata as temp dir
import sys #getting arguiments from shell or exit script
from time import sleep #sleep
import shutil # removing directory tree from temp directory 
import os.path # for getting pwd and other things
from pprint import pprint # for debugging, showing object data
import MySQLdb # DB connectivity 
import os
from datetime import date, timedelta

session = lt.session(lt.fingerprint("UT", 3, 4, 5, 0), flags=0)
session.listen_on(6881, 6891)
session.add_extension('ut_metadata')
session.add_extension('ut_pex')
session.add_extension('smart_ban')
session.add_extension('metadata_transfer')

session_save_filename = "/magnet2torrent/magnet_to_torrent_daemon.save_state"

if(os.path.isfile(session_save_filename)):

    fileread = open(session_save_filename, 'rb')
    session.load_state(lt.bdecode(fileread.read()))
    fileread.close()
    print('session loaded from file')
else:
    print('new session started')

session.add_dht_router("router.utorrent.com", 6881)
session.add_dht_router("router.bittorrent.com", 6881)
session.add_dht_router("dht.transmissionbt.com", 6881)
session.add_dht_router("dht.aelitis.com", 6881)

session.start_dht()
session.start_lsd()
session.start_upnp()
session.start_natpmp()

alive = True
while alive:

    db_conn = MySQLdb.connect(  host = '',  user = '',  passwd = '',    db = '',    unix_socket='/mysql/mysql.sock') # Open database connection
    #print('reconnecting')
    #get all records where enabled = 0 and uploaded within yesterday 
    subset_count = 100 ;

    yesterday = date.today() - timedelta(1)
    yesterday = yesterday.strftime('%Y-%m-%d %H:%M:%S')
    #print(yesterday)

    total_count_query = ("SELECT COUNT(*) as total_count FROM content WHERE upload_date > '"+ yesterday +"' AND enabled = '0' ")
    #print(total_count_query)
    try:
        total_count_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
        total_count_cursor.execute(total_count_query) # Execute the SQL command
        total_count_results = total_count_cursor.fetchone() # Fetch all the rows in a list of lists.
        total_count = total_count_results[0]
        print(total_count)
    except:
            print "Error: unable to select data"

    total_pages = total_count/subset_count
    #print(total_pages)

    current_page = 1
    while(current_page <= total_pages):
        from_count = (current_page * subset_count) - subset_count

        #print(current_page)
        #print(from_count)

        hashes = []

        get_mysql_data_query = ("SELECT hash FROM content WHERE upload_date > '" + yesterday +"' AND enabled = '0' ORDER BY record_num DESC LIMIT "+ str(from_count) +" , " + str(subset_count) +" ")
        #print(get_mysql_data_query)
        try:
            get_mysql_data_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
            get_mysql_data_cursor.execute(get_mysql_data_query) # Execute the SQL command
            get_mysql_data_results = get_mysql_data_cursor.fetchall() # Fetch all the rows in a list of lists.
            for row in get_mysql_data_results:
                hashes.append(row[0].upper())
        except:
            print "Error: unable to select data"

        #print(hashes)

        handles = []

        for hash in hashes:
            tempdir = tempfile.mkdtemp()
            add_magnet_uri_params = {
                'save_path': tempdir,
                'duplicate_is_error': True,
                'storage_mode': lt.storage_mode_t(2),
                'paused': False,
                'auto_managed': True,
                'duplicate_is_error': True
            }
            magnet_uri = "magnet:?xt=urn:btih:" + hash.upper() + "&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Ftracker.publicbt.com%3A80&tr=udp%3A%2F%2Ftracker.ccc.de%3A80"
            #print(magnet_uri)
            handle = lt.add_magnet_uri(session, magnet_uri, add_magnet_uri_params)
            handles.append(handle) #push handle in handles list

        #print("handles length is :")
        #print(len(handles))

        while(len(handles) != 0):
            for h in handles:
                #print("inside handles for each loop")
                if h.has_metadata():
                    torinfo = h.get_torrent_info()
                    final_info_hash = str(torinfo.info_hash())
                    final_info_hash = final_info_hash.upper()
                    torfile = lt.create_torrent(torinfo)
                    torcontent = lt.bencode(torfile.generate())
                    tfile_size = len(torcontent)
                    try:
                        insert_cursor = db_conn.cursor()# prepare a cursor object using cursor() method
                        insert_cursor.execute("""INSERT INTO dht_tfiles (hash, tdata) VALUES (%s, %s)""",  [final_info_hash , torcontent] )
                        db_conn.commit()
                        #print "data inserted in DB"
                    except MySQLdb.Error, e:
                        try:
                            print "MySQL Error [%d]: %s" % (e.args[0], e.args[1])
                        except IndexError:
                            print "MySQL Error: %s" % str(e)    


                    shutil.rmtree(h.save_path())    #   remove temp data directory
                    session.remove_torrent(h) # remove torrnt handle from session   
                    handles.remove(h) #remove handle from list

                else:
                    if(h.status().active_time > 600):   # check if handle is more than 10 minutes old i.e. 600 seconds
                        #print('remove_torrent')
                        shutil.rmtree(h.save_path())    #   remove temp data directory
                        session.remove_torrent(h) # remove torrnt handle from session   
                        handles.remove(h) #remove handle from list
                sleep(1)        
                #print('sleep1')

        #print('sleep10')
        #sleep(10)
        current_page = current_page + 1

        #save session state
        filewrite = open(session_save_filename, "wb")
        filewrite.write(lt.bencode(session.save_state()))
        filewrite.close()


    print('sleep60')
    sleep(60)

    #save session state
    filewrite = open(session_save_filename, "wb")
    filewrite.write(lt.bencode(session.save_state()))
    filewrite.close()

Я пытался держать вышеупомянутый скрипт, работающий всю ночь, и нашел только около 1200 метаданных торрента, найденных в ночной сессии. поэтому я ищу улучшения производительности скрипта.

Я даже пытался расшифровать save_state файл и заметил там 700+ DHT nodes Я связан с так что это не так DHT не работает,

Что я планирую сделать, keep the handles active в сеансе бесконечно, пока метаданные не выбраны. и не собираюсь удалять дескрипторы через 10 минут, если метаданные не будут выбраны в течение 10 минут, как я сейчас это делаю.

У меня есть несколько вопросов относительно привязок lib-torrent python.

  1. Сколько ручек я могу продолжать работать? есть ли предел для бегущих ручек?
  2. замедлит ли моя система запуск дескрипторов 10k+ или 100k? или кушать ресурсы? если да то какие ресурсы? Я имею в виду RAM, NETWORK?
  3. Я нахожусь за брандмауэром, может ли заблокированный входящий порт вызывать медленную скорость извлечения метаданных?
  4. может ли сервер DHT, такой как router.bittorrent.com или любой другой BAN мой IP-адрес, отправлять слишком много запросов?
  5. Могут ли другие узлы ЗАПРЕЩАТЬ мой IP-адрес, если узнают, что я делаю слишком много запросов только для получения метаданных?
  6. я могу запустить несколько экземпляров этого скрипта? или может быть многопоточность? это даст лучшую производительность?
  7. если используется несколько экземпляров одного и того же скрипта, каждый скрипт получит уникальный идентификатор узла в зависимости от используемого ip и порта, это жизнеспособное решение?

Есть ли лучший подход? для достижения того, что я пытаюсь?

1 ответ

Решение

Я не могу ответить на вопросы, относящиеся к API libtorrent, но некоторые из ваших вопросов относятся к bittorrent в целом.

замедлит ли моя система запуск дескрипторов 10k+ или 100k? или кушать ресурсы? если да то какие ресурсы? я имею ввиду RAM, NETWORK?

Загрузки метаданных не должны использовать много ресурсов, поскольку они еще не являются полными торрент-загрузками, то есть они не могут размещать фактические файлы или что-то в этом роде. Но им понадобится некоторое место на оперативной памяти / диске для самих метаданных, как только они получат первую часть из них.

Я нахожусь за брандмауэром, может ли заблокированный входящий порт вызывать медленную скорость извлечения метаданных?

да, благодаря уменьшению количества одноранговых узлов, которые могут устанавливать соединения, становится труднее получать метаданные (или устанавливать какое-либо соединение вообще) для скоплений с низким числом одноранговых узлов.

NAT может вызвать ту же проблему.

может ли сервер DHT, такой как router.bittorrent.com или любой другой BAN мой IP-адрес, отправлять слишком много запросов?

router.bittorrent.com является узлом начальной загрузки, а не сервером как таковым. Поиски не запрашивают один узел, они запрашивают множество разных (среди миллионов). Но да, отдельные узлы могут запретить, или, более вероятно, ограничение скорости, вы.

Это может быть смягчено путем поиска случайно распределенных идентификаторов, чтобы распределить нагрузку по пространству клавиш DHT.

Могу ли я запустить несколько экземпляров этого скрипта? или может быть многопоточность? это даст лучшую производительность?

AIUI libtorrent достаточно неблокируемый или многопоточный, поэтому вы можете запланировать сразу несколько торрентов.

Я не знаю, есть ли у libtorrent ограничение скорости для исходящих запросов DHT.

если используется несколько экземпляров одного и того же скрипта, каждый скрипт получит уникальный идентификатор узла в зависимости от используемого ip и порта, это жизнеспособное решение?

Если вы имеете в виду идентификатор узла DHT, то они получаются из IP (согласно BEP 42), а не из порта. Несмотря на то, что включен некоторый случайный элемент, на IP может быть получено ограниченное количество идентификаторов.

И кое-что из этого также может быть применимо к вашему сценарию: http://blog.libtorrent.org/2012/01/seeding-a-million-torrents/

И еще один вариант - моя собственная реализация DHT, которая включает в себя интерфейс командной строки для массового извлечения торрентов.

Другие вопросы по тегам