Медленные операции с базой данных django над большим (ish) набором данных.

Я настроил систему для фильтрации образца потока в реальном времени. Очевидно, что запись в базу данных слишком медленная, чтобы справиться с чем-то более сложным, чем пара ключевых слов с малым объемом. Я реализовал django-rq как простую систему очередей, чтобы выталкивать твиты в очередь на основе Redis по мере их поступления, и это прекрасно работает. Моя проблема на другой стороне. В контексте этого вопроса у меня есть система, которая работает прямо сейчас, с 1,5 млн твитов для анализа и еще 375 000 в очереди через redis. При нынешних показателях производительности мне понадобится ~3 дня, чтобы наверстать упущенное, если я отключу потоки, чего я не хочу. Если я буду поддерживать потоки, то, по моим последним оценкам, это займет около месяца.

В базе данных есть пара миллионов строк в двух основных таблицах, и записи выполняются очень медленно. Оптимальное число rq-рабочих, по-видимому, равно четырем, и это в среднем составляет 1,6 очереди задач в секунду. (Код того, что ставится в очередь ниже). Я подумал, что, возможно, проблема заключается в открытии соединений с БД для каждой новой задачи очереди, поэтому установите значение CONN_MAX_AGE в 60, но это ничего не улучшило.

Только что протестировав это на локальном хосте, я получил более 13 записей в секунду на Macbook 2011 с запущенным Chrome и т. Д., Но в этой базе данных всего несколько тысяч строк, что позволяет мне полагать, что это связано с размером. Есть пара get_or_create Команды, которые я использую (см. ниже), которые могут замедлять работу, но не могут видеть другие способы их использования - мне нужно проверить, существует ли пользователь, и мне нужно проверить, существует ли твит (я мог бы, я подозреваю, перевести последний в попытку / за исключением того, что твиты, поступающие из прямой трансляции, уже не должны существовать по понятным причинам.) Получу ли я от этого прирост производительности? Поскольку это все еще продолжается, я стремлюсь немного оптимизировать код и привлечь туда более быстрых / более эффективных работников, чтобы я мог наверстать упущенное! Будет ли запускать предварительную проверку работника, чтобы пакетировать вещи на работу? (то есть, чтобы я мог создавать группы пользователей, которых не существует, или что-то подобное?)

Я использую 4-ядерную /8-гигабайтную оперативную память Ram в цифровом океане, так что чувствую, что это довольно ужасная производительность, и предположительно связанная с кодом. Где я здесь ошибаюсь?
(Я разместил это здесь, а не обзор кода, поскольку я думаю, что это относится к формату вопросов и ответов для SO, поскольку я пытаюсь решить конкретную проблему с кодом, а не "как я могу сделать это в целом лучше?")

Примечание: я работаю в django 1.6, так как это код, который я использовал в течение некоторого времени и не был уверен в том, что обновлять в то время - это не общедоступное решение, так что если нет веской причины сейчас (как это проблема производительности), я не собирался обновляться (для этого проекта).

Слушатель потока:

class StdOutListener(tweepy.StreamListener):
            def on_data(self, data):
                # Twitter returns data in JSON format - we need to decode it first
                decoded = json.loads(data)
                #print type(decoded), decoded
                # Also, we convert UTF-8 to ASCII ignoring all bad characters sent by users
                try:
                    if decoded['lang'] == 'en':
                        django_rq.enqueue(read_both, decoded)
                    else:
                        pass
                except KeyError,e:
                    print "Error on Key", e
                except DataError, e:
                    print "DataError", e
                return True

            def on_error(self, status):
                print status

Читать пользователя / чирикать / оба

