Многопроцессорные процессоры повторного использования
Я намерен использовать многопроцессорность, чтобы ускорить конвейер изображений: поэтому я сравниваю процесс последовательно с этапом многопроцессорности. На самом деле процесс последовательности происходит быстрее, чем этап многопроцессорной обработки. Поправьте меня, если я ошибаюсь, но я думаю, что каждый раз создавать новый процессор стоит дорого, поэтому я хочу использовать процессор повторно. С другой стороны, я мог столкнуться с общими данными. Но я не эксперт. Спасибо всем заранее!
Мой код:
import multiprocessing
import numpy as np
import asyncio
import glob
import cv2
import time
q_img = multiprocessing.Queue()
def readImg(images,numbers,q_img,q_feature_1,q_feature_2):
start = time.time()
cnt = 0
for n in numbers:
#print("Start process",n)
n = cv2.imread(images[n],0)
q_img.put(n)
if cnt == 20:
prozess2 = multiprocessing.Process(target = calc_feature1, args =(q_img,q_feature_1))
prozess2.start()
cnt = 0
cnt += 1
time.sleep(0.001)
end = time.time()
print("PROCESS 1 finnished",end - start)
def calc_feature1(q_img,q_feature):
while q_img.empty() is False:
img = q_img.get()
#print("ThreshCalculation")
#Start Barcode detection
sq_n = cv2.threshold(img,127,255,cv2.THRESH_BINARY)
sq_n = cv2.threshold(img,100,255,cv2.THRESH_BINARY)
sq_n = cv2.threshold(img,50,255,cv2.THRESH_BINARY)
q_feature.put(sq_n)
if __name__ == "__main__":
numbers = np.arange(0,200)
images = glob.glob("InputSimulation\*"+".jpg")
start = time.time()
Thresh = []
for img_path in images:
img = cv2.imread(img_path,0)
Thresh.append(cv2.threshold(img,127,255,cv2.THRESH_BINARY))
Thresh.append(cv2.threshold(img,100,255,cv2.THRESH_BINARY))
Thresh.append(cv2.threshold(img,50,255,cv2.THRESH_BINARY))
time.sleep(0.001)
end = time.time()
print("Sequencial",end - start)
q_img = multiprocessing.Queue()
q_feature_1 = multiprocessing.Queue()
q_feature_2 = multiprocessing.Queue()
prozess1 = multiprocessing.Process(target = readImg, args =(images,numbers,q_img,q_feature_1,q_feature_2))
prozess1.start()
if (q_img.empty()):
prozess1.join()
idx = 0
while q_feature_1.empty() is False:
root = (q_feature_1.get())
print("Results",idx)
idx +=1
РЕЗУЛЬТАТ:
Последовательный 6.204113006591797 ПРОЦЕСС 1 закончен 7.102877140045166
Вторая попытка: на этот раз я хочу разделить мои данные на 50 блоков изображений и запустить процессор, если 50 изображений были прочитаны
import os, sys, errno
import re
import argparse
from time import time
import multiprocessing
import glob
import numpy as np
import matplotlib.pyplot as plt
import cv2
def computeFeatures(input_chunk, chunk_num):
thresholded_chunk = []
print("Processing Chunk,",chunk_num)
for img in input_chunk:
thresholded_chunk.append(cv2.threshold(img,127,255,cv2.THRESH_BINARY_INV))
return (thresholded_chunk, chunk_num)
if __name__ == '__main__':
start = time()
# Handle command line options
numProcessors = 8
# Start my pool
pool = multiprocessing.Pool(numProcessors)
# Build task list
tasks = []
image_list= []
img_idx = 0
image_pathes = glob.glob("InputSimulation\*.jpg")
image_Q = multiprocessing.Queue(len(image_pathes))
results = []
while img_idx < len(image_pathes):
print("InsterImageNumber",img_idx)
image_list.append(cv2.imread(image_pathes[img_idx],0))
if img_idx % 50 == 0 and img_idx != 0:
print("SetUpChunk",(img_idx / 50))
img_chunk = image_list[int((img_idx / 50)-1):int((img_idx / 50))].copy()
tasks.append( (img_chunk, (img_idx / 50),))
print("Calculate results")
results.append([pool.apply_async( computeFeatures, t ) for t in tasks])
img_idx +=1
# Run tasks #Flatten list before print
end = time()
print("DURATION",end - start)
# Process results
print(results)
for processor in results:
for result in processor:
print(result)
(Image, chunk) = result.get()
cv2.imshow("Thresh",Image)
print("Result: plot %d written to %s" % (Image, chunk) )