Модель Keras не может предсказать, если вызывается в потоке

Я пытаюсь выполнять прогнозы, используя керасы и имеющуюся модель VGG16 в поточном приложении. Однако, если я вызываю прогноз в главном потоке, все работает отлично. Но если я предсказываю внутри многопоточной функции (использую ли я threading, multiprocessing, ...), он просто глохнет во время прогноза:

Вот минимальный пример:

########################################
# Alter this variable
USE_THREADING = True
########################################

import numpy as np
import cv2
import copy
import threading
import keras
import platform
import tensorflow as tf
from keras.models import model_from_json
from multiprocessing import Process

def inference_handler(model_hash, frame_resized):
    print("multiprocessing: before prediction call")
    model_hash.predict(np.expand_dims(frame_resized, axis=0), batch_size = 1)
    print("multiprocessing: after prediction call")

if __name__ == "__main__":
    print("keras version:", keras.__version__)
    print("tf vresion: ", tf.__version__)
    print("python version:", platform.python_version())
    model_hash = keras.applications.VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    # Perform the demo
    cap = cv2.VideoCapture(0)
    while(True):
        # Capture frame-by-frame
        ret, frame = cap.read()
        # Process the keys
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            print("quit")
            break
        # Get the proper image for the network
        frame_resized = cv2.resize(frame, (224, 224))
        # show the images
        cv2.imshow('frame',frame)
        cv2.imshow('frame_resized',frame_resized)

        # Predict
        if USE_THREADING:
            p = Process(target=inference_handler, args=(model_hash, frame_resized,))
            p.start()
            p.join()
        else:
            print("main thread: before prediction call")
            model_hash.predict(np.expand_dims(frame_resized, axis=0), batch_size = 1)
            print("main thread: after prediction call")


    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()

USE_THREADING = False дает мне:

Using TensorFlow backend.
keras version: 2.2.0
tf vresion:  1.8.0
python version: 3.5.2
2019-02-25 20:47:32.926696: I tensorflow/core/platform/cpu_feature_guard.cc:140] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
main thread: before prediction call
main thread: after prediction call
main thread: before prediction call
main thread: after prediction call
main thread: before prediction call
...

USE_THREADING = True (что не получается) дает мне:

Using TensorFlow backend.
keras version: 2.2.0
tf vresion:  1.8.0
python version: 3.5.2
2019-02-25 20:50:34.922696: I tensorflow/core/platform/cpu_feature_guard.cc:140] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
multiprocessing: before prediction call

1 ответ

Таким образом, у Keras с тензорным бэкэндом, к сожалению, возникает проблема остановки во время прогнозирования, если модель была задана в качестве аргумента для подпроцесса. Однако, если модель создается непосредственно в подпроцессе, все работает нормально. Поэтому решение состоит в том, чтобы отправить кадры подпроцессу через очереди. Вот рабочее решение:

import numpy as np
import cv2
import copy
import keras
import platform
import tensorflow as tf
from keras.models import model_from_json
from multiprocessing import Process, Queue

def inference_handler(frame_queue):
    model_hash = keras.applications.VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    model_hash._make_predict_function()
    while True:
        print("multiprocessing: before queue")
        frame_resized = frame_queue.get(block=True, timeout=None)
        print("multiprocessing: before prediction call")
        model_hash.predict(np.expand_dims(frame_resized, axis=0), batch_size = 1)
        print("multiprocessing: after prediction call")

if __name__ == "__main__":
    print("keras version:", keras.__version__)
    print("tf version: ", tf.__version__)
    print("python version:", platform.python_version())
    frame_queue = Queue(maxsize=1)
    p = Process(target=inference_handler, args=(frame_queue,))
    p.start()
    # p.join()
    cap = cv2.VideoCapture(0)
    while(True):
        # Capture frame-by-frame
        ret, frame = cap.read()
        # Process the keys
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            print("quit")
            break
        # Get the proper image for the network
        frame_resized = cv2.resize(frame, (224, 224))
        # show the images
        cv2.imshow('frame',frame)
        cv2.imshow('frame_resized',frame_resized)

        # Advertise the frame
        if frame_queue.empty():
            print("Put frame into the queue")
            frame_queue.put_nowait(frame_resized)

    # When everything done, release the capture
    p.terminate()
    cap.release()
    cv2.destroyAllWindows()

что дает мне

keras version: 2.2.0
tf version:  1.8.0
python version: 3.5.2
Put frame into the queue
multiprocessing: before queue
multiprocessing: before prediction call
Put frame into the queue
multiprocessing: after prediction call
multiprocessing: before queue
multiprocessing: before prediction call
Put frame into the queue
...
Другие вопросы по тегам