Gevent / запросы зависает при выполнении большого количества запросов головы

Мне нужно сделать 100 000 запросов на головку, и я использую gevent поверх запросов. Мой код работает некоторое время, но затем в конечном итоге зависает. Я не уверен, почему он висит, или он висит внутри запросов или gevent. Я использую аргумент timeout внутри обоих запросов и gevent.

Пожалуйста, посмотрите на мой фрагмент кода ниже, и дайте мне знать, что я должен изменить.

import gevent
from gevent import monkey, pool
monkey.patch_all()
import requests

def get_head(url, timeout=3):
    try:
        return requests.head(url, allow_redirects=True, timeout=timeout)
    except:
        return None

def expand_short_urls(short_urls, chunk_size=100, timeout=60*5):
    chunk_list = lambda l, n: ( l[i:i+n] for i in range(0, len(l), n) )
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)
    results = {}
    for i, _short_urls_chunked in enumerate(chunk_list(short_urls, chunk_size)):
        print '\t%d. processing %d urls @ %s' % (i, chunk_size, str(datetime.datetime.now()))
        jobs = [p.spawn(get_head, _short_url) for _short_url in _short_urls_chunked]
        gevent.joinall(jobs, timeout=timeout)
        results.update({_short_url:job.get().url for _short_url, job in zip(_short_urls_chunked, jobs) if job.get() is not None and job.get().status_code==200})
    return results 

Я пробовал греквесты, но он был заброшен, и я прошел через запросы на получение github, но у них всех тоже есть проблемы.

2 ответа

Решение

Использование ОЗУ, которое вы наблюдаете, в основном зависит от всех данных, которые накапливаются при хранении 100000 объектов ответа, и от всех накладных расходов. Я воспроизвел вашу заявку и отправил HEAD-запросы против 15000 URL-адресов из топ-рейтинга Alexa. Это не имеет большого значения

  • использовал ли я пул gevent (т. е. один гринлет на соединение) или фиксированный набор гринлетов, причем все запрашивали несколько URL-адресов
  • насколько большой я установил размер пула

В конце концов, использование оперативной памяти значительно возросло. Тем не менее, я заметил, что изменение от requests в urllib2 уже привело к сокращению использования оперативной памяти, примерно в два раза. То есть я заменил

result = requests.head(url)

с

request = urllib2.Request(url)
request.get_method = lambda : 'HEAD'
result = urllib2.urlopen(request)

Несколько других советов: не используйте два механизма тайм-аута. Тайм-аут Gevent очень надежен, и вы можете легко использовать его следующим образом:

def gethead(url):
    result = None
    try:
        with Timeout(5, False):
            result = requests.head(url)
    except Exception as e:
        result = e
    return result

Может выглядеть сложно, но либо возвращается None (по истечении почти 5 секунд и указывает на тайм-аут), любой объект исключения, представляющий ошибку связи или ответ. Работает отлично!

Хотя это, вероятно, не является частью проблемы, в таких случаях я рекомендую поддерживать рабочих в живых и позволять им работать над несколькими предметами каждый! Накладные расходы на нерестовые гринлеты невелики. Тем не менее, это будет очень простое решение с набором долгоживущих гринлетов:

def qworker(qin, qout):
    while True:
        try:
            qout.put(gethead(qin.get(block=False)))
        except Empty:
            break

qin = Queue()
qout = Queue()

for url in urls:
    qin.put(url)

workers = [spawn(qworker, qin, qout) for i in xrange(POOLSIZE)]
joinall(workers)
returnvalues = [qout.get() for _ in xrange(len(urls))]

Кроме того, вы действительно должны понимать, что это крупномасштабная проблема, которую вы решаете там, приводя к нестандартным проблемам. Когда я воспроизвел ваш сценарий с тайм-аутом в 20 с и 100 рабочих и 15000 URL-адресов, которые нужно запросить, я легко получил большое количество сокетов:

# netstat -tpn | wc -l
10074

То есть ОС имела более 10000 сокетов для управления, большинство из них в состоянии TIME_WAIT. Я также наблюдал ошибки "Too many open files" и настраивал пределы с помощью sysctl. Когда вы запрашиваете 100000 URL-адресов, вы, вероятно, также достигнете таких пределов, и вам нужно будет принять меры, чтобы предотвратить голодание системы.

Также обратите внимание на то, как вы используете запросы, он автоматически выполняет перенаправления с HTTP на HTTPS и автоматически проверяет сертификат, который, безусловно, стоит оперативной памяти.

В моих измерениях, когда я делил количество запрошенных URL-адресов на время выполнения программы, я почти никогда не пропускал 100 ответов / с, что является результатом высокоскоростных соединений с иностранными серверами по всему миру. Я думаю, что на вас также влияет такой предел. Отрегулируйте остальную часть архитектуры до этого предела, и вы, вероятно, сможете генерировать поток данных из Интернета на диск (или базу данных) с небольшим использованием оперативной памяти между ними.

Я должен ответить на два ваших главных вопроса, а именно:

Я думаю, что gevent/ то, как вы используете это не ваша проблема. Я думаю, вы просто недооцениваете сложность своей задачи. Это сопровождается неприятными проблемами и доводит вашу систему до предела.

  • проблема использования оперативной памяти: начните с использования urllib2, если ты можешь. Затем, если вещи накапливаются все еще слишком высоко, вам нужно работать против накопления. Попробуйте создать устойчивое состояние: вы можете начать записывать данные на диск и, как правило, работать в направлении ситуации, когда объекты могут стать сборщиками мусора.

  • Ваш код "в конечном итоге зависает": возможно, это связано с проблемой ОЗУ. Если это не так, то не порождайте так много гринлетов, а используйте их по назначению. Кроме того, еще больше уменьшите параллелизм, контролируйте количество открытых сокетов, увеличьте системные ограничения при необходимости, и попытайтесь выяснить, где именно висит ваше программное обеспечение.

Я не уверен, что это решит вашу проблему, но вы не используете pool.Pool() правильно.

Попробуй это:

def expand_short_urls(short_urls, chunk_size=100):
    # Pool() automatically limits your process to chunk_size greenlets running concurrently
    # thus you don't need to do all that chunking business you were doing in your for loop
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)

    # spawn() (both gevent.spawn() and Pool.spawn()) returns a gevent.Greenlet object
    # NOT the value your function, get_head, will return
    threads = [p.spawn(get_head, short_url) for short_url in short_urls]
    p.join()

    # to access the returned value of your function, access the Greenlet.value property
    results = {short_url: thread.value.url for short_url, thread in zip(short_urls, threads) 

если thread.value не None и thread.value.status_code == 200} возвращает результаты

Другие вопросы по тегам