Python threading с очередью: как избежать использования join?

У меня есть сценарий с 2 ​​потоками:

  1. поток, ожидающий сообщения из сокета (встроенный в библиотеку C - блокирующий вызов "Barra.ricevi"), затем помещающий элемент в очередь

  2. поток, ожидающий, чтобы получить элемент из очереди и сделать что-то

Образец кода

import Barra
import Queue    
import threading

posQu = Queue.Queue(maxsize=0)

def threadCAN():
    while True:
        canMsg = Barra.ricevi("can0")
        if canMsg[0] == 'ERR':
            print (canMsg)
        else:
            print ("Enqueued message"), canMsg
            posQu.put(canMsg)

thCan = threading.Thread(target = threadCAN)
thCan.daemon = True
thCan.start()

while True:
    posMsg = posQu.get()
    print ("Messagge from the queue"), posMsg

В результате каждый раз, когда из сокета поступает новое сообщение, в очередь добавляется новый элемент, НО основной поток, который должен получать элементы из очереди, никогда не просыпается.

Вывод следующий:

Постановка сообщения

Постановка сообщения

Постановка сообщения

Постановка сообщения

Я ожидал иметь:

Постановка сообщения

Сообщение из очереди

Постановка сообщения

Сообщение из очереди

Единственный способ решить эту проблему - добавить строку:

posQu.join()

в конце потока ожидаем сообщения из сокета и строки:

posQu.task_done()

в конце основного потока.

В этом случае после того, как новое сообщение было получено из сокета, поток блокирует ожидание, пока основной поток обработает поставленный в очередь элемент.

К сожалению, это нежелательное поведение, так как я хотел бы, чтобы поток всегда был готов получать сообщения из сокета и не ждал завершения задания из другого потока.

Что я делаю не так? Спасибо

Андрей (Италия)

1 ответ

Решение

Это вероятно потому, что ваш Barra не снимает глобальную блокировку интерпретатора (GIL), когда Barra.ricevi, Вы можете проверить это, хотя.

GIL гарантирует, что одновременно может работать только один поток (ограничивая полезность потоков в многопроцессорной системе). GIL переключает потоки каждые 100 "тиков" - тик свободно отображается в инструкциях байт-кода. Смотрите здесь для более подробной информации.

В вашем потоке продюсера мало что происходит вне вызова C-библиотеки. Это означает, что поток продюсера будет вызывать Barra.ricevi очень много раз, прежде чем GIL переключается на другой поток.

Решения этой проблемы, с точки зрения возрастающей сложности:

  • Вызов time.sleep(0) после добавления элемента в очередь. Это дает поток, так что другой поток может работать.
  • использование sys.setcheckinterval() снизить количество "тиков", выполняемых перед переключением потоков. Это будет стоить того, чтобы сделать программу намного более вычислительной.
  • использование multiprocessing скорее, чем threading, Это включает в себя использование multiprocessing.Queue вместо Queue.Queue,
  • изменять Barra так что он освобождает GIL при вызове его функций.

Пример использования multiprocessing, Имейте в виду, что при использовании многопроцессорной обработки ваши процессы больше не имеют подразумеваемого общего состояния. Вам нужно будет посмотреть на многопроцессорность, чтобы понять, как передавать информацию между процессами.

import Barra  
import multiprocessing

def threadCAN(posQu):
    while True:
        canMsg = Barra.ricevi("can0")
        if canMsg[0] == 'ERR':
            print(canMsg)
        else:
            print("Enqueued message", canMsg)
            posQu.put(canMsg)

if __name__ == "__main__":
    posQu = multiprocessing.Queue(maxsize=0)
    procCan = multiprocessing.Process(target=threadCAN, args=(posQu,))
    procCan.daemon = True
    procCan.start()

    while True:
        posMsg = posQu.get()
        print("Messagge from the queue", posMsg)
Другие вопросы по тегам