Проблемы с Python Threading и кодом в целом

Учитывая следующий код Python3, с потоками:

class main:
    def __init__(self):
        self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+")
        self.hashlist = queue.Queue()
        self.filelist = queue.Queue()
        self.top = '/home/'
        for y in range(12):
            self.u = threading.Thread(target=self.md5hash)
            self.u.daemon = True
            self.u.start()
        for x in range(4):
            self.t = threading.Thread(target=self.threader)
            self.t.daemon = True
            self.t.start()
        main.body(self)

    def body(self):
        start = time.time()
        self.text.write("Time now is " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n")
        for root, dirs, files in os.walk(self.top):
            for f in files:
                path = os.path.join(root,f)
                self.filelist.put(path)
        self.t.join()
        self.u.join()
        self.text.write("Total time taken     : " + str(time.time() - start) + "\n")
        print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt")

    def md5hash(self):
        while True:
            entry = self.filelist.get()
            //hashing//
            lists = finalhash + ',' + entry
            self.hashlist.put(lists)
            self.filelist.task_done()

    def compare(self, hashed, path):
        f = open(os.getcwd() + "/database.csv", 'r')
        for row in f:
            if row.split(':')[1] == hashed:
                print("Suspicious File!")
                print("Suspecfs: " + row.split(':')[2] + "File name : " + path)

    def threader(self):
        while True:
            item = self.hashlist.get()
            hashes = item.split(',')[0]
            path = item.split(',')[1]
            self.compare(hashes, path)
            self.hashlist.task_done()

main()

Проблема 1: В def body(self)есть линия self.text.write("Time now is ..."), Эта строка не отображается в созданном файле журнала.

Проблема 2: В def compare(self, hashed, path), существует строка, которая печатает "Подозрительный файл!" и file path каждый раз, когда происходит столкновение хешей. Эта строка всегда печатается не по порядку, так как 4 потока t борются за то, кто напечатает первым. Для этого мне нужно знать, как запускать потоки Python. print Команды последовательно, а не как им нравится - как?

Проблема 3: В def body(self)существуют линии self.u.join() а также self.t.join(), Команда join()Насколько мне известно, это команда, ожидающая завершения потока перед продолжением. Потоки не заканчиваются.

Дополнительная информация 1: Я пишу многопоточность, так как позже мне нужно преобразовать код в многопоточность.

Дополнительная информация 2. Пожалуйста, дайте мне знать, если я неправильно понял какие-либо команды / синтаксис в моем коде, когда вы просматриваете.

1 ответ

Решение

Проблема 1: Вы записываете в свой файловый буфер - он сбрасывается в текущий файл, только когда буфер заполнен, дескриптор файла закрыт или вы явно вызываете flush() на это (то есть self.text.flush())

Проблема 2: Вы либо хотите, чтобы ваш код выполнялся параллельно (и это не так, но мы вернемся к этому), но вы теряете порядок выполнения или выполняете последовательно, сохраняя порядок. Если вы хотите запускать несколько потоков, не имеет смысла запускать их один за другим, потому что тогда вы не выполняете свой код параллельно и можете выполнять все в основном потоке.

Если вы хотите управлять только выводом в STDOUT, при условии, что он не мешает выполнению потока, вы можете захватить то, что вы хотите напечатать, и распечатать его в конце под мьютексом (так что только один поток пишет за раз) или даже передать его обратно в главный поток и получить доступ к STDOUT. Простой пример мьютекса:

PRINT_MUTEX = threading.Lock()

def compare(self, hashed, path):  # never mind the inefficiency, we'll get to that later
    out = []  # hold our output buffer
    with open(os.getcwd() + "/database.csv", 'r') as f:
        for row in f:
            row = row.split(':')
            if row[1] == hashed:
                out.append("Suspicious File!")
                out.append("Suspecfs: " + row[2] + "File name : " + path)
    if out:
        with self.PRINT_MUTEX:  # use a lock to print out the results
            print("\n".join(out))

Это не будет следить за выполнением потока (и при этом вы не должны пытаться хотя бы побороть цель "параллельного" выполнения), но по крайней мере потоки будут выводить свои compare результаты по одному вместо того, чтобы перемежать их результаты. Если вы хотите, чтобы ваш основной поток / процесс управлял STDOUT, особенно если вы хотите преобразовать его в код мультиобработки, проверьте этот ответ.

Проблема 3: Ваши потоки никогда не выходят, потому что они застряли в while True цикл - пока вы не оторвитесь от него, потоки будут продолжать работать. Я не знаю, в чем причина того, как вы структурировали код, но если вы пытаетесь распараллелить распечатку файлов (основной поток), чтение, хеширование (потоки md5hash) и сравнение (потоки потоков), вы, вероятно, захотите прекратить хэширование, когда файлов больше нет, и прекратить сравнение, если хэшей больше нет. Для этого вы не можете использовать Queue.task_done() как это там, чтобы сигнализировать другим "слушателям" (если они заблокированы Queue.join() звоните, что у вас нет), что вы сделали с изменениями очереди.

Вы должны использовать threading.Event сигнал для этого, но если вы хотите сохранить его queue.Queue только вы можете создать специальное свойство, обозначающее конец вашей очереди, а затем поместить его в очередь, когда больше нечего обрабатывать, а затем заставить свои потоки выходить из своих циклов, когда они сталкиваются с этим специальным свойством. Давайте сначала исправим большой упущение в вашем коде - вы вообще не храните ссылку на ваши потоки, вы перезаписываете ее последним потоком, поэтому вы не можете реально контролировать поток выполнения - вместо того, чтобы сохранять последнюю ссылку на поток в переменная, хранить все ссылки в списке. Кроме того, если вы собираетесь ждать, пока все закроется, не используйте потоки демона:

def __init__(self):
    self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+")  # consider os.path.join()
    self.hashlist = queue.Queue()
    self.filelist = queue.Queue()
    self.hashers = []  # hold the md5hash thread references
    self.comparators = []  # hold the threader thread references
    self.top = '/home/'
    for _ in range(12):  # you might want to consider a ThreadPool instead
        t = threading.Thread(target=self.md5hash)
        t.start()
        self.hashers.append(t)
    for _ in range(4):
        t = threading.Thread(target=self.threader)
        t.start()
        self.comparators.append(t)
    main.body(self)

Теперь мы можем изменить main.body() метод, так что он добавляет вышеупомянутые специальные значения в конец наших очередей, чтобы рабочие потоки знали, когда остановиться:

QUEUE_CLOSE = object()  # a 'special' object to denote end-of-data in our queues

def body(self):
    start = time.time()
    self.text.write("Time:  " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n")
    for root, dirs, files in os.walk(self.top):
        for f in files:
            path = os.path.join(root, f)
            self.filelist.put(path)
    self.filelist.put(self.QUEUE_CLOSE)  # no more files, signal the end of the filelist
    for t in self.hashers:  # let's first wait for our hashing threads to exit
        t.join()
    # since we're not going to be receiving any new hashes, we can...
    self.hashlist.put(self.QUEUE_CLOSE)  # ... signal the end of the hashlist as well
    for t in self.comparators:  # let's wait for our comparator threads to exit
        t.join()
    self.text.write("Total: " + str(time.time() - start) + "\n")
    self.text.close()  # close the log file (this will also flush the previous content)
    print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt")

И, следовательно, нам нужно изменить рабочие потоки, чтобы они выходили, когда они сталкиваются с концом очереди:

def md5hash(self):
    while self.filelist:
        entry = self.filelist.get()
        if entry is self.QUEUE_CLOSE:  # end of queue encountered
            self.filelist.put(self.QUEUE_CLOSE)  # put it back for the other threads
            break  # break away the processing
        finalhash = whatever_is_your_hash_code(entry)
        lists = finalhash + ',' + entry
        self.hashlist.put(lists)

def threader(self):
    while True:
        item = self.hashlist.get()
        if item is self.QUEUE_CLOSE:  # end of queue encountered
            self.hashlist.put(self.QUEUE_CLOSE)  # put it back for the other threads
            break  # break away the queue
        hashes = item.split(',')[0]
        path = item.split(',')[1]
        self.compare(hashes, path)

Теперь, если вы запустите его, при условии, что ваша часть хэширования не работает, все должно в конечном итоге завершиться.

Помимо неуклюжих настроек, одно, что вам определенно следует сделать, это оптимизировать main.compare() метод - поскольку файл CSV не изменяется во время выполнения (и если это так, вы должны обрабатывать его в памяти), то загрузка всего CSV и циклический просмотр его для хеша каждого файла, который вы хотите сравнить, смешна. Загрузить весь CSV как hash<=>whateverdict а затем сделать сравнения на месте (т.е. if hashed in your_map) вместо

И, наконец, как я упоминал выше, время, хм, дождь на вашем параде - все это было даром! Из-за страшного GIL ни один из ваших потоков здесь не выполняется параллельно (на самом деле, только загрузка файла делает до некоторой степени, но любое преимущество, вероятно, сводится на нет тем временем, которое требуется для хеширования данных). Они работают как отдельные, честные системные потоки, но GIL гарантирует, что одновременно работает только один из этих потоков, поэтому этот код с точки зрения обработки, скорее всего, медленнее, чем если бы вы выполняли все в одном потоке. Это не очень поможет вам в процессе многопроцессорности, потому что вы не можете совместно использовать состояние локального экземпляра (хорошо, вы можете проверить этот ответ, но это всего лишь основная PITA и большую часть времени не стоит проходить через проблему).

Другие вопросы по тегам