Критический раздел конвейера Scrapy mysql в runInteraction()

Мне нужно помочь с исправлением критической секции в моем коде конвейерной проверки.

Я использую этот конвейер MySQL в scrapy (с http://snippets.scrapy.org/snippets/33/):

class SQLStorePipeline(object):

def __init__(self):
    self.dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb',
            user='myuser', passwd='mypass', cursorclass=MySQLdb.cursors.DictCursor,
            charset='utf8', use_unicode=True)

def process_item(self, item, spider):
    # run db query in thread pool
    query = self.dbpool.runInteraction(self._conditional_insert, item)
    query.addErrback(self.handle_error)

    return item

def _conditional_insert(self, tx, item):
    # create record if doesn't exist. 
    # all this block run on it's own thread


    # START CRITICAL SECTION
    some_critical_code_here
    # STOP CRITICAL SECTION


    tx.execute("select * from websites where link = %s", (item['link'][0], ))
    result = tx.fetchone()
    if result:
        log.msg("Item already stored in db: %s" % item, level=log.DEBUG)
    else:
        tx.execute(\
            "insert into websites (link, created) "
            "values (%s, %s)",
            (item['link'][0],
             datetime.datetime.now())
        )
        log.msg("Item stored in db: %s" % item, level=log.DEBUG)

def handle_error(self, e):
    log.err(e)

Все работает просто отлично.

Как вы можете видеть, я уже знаю, где находится мой критический раздел в коде. Но я действительно новичок в python и не знаю, как использовать некоторые блокировки или что-то в этом роде, чтобы предотвратить попадание в секцию сечения большего количества потоков, чем один.

Не могли бы вы мне помочь? Если вы можете отправить мне код для входа и выхода из критического раздела, который я могу использовать в этом коде, это будет здорово.

Спасибо ребята.

2 ответа

Решение

Как бы то ни было, я разбираюсь с этим, объединяя SQL-статистику в критическом разделе thx с парнем, у которого есть Ник Trotrot, в Scrapy IRC за идею.

Даже если вы используете Twisted, где обычно все, что связано с блокировкой, нужно делать по-другому, вы находитесь в определенной части Twisted, где блокировка в порядке. Так что это должно быть так же просто, как выделить объект Lock, на который все потоки смогут ссылаться, а затем получить его:

import threading

insert_critical_lock = threading.Lock()

...

def _conditional_insert(self, tx, item):

    with insert_critical_lock:
        # START CRITICAL SECTION
        some_critical_code_here
        # STOP CRITICAL SECTION

    tx.execute("select * from websites where link = %s", (item['link'][0], ))
    ...
Другие вопросы по тегам