Dispycos неблокирующие нити
Я расширяю потоки. Постараюсь достичь паралелизма следующим образом:
class WorkerGenerico(threading. Thread):
""" Clase con la funcionalidad principal de un worker genérico: Se inicia en un thread y se para de forma graceful
mediante join (bloqueante). Adicionalmente permite enviar la orden de detención de forma no bloqueante mediante
"gracefull_stop", aunque no garantiza que se detenga el thread.
Args:
input_q (:class:`queue.Queue`): Cola de entrada de datos al proceso. No deberia usarse, esta por escaoalabilidad.
input_q (:class:`queue.Queue`): Cola por la que el programa envía sus resultados.
Attributes:
input_q (:class:`queue.Queue`): Cola de salida de entrada de datos (comandos?)
input_q (:class:`queue.Queue`): Cola de salida de datos (resultados)
stoprequest (:class:`threading.Event`): Condición de parada
Raises:
"""
def __init__(self, input_q, output_q):
super(WorkerGenerico, self).__init__()
self.input_q = input_q
self.output_q = output_q
self.stoprequest = threading.Event()
# def run(self):
# pass
def join(self, timeout=None):
self.stoprequest.set()
super(WorkerGenerico, self).join(timeout)
def gracefull_stop(self):
self.stoprequest.set()
self.transcriptor.join()
def has_orden_parada(self):
return self.stoprequest.is_set()
def _enviar_msg(self, texto):
return self.output_q.put((self.name, texto))
def _recibir_msg(self, timeout=0.5):
return self.input_q.get(True, timeout)
def _imprimir_error(self, texto):
eprint(texto)
def _tratar_excepcion(self, excepcion):
self._imprimir_error("Excepcion no controlada: {0}".format(excepcion))
eprint("Error inesperado:", sys.exc_info()[0])
Затем.run становится неблокирующим циклом при выполнении task.start() извне. _enviar_msg и _recibir_msg используют очереди для связи с основным процессом и отправки промежуточных результатов или получения новых параметров в середине цикла. Мое основное программное обеспечение не изменено, и никому не нужно ждать, пока оно запустится.
Теперь мне нужно увеличить масштаб, а одного компьютера недостаточно, поэтому я решил использовать dispycos, однако они, похоже, имеют блокирующие задачи и позволяют мне отправлять только одну задачу и один раз, а затем она ожидает завершения всех задач.,
Если я запускаю dispycos client example 8, оказывается, что он никогда не выходит за рамки
pycos.Task(client_proc, computation, 10 if len(sys.argv) < 2 else int(sys.argv[1]))
Чтобы дать вам некоторое представление, мои задачи выполняются в течение 23-72 часов. Их требования к процессору являются динамическими, и я, вероятно, буду вручную контролировать, сколько будет входить в каждый узел.
Я хотел бы иметь возможность распределять исходный код на каждом компьютере в первый раз, чтобы избежать ненужных полетов по определениям классов по сети.
Если возможно, я бы хотел прочитать все входящие сообщения из каждого типа задач в одну и ту же очередь. Прямо сейчас я передаю один и тот же output_q каждому процессу (и их собственный input_q для получения направленных заказов), чтобы добиться этого, и он работает как шарм.