def read_user(tweet):
    from harvester.models import User
    from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
    #We might get weird results where user has changed their details"], so first we check the UID.
    #print "MULTIPLE USER DEBUG", tweet["user"]["id_str"]
    try:
        current_user = User.objects.get(id_str=tweet["user"]["id_str"])
        created=False
        return current_user, created
    except ObjectDoesNotExist:
        pass
    except MultipleObjectsReturned:
        current_user = User.objects.filter(id_str=tweet["user"]["id_str"])[0]
        return current_user, False
    if not tweet["user"]["follow_request_sent"]:
        tweet["user"]["follow_request_sent"] = False
    if not tweet["user"]["following"]:
        tweet["user"]["following"] = False
    if not tweet["user"]["description"]:
        tweet["user"]["description"] = " "
    if not tweet["user"]["notifications"]:
        tweet["user"]["notifications"] = False

    #If that doesn't work"], then we'll use get_or_create (as a failback rather than save())
    from dateutil.parser import parse
    if not tweet["user"]["contributors_enabled"]:
        current_user, created = User.objects.get_or_create(
            follow_request_sent=tweet["user"]["follow_request_sent"],
            _json = {},
            verified = tweet["user"]["verified"],
            followers_count = tweet["user"]["followers_count"],
            profile_image_url_https = tweet["user"]["profile_image_url_https"],
            id_str = tweet["user"]["id_str"],
            listed_count = tweet["user"]["listed_count"],
            utc_offset = tweet["user"]["utc_offset"],
            statuses_count = tweet["user"]["statuses_count"],
            description = tweet["user"]["description"],
            friends_count = tweet["user"]["friends_count"],
            location = tweet["user"]["location"],
            profile_image_url= tweet["user"]["profile_image_url"],
            following = tweet["user"]["following"],
            geo_enabled = tweet["user"]["geo_enabled"],
            profile_background_image_url =tweet["user"]["profile_background_image_url"],
            screen_name = tweet["user"]["screen_name"],
            lang =  tweet["user"]["lang"],
            profile_background_tile = tweet["user"]["profile_background_tile"],
            favourites_count = tweet["user"]["favourites_count"],
            name = tweet["user"]["name"],
            notifications = tweet["user"]["notifications"],
            url = tweet["user"]["url"],
            created_at = parse(tweet["user"]["created_at"]),
            contributors_enabled = False,
            time_zone = tweet["user"]["time_zone"],
            protected = tweet["user"]["protected"],
            default_profile = tweet["user"]["default_profile"],
            is_translator = tweet["user"]["is_translator"]
        )
    else:
        current_user, created = User.objects.get_or_create(
            follow_request_sent=tweet["user"]["follow_request_sent"],
            _json = {},
            verified = tweet["user"]["verified"],
            followers_count = tweet["user"]["followers_count"],
            profile_image_url_https = tweet["user"]["profile_image_url_https"],
            id_str = tweet["user"]["id_str"],
            listed_count = tweet["user"]["listed_count"],
            utc_offset = tweet["user"]["utc_offset"],
            statuses_count = tweet["user"]["statuses_count"],
            description = tweet["user"]["description"],
            friends_count = tweet["user"]["friends_count"],
            location = tweet["user"]["location"],
            profile_image_url= tweet["user"]["profile_image_url"],
            following = tweet["user"]["following"],
            geo_enabled = tweet["user"]["geo_enabled"],
            profile_background_image_url =tweet["user"]["profile_background_image_url"],
            screen_name = tweet["user"]["screen_name"],
            lang =  tweet["user"]["lang"],
            profile_background_tile = tweet["user"]["profile_background_tile"],
            favourites_count = tweet["user"]["favourites_count"],
            name = tweet["user"]["name"],
            notifications = tweet["user"]["notifications"],
            url = tweet["user"]["url"],
            created_at = parse(tweet["user"]["created_at"]),
            contributors_enabled = tweet["user"]["contributers_enabled"],
            time_zone = tweet["user"]["time_zone"],
            protected = tweet["user"]["protected"],
            default_profile = tweet["user"]["default_profile"],
            is_translator = tweet["user"]["is_translator"]
        )
    #print "CURRENT USER:""], type(current_user)"], current_user
    #current_user"], created = User.objects.get_or_create(current_user)
    return current_user, created

def read_tweet(tweet, current_user):
    import logging
    logger = logging.getLogger('django')
    from datetime import date, datetime
    #print "Inside read_Tweet"
    from harvester.models import Tweet
    from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
    from django.db import DataError
    #We might get weird results where user has changed their details"], so first we check the UID.
    #print tweet_data["created_at"]
    from dateutil.parser import parse
    tweet["created_at"] = parse(tweet["created_at"])
    try:
        #print "trying tweet_data["id"
        current_tweet =Tweet.objects.get(id_str=tweet["id_str"])
        created=False
        return current_user, created
    except ObjectDoesNotExist:
        pass
    except MultipleObjectsReturned:
        current_tweet =Tweet.objects.filter(id_str=tweet["id_str"])[0]
    try:
        current_tweet, created = Tweet.objects.get_or_create(
        truncated=tweet["truncated"],
        text=tweet["text"],
        favorite_count=tweet["favorite_count"],
        author = current_user,
        _json = {},
        source=tweet["source"],
        retweeted=tweet["retweeted"],
        coordinates = tweet["coordinates"],
        entities = tweet["entities"],
        in_reply_to_screen_name = tweet["in_reply_to_screen_name"],
        id_str = tweet["id_str"],
        retweet_count = tweet["retweet_count"],
        favorited = tweet["favorited"],
        user = tweet["user"],
        geo = tweet["geo"],
        in_reply_to_user_id_str = tweet["in_reply_to_user_id_str"],
        lang = tweet["lang"],
        created_at = tweet["created_at"],
        place = tweet["place"])
        print "DEBUG", current_user, current_tweet
        return current_tweet, created
    except DataError, e:
        #Catchall to pick up non-parsed tweets
        print "DEBUG ERROR", e, tweet
        return None, False

def read_both(tweet):
    current_user, created = read_user(tweet)
    current_tweet, created = read_tweet(tweet, current_user)

1 ответ

Решение

В конце концов мне удалось собрать воедино ответ от некоторых редакторов и еще пару вещей.

По сути, хотя я делал двойной поиск в поле id_str, которое не было проиндексировано. Я добавил индексы db_index=True в этом поле на обоих read_tweet а также read_userи перешел читать твит в попытку / кроме Tweet.objects.create подход, возвращаясь к get_or_create, если есть проблема, и увидел увеличение скорости в 50-60 раз, при этом рабочие теперь масштабируемы - если я добавлю 10 рабочих, я получу 10-кратную скорость.

В настоящее время у меня есть один работник, который успешно обрабатывает около 6 твитов в секунду. Далее я добавлю демон мониторинга, чтобы проверить размер очереди и добавить дополнительных работников, если он все еще увеличивается.

tl; dr - ПОМНИТЕ УКАЗАТЬ!

Другие вопросы по тегам