Многопроцессорная очередь хранения списков списков

Я пытаюсь поставить в очередь список списков в процессе (в функции с именем proc ниже), а затем завершить процесс после того, как я позвоню event.set(), Моя функция proc всегда заканчивается, судя по распечатке, но сам процесс еще продолжается. Я могу заставить это работать, если я сделаю количество списков в очереди в вызове put ниже (batchperq переменная) (или размер каждого вложенного списка меньше).

import multiprocessing as mp
import queue
import numpy as np
import time

def main():
    trainbatch_q = mp.Queue(10)

    batchperq = 50  
    event = mp.Event()

    tl1 = mp.Process(target=proc,
                            args=( trainbatch_q, 20, batchperq, event))
    tl1.start()
    time.sleep(3)
    event.set()
    tl1.join()
    print("Never printed..")    

def proc(batch_q, batch_size, batchperentry, the_event):
    nrow = 100000 
    i0 = 0
    to_q = []
    while i0 < nrow:
        rowend = min(i0 + batch_size,nrow)
        somerows = np.random.randint(0,5,(rowend-i0,2))
        to_q.append(somerows.tolist())  
        if len(to_q) == batchperentry:
            print("adding..", i0, len(to_q))
            while not the_event.is_set():
                try: 
                    batch_q.put(to_q, block=False)
                    to_q = []
                    break
                except queue.Full:
                    time.sleep(1)
        i0 += batch_size                    
    print("proc finishes")

Когда я делаю прерывание клавиатуры, я получаю трассировку ниже... какой может быть "блокировка", которую он пытается получить? Что-то делать с очередью?

Traceback (most recent call last):
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/util.py", line 322, in _exit_function
    _run_finalizers()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers
    finalizer()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

1 ответ

Причина, по которой ваш процесс никогда не завершается, заключается в том, что вы никогда не говорите ему выйти. Я добавил return до конца вашей функции, и ваш процесс, кажется, завершается правильно сейчас.

def proc(batch_q, batch_size, batchperentry, the_event):
    nrow = 100000
    i0 = 0
    to_q = []
    while i0 < nrow:
        rowend = min(i0 + batch_size,nrow)
        somerows = np.random.randint(0,5,(rowend-i0,2))
        to_q.append(somerows.tolist())  
        if len(to_q) == batchperentry:
            print("adding..", i0, len(to_q))
            while not the_event.is_set():
                try: 
                    batch_q.put(to_q, block=False)
                    to_q = []
                    break
                except queue.Full:
                    time.sleep(1)
        i0 += batch_size                    
    print("proc finishes")
    return            # Added this line, You can have it return whatever is most relevant to you.

Вот полная программа, которую я запустил, включая мои изменения, чтобы она успешно завершилась.

import multiprocessing as mp
import queue
import numpy as np
import random
import time

def main():
    trainbatch_q = mp.Queue(10)
    batchperq = 50  
    event = mp.Event()

    tl1 = mp.Process(target=proc,
                            args=( trainbatch_q, 20, batchperq, event))
    print("Starting")
    tl1.start()
    time.sleep(3)
    event.set()
    tl1.join()
    print("Never printed..")    

def proc(batch_q, batch_size, batchperentry, the_event):
    nrow = 100000
    i0 = 0
    to_q = []
    while i0 < nrow:
        rowend = min(i0 + batch_size,nrow)
        somerows = np.random.randint(0,5,(rowend-i0,2))
        to_q.append(somerows.tolist())  
        if len(to_q) == batchperentry:
            print("adding..", i0, len(to_q))
            while not the_event.is_set():
                try: 
                    batch_q.put(to_q, block=False)
                    to_q = []
                    break
                except queue.Full:
                    time.sleep(1)
        i0 += batch_size                    
    print("proc finishes")
    return      # Added this line, You can have it return whatever is most relevant to you.

if __name__ == "__main__":
  main()

Надеюсь, это поможет.

Другие вопросы по тегам