Python - multiprocessing - queue: ссылка на мою очередь изменилась, даже если это один и тот же объект?

Я начал использовать многопроцессорность не так давно, и он работает на базовых примерах. После этого я попытался реализовать какую-то программу ввода с несколькими звуковыми сигналами и попытался направить поток ввода через очередь в какой-либо модуль обработки, и в настоящее время это дает сбой. Я опишу мою проблему в 3 пунктах: структура папок, структура процесса, что я пытался.

Структура папок

  • Корневая папка
    • заявка
      • start_applicaton.py
      • input_cfg.ini
    • ядро
      • core.py
      • gui.py
      • audio_recorder.py (Использование sounddevice.InputStream)
      • x_recorder.py

Структура процесса Когда я запускаю приложение, вызывается графический интерфейс, и после нажатия кнопки "Пуск" создаются процессы.

  • Основной процесс
  • audio_recorder_1 Процесс
  • audio_recorder_ Process
  • прикладной процесс

Что я пробовал

core.py

from multiprocessing import Queue, Process
central_queue = Queue()
...
d = {}
d['output'] = central_queue
o = AudioRecorder('name', **d)

start_application.py

import core
def handle_queue_data():
    while True:
        print(str(core.central_queue.get()))
if __name__ == "__main__":
    Process(target=handle_queue_data, name="syncOutput").start()

audio_recorder.py

class AudioRecorder(object):
    def __init__(self, name, **d):
        ...
        self.output_queue = d['output']
    def run(self):
        queue = Queue()
        def callback(indata, frames, time, status):
            if status:
                print(status, flush=True)
            # Push the got data into the queue
            queue.put([indata.copy()])
        with sd.InputStream(samplerate=self.sample_rate, device=self.device_id, channels=self.channel_id, callback=callback):
            while True:
                self.output_queue.put(queue.get())

Это не сработало. После отладки похоже после старта с core.py записи, ссылка на очередь изменилась... FYI отладочной информации:

# in the audio_recorder.py object
centralized_queue = {Queue} <multiprocessing.queues.Queue object at 0x00000000086B3320>
 _buffer = {deque} deque([[array([[-0.01989746, -0.02053833],\n       [-0.01828003, -0.0196228 ],\n       [-0.00634766, -0.00686646],\n       ..., \n       [-0.01119995, -0.01144409],\n       [-0.00900269, -0.00982666],\n       [-0.00823975, -0.00888062]], dtype=float32)]])
 _close = {Finalize} <Finalize object, callback=_finalize_close, args=[deque([[array([[-0.01989746, -0.02053833],\n       [-0.01828003, -0.0196228 ],\n       [-0.00634766, -0.00686646],\n       ..., \n       [-0.01119995, -0.01144409],\n       [-0.00900269, -0.00982666],\n       [-0
 _closed = {bool} False
 _ignore_epipe = {bool} False
 _joincancelled = {bool} False
 _jointhread = {Finalize} <Finalize object, callback=_finalize_join, args=[<weakref at 0x00000000083A2638; to 'Thread' at 0x0000000004DF1B00>], exitprority=-5>
 _maxsize = {int} 2147483647
 _notempty = {Condition} <Condition(<unlocked _thread.lock object at 0x0000000004738198>, 0)>
 _opid = {int} 1320
 _reader = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000086B34A8>
 _rlock = {Lock} <Lock(owner=None)>
 _sem = {BoundedSemaphore} <BoundedSemaphore(value=2147483645, maxvalue=2147483647)>
 _thread = {Thread} <Thread(QueueFeederThread, started daemon 9344)>
 _wlock = {NoneType} None
 _writer = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000086B3518>

# in the handle_queue_data
centralized_queue = {Queue} <multiprocessing.queues.Queue object at 0x000000000479DA20>
 _buffer = {deque} deque([])
 _close = {NoneType} None
 _closed = {bool} False
 _ignore_epipe = {bool} False
 _joincancelled = {bool} False
 _jointhread = {NoneType} None
 _maxsize = {int} 2147483647
 _notempty = {Condition} <Condition(<unlocked _thread.lock object at 0x00000000058C8350>, 0)>
 _opid = {int} 7208
 _reader = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x000000000684C438>
 _rlock = {Lock} <Lock(owner=None)>
 _sem = {BoundedSemaphore} <BoundedSemaphore(value=2147483647, maxvalue=2147483647)>
 _thread = {NoneType} None
 _wlock = {NoneType} None
 _writer = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000058DE6A0>

Я также пытался использовать разные вещи после того, как все, безуспешно, мне не удалось передать данные... Возможно ли, что очередь является изменяемым объектом здесь? Или есть ошибка в многопроцессорной обработке (очень маловероятно) или, возможно, комбинация со звуковым устройством делает нестабильную очередь?

Извините, мое описание довольно длинное...

Заранее благодарю за помощь!
С наилучшими пожеланиями,
Себастьян

1 ответ

Решение

У меня нет опыта работы с multiprocessing, но я понимаю, что все объекты в пространстве имен модуля start_application.py дублируются для каждого процесса. Если я не ошибаюсь, это включает в себя core модуль. Следовательно, core.central_queue имеет отдельный экземпляр для каждого процесса. По крайней мере, в Windows это выглядит так, и документы Python рекомендуют в любом случае "явно передавать ресурсы дочерним процессам".

Вы должны использовать if __name__ == '__main__': блок для создания уникального экземпляра Queue и уникальный экземпляр AudioRecorder, тоже. Затем вы можете передать эти уникальные экземпляры своим процессам с помощью args аргумент Process (как показано в приведенной выше ссылке).

Кроме того, я не знаю, чего ты пытаешься достичь. Вы хотите обрабатывать случайные фрагменты аудиовхода случайным образом одним из доступных процессов? Или вы хотите предоставить один и тот же аудиовход целиком для каждого из процессов?

В последнем случае у вас должна быть отдельная очередь для каждого дочернего процесса! sd.InputStream все еще должен быть уникальным. И в твоем with оператор, вы должны перебрать все дочерние процессы и поместить текущий фрагмент аудио в каждую из очередей процесса отдельно.

PS: я только что понял, что вы можете начать еще один дополнительный процесс по какой-то причине. В этом случае вы должны рассмотреть возможность отказа от всего multiprocessing беспорядок и просто делать все, что вам нужно сделать в with заявление.

ОБНОВИТЬ:

Если вы хотите использовать несколько аудиоустройств (и, следовательно, несколько потоков PortAudio) одновременно, вам все равно не обязательно multiprocessing, Вы можете иметь with оператор с несколькими менеджерами контекста и сделать вашу обработку там.

В зависимости от того, чего вы хотите достичь, у вас может быть одна очередь, в которую записываются все звуковые обратные вызовы, или одна очередь на обратный вызов.

Если у вас есть веская причина для использования multiprocessingЭто также должно работать нормально, если вы запускаете все аудиопотоки в основном процессе и выполняете обработку в новом дочернем процессе.

Другие вопросы по тегам