Multi Threading застрял - есть подозрение на ошибку в условной переменной

Я тестирую программное обеспечение Ant Colony Optimization (ACO), которое работает с несколькими потоками (по 1 для каждого созданного муравья).

Каждая итерация ACO должна ждать завершения всех потоков, прежде чем разрешить запуск следующей итерации. Я делаю это с помощью "условия ()" из модуля потоков.

Поскольку муравьи совместно используют матрицу ферормона, чтение и запись в этой матрице могут быть заблокированы также из модуля потоков.

Теперь описание проблемы:

Я запускаю функцию и что-то печатаю на каждой итерации. Иногда, не всегда, кажется, что выполнение функции прекращается, то есть прекращается печать, то есть итерация никогда не заканчивается.

Я, честно говоря, не знаю, почему это происходит, и я был бы признателен за любой ответ, который мог бы привести меня в нужное русло. Если бы мне пришлось угадывать, я бы сказал, что переменная условия не называется должным образом, или что-то в этом роде. Однако я не уверен, и мне также кажется странным, что это случается иногда.

Ниже приведены соответствующие функции. ACO начинается с вызова функции start(). Это создает N потоков, которые после завершения вызывают update(). Эта функция обновления, после вызова N раз, вызывает notify, что позволяет start() продолжить процесс и, наконец, запустить следующую итерацию. Я также опубликовал метод запуска каждой темы.

Стоит упомянуть, что без действий демона ошибка практически не возникает. С действиями демона это происходит почти всегда (что я также нахожу странным). Наконец, ошибка не всегда возникает в одной и той же итерации.

    def start(self):
        self.ants = self.create_ants()
        self.iter_counter = 0

        while self.iter_counter < self.num_iterations:
            print "START ACQUIRED"
            self.cv.acquire()
            print "calling iteration"
            self.iteration()
            #CV wait until all ants (threads) finish and call update, which
            #calls notify(), and allow continuation
            while not self.iter_done:
                print "iter not complete, W8ING"
                self.cv.wait()
            print "global update "
            self.global_update_with_lock()
            print "START RELEASED"
            self.cv.release()

def update(self, ant):
    lock = Lock()
    lock.acquire()

    print "Update called by %s" % (ant.ID,)
    self.ant_counter += 1

    self.avg_path_cost += ant.path_cost

    # book-keeping
    if ant.path_cost < self.best_path_cost:
        self.best_path_cost = ant.path_cost
        self.best_path_mat = ant.path_mat
        self.best_path_vec = ant.path_vec
        self.last_best_path_iteration = self.iter_counter

    #all threads finished, call notify
    print "ant counter"
    print self.ant_counter
    if self.ant_counter == len(self.ants):
        print "ants finished"
        #THIS MIGHT CAUSE PROBLEMS (no need to notify if its no one waiting)
        self.best_cost_at_iter.append(self.best_path_cost)
        self.avg_path_cost /= len(self.ants)

        self.cv.acquire()
        self.iter_done = True
        self.cv.notify()
        self.cv.release()

    lock.release()

    # overide Thread's run()
    def run(self):
        graph = self.colony.graph
        while not self.end():
            # we need exclusive access to the graph
            graph.lock.acquire()
            new_node = self.state_transition_rule(self.curr_node)
            self.path_cost += graph.delta(self.curr_node, new_node)

            self.path_vec.append(new_node)
            self.path_mat[self.curr_node][new_node] = 1  #adjacency matrix representing path

            #print "Ant %s : %s, %s" % (self.ID, self.path_vec, self.path_cost,)

            self.local_updating_rule(self.curr_node, new_node)
            graph.lock.release()

            self.curr_node = new_node

        # close the tour
        self.path_vec.append(self.path_vec[0])

        #RUN LOCAL HEURISTIC
        if self.daemon == True:
            try:
                daemon_result =  twoOpt(self.path_vec, graph.delta_mat)
                d_path, d_adj = daemon_result['path_vec'], daemon_result['path_matrix']
                self.path_vec = d_path
                self.path_mat = d_adj
            except Exception, e:
                print "exception: " + str(e)
                traceback.print_exc()

        self.path_cost += graph.delta(self.path_vec[-2], self.path_vec[-1])
        # send our results to the colony
        self.colony.update(self)
        #print "Ant thread %s terminating." % (self.ID,)
        
        # allows thread to be restarted (calls Thread.__init__)
        self.__init__(self.ID, self.start_node, self.colony, self.daemon, self.Beta, self.Q0, self.Rho)

Решение проблемы: во- первых, я исправил ошибку в ожидании условных переменных в соответствии с комментариями здесь. Во-вторых, он иногда зависал, и это было связано с ошибкой в ​​обновлении счетчика потоков. Решением было изменить счетчик с int на массив с длиной num_threads, полной 0, где каждый поток обновляет свою позицию в списке. Когда все потоки заканчиваются, массив счетчиков должен быть все 1. В настоящее время это работает просто отлично.

0 ответов

Другие вопросы по тегам