Как получить возвращаемое значение из потока в Python?
Как получить значение 'foo'
который возвращается из потока?
from threading import Thread
def foo(bar):
print 'hello {}'.format(bar)
return 'foo'
thread = Thread(target=foo, args=('world!',))
thread.start()
ret = thread.join()
print ret
Один очевидный способ сделать это, показанный выше, возвращает None
,
31 ответ
FWIW, multiprocessing
Модуль имеет хороший интерфейс для этого с помощью Pool
учебный класс. И если вы хотите придерживаться потоков, а не процессов, вы можете просто использовать multiprocessing.pool.ThreadPool
класс в качестве замены.
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo
# do some other stuff in the main process
return_val = async_result.get() # get the return value from your function.
Один из способов, которые я видел, - это передать изменяемый объект, такой как список или словарь, в конструктор потока вместе с индексом или другим идентификатором некоторого вида. Затем поток может сохранить свои результаты в своем выделенном слоте в этом объекте. Например:
def foo(bar, result, index):
print 'hello {0}'.format(bar)
result[index] = "foo"
from threading import Thread
threads = [None] * 10
results = [None] * 10
for i in range(len(threads)):
threads[i] = Thread(target=foo, args=('world!', results, i))
threads[i].start()
# do some other stuff
for i in range(len(threads)):
threads[i].join()
print " ".join(results) # what sound does a metasyntactic locomotive make?
Если вы действительно хотите join()
чтобы вернуть возвращаемое значение вызванной функции, вы можете сделать это с помощью Thread
подкласс как следующий:
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print twrv.join() # prints foo
Это становится немного волосатым из-за некоторого искажения имени, и это обращается к "частным" структурам данных, которые являются определенными для Thread
реализация... но это работает.
Для python3
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
def run(self):
print(type(self._target))
if self._target is not None:
self._return = self._target(*self._args,
**self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return
В Python 3.2+ stdlib concurrent.futures
модуль предоставляет API более высокого уровня для threading
, включая передачу возвращаемых значений или исключений из рабочего потока обратно в основной поток:
import concurrent.futures
def foo(bar):
print('hello {}'.format(bar))
return 'foo'
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(foo, 'world!')
return_value = future.result()
print(return_value)
Ответ Джейка хорош, но если вы не хотите использовать пул потоков (вы не знаете, сколько потоков вам понадобится, но создаете их по мере необходимости), тогда хорошим способом передачи информации между потоками является встроенный Класс Queue.Queue, так как он обеспечивает безопасность потоков.
Я создал следующий декоратор, чтобы он действовал аналогично пулу потоков:
def threaded(f, daemon=False):
import Queue
def wrapped_f(q, *args, **kwargs):
'''this function calls the decorated function and puts the
result in a queue'''
ret = f(*args, **kwargs)
q.put(ret)
def wrap(*args, **kwargs):
'''this is the function returned from the decorator. It fires off
wrapped_f in a new thread and returns the thread object with
the result queue attached'''
q = Queue.Queue()
t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
t.daemon = daemon
t.start()
t.result_queue = q
return t
return wrap
Тогда вы просто используете его как:
@threaded
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Thread object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result_queue.get()
print result
Декорированная функция создает новый поток при каждом вызове и возвращает объект Thread, содержащий очередь, которая получит результат.
ОБНОВИТЬ
Прошло довольно много времени с тех пор, как я опубликовал этот ответ, но он все еще получает представления, поэтому я подумал, что обновлю его, чтобы отразить способ, которым я делаю это в более новых версиях Python:
Python 3.2 добавлен в concurrent.futures
модуль, обеспечивающий высокоуровневый интерфейс для параллельных задач. Это обеспечивает ThreadPoolExecutor
а также ProcessPoolExecutor
, так что вы можете использовать поток или пул процессов с тем же API.
Одним из преимуществ этого API является то, что отправка задачи Executor
возвращает Future
объект, который будет дополнен возвращаемым значением вызываемого вами запроса.
Это делает прикрепление queue
ненужный объект, что немного упрощает декоратор:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
return wrap
При этом будет использован исполнитель модуля пула потоков по умолчанию, если он не был передан.
Использование очень похоже на ранее:
@threadpool
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Future object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result()
print result
Если вы используете Python 3.4+, одна действительно хорошая особенность использования этого метода (и объектов Future в целом) заключается в том, что возвращаемое будущее можно обернуть, чтобы превратить его в asyncio.Future
с asyncio.wrap_future
, Это позволяет легко работать с сопрограммами:
result = await asyncio.wrap_future(long_task(10))
Если вам не нужен доступ к основному concurrent.Future
Объект, вы можете включить перенос в декоратор:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))
return wrap
Затем, когда вам нужно вытолкнуть интенсивный процессор или блокировать код из потока цикла событий, вы можете поместить его в декорированную функцию:
@threadpool
def some_long_calculation():
...
# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
Другое решение, которое не требует изменения существующего кода:
import Queue
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return 'foo'
que = Queue.Queue()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result
Его также можно легко настроить для многопоточной среды:
import Queue
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return 'foo'
que = Queue.Queue()
threads_list = list()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)
# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...
# Join all the threads
for t in threads_list:
t.join()
# Check thread's return value
while not que.empty():
result = que.get()
print result
Большинство ответов, которые я нашел, являются длинными и требуют знакомства с другими модулями или расширенными функциями Python и могут сбить с толку кого-то, если он еще не знаком со всем, о чем говорится в ответе.
Рабочий код для упрощенного подхода:
import threading, time, random
class ThreadWithResult(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
def function():
self.result = target(*args, **kwargs)
super().__init__(group=group, target=function, name=name, daemon=daemon)
def function_to_thread(n):
count = 0
while count < 3:
print(f'still running thread {n}')
count +=1
time.sleep(3)
result = random.random()
print(f'Return value of thread {n} should be: {result}')
return result
def main():
thread1 = ThreadWithResult(target=function_to_thread, args=(1,))
thread2 = ThreadWithResult(target=function_to_thread, args=(2,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(thread1.result)
print(thread2.result)
main()
Объяснение: Я хотел значительно упростить вещи, поэтому я создал класс и унаследовал от него. Вложенная функция
function
в
__init__
вызывает поточную функцию, значение которой мы хотим сохранить, и сохраняет результат как атрибут экземпляра
self.result
после того, как поток завершит выполнение.
Создание этого экземпляра идентично созданию экземпляра. Передайте функцию, которую хотите запустить в новом потоке, в
target
аргумент и любые аргументы, которые могут понадобиться вашей функции для
args
аргумент и любые аргументы ключевого слова для
kwargs
аргумент.
например
my_thread = ThreadWithResult(target=my_function, args=(arg1, arg2, arg3))
Я думаю, что это значительно легче понять, чем подавляющее большинство ответов, и этот подход не требует дополнительного импорта! Я включил
time
и
random
модуль для имитации поведения потока, но они не требуются для достижения функциональности, заданной в исходном вопросе .
Я знаю, что отвечу на этот длинный вопрос после того, как вопрос был задан, но я надеюсь, что это поможет большему количеству людей в будущем!
РЕДАКТИРОВАТЬ : я создал
<tcode id="4190888"></tcode>Пакет PyPI, чтобы вы могли получить доступ к тому же коду, указанному выше, и повторно использовать его в проектах ( код GitHub находится здесь ). Пакет PyPI полностью расширяет
threading.Thread
class, поэтому вы можете установить любые атрибуты, которые вы бы установили на
threading.thread
на
ThreadWithResult
класс тоже!
Исходный ответ выше затрагивает основную идею этого подкласса, но для получения дополнительной информации см. Более подробное объяснение (из строки документации модуля) здесь .
Пример быстрого использования:
pip3 install -U save-thread-result # MacOS/Linux
pip install -U save-thread-result # Windows
python3 # MacOS/Linux
python # Windows
from save_thread_result import ThreadWithResult
# As of Release 0.0.3, you can also specify values for
#`group`, `name`, and `daemon` if you want to set those
# values manually.
thread = ThreadWithResult(
target = my_function,
args = (my_function_arg1, my_function_arg2, ...)
kwargs = (my_function_kwarg1=kwarg1_value, my_function_kwarg2=kwarg2_value, ...)
)
thread.start()
thread.join()
if getattr(thread, 'result', None):
print(thread.result)
else:
# thread.result attribute not set - something caused
# the thread to terminate BEFORE the thread finished
# executing the function passed in through the
# `target` argument
print('ERROR! Something went wrong while executing this thread, and the function you passed in did NOT complete!!')
# seeing help about the class and information about the threading.Thread super class methods and attributes available:
help(ThreadWithResult)
Parris / ответ kindalljoin
/return
ответ портирован на Python 3:
from threading import Thread
def foo(bar):
print('hello {0}'.format(bar))
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print(twrv.join()) # prints foo
Обратите внимание Thread
Класс реализован по-разному в Python 3.
Я украл ответ Уиндола и немного его почистил.
Ключевая часть добавляет *args и **kwargs в join() для обработки тайм-аута
class threadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super(threadWithReturn, self).__init__(*args, **kwargs)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
def join(self, *args, **kwargs):
super(threadWithReturn, self).join(*args, **kwargs)
return self._return
ОБНОВЛЕНИЕ ОТВЕТА НИЖЕ
Это мой самый популярный ответ, поэтому я решил обновить код, который будет работать как на py2, так и на py3.
Кроме того, я вижу много ответов на этот вопрос, которые показывают отсутствие понимания в отношении Thread.join(). Некоторые совершенно не справляются с timeout
Arg. Но есть также угловой случай, о котором вы должны знать в отношении случаев, когда у вас есть (1) целевая функция, которая может возвращать None
и (2) вы также проходите timeout
arg to join(). Пожалуйста, смотрите "Тест 4", чтобы понять этот угловой случай.
Класс ThreadWithReturn, который работает с py2 и py3:
import sys
from threading import Thread
from builtins import super # https://stackru.com/a/30159479
if sys.version_info >= (3, 0):
_thread_target_key = '_target'
_thread_args_key = '_args'
_thread_kwargs_key = '_kwargs'
else:
_thread_target_key = '_Thread__target'
_thread_args_key = '_Thread__args'
_thread_kwargs_key = '_Thread__kwargs'
class ThreadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._return = None
def run(self):
target = getattr(self, _thread_target_key)
if not target is None:
self._return = target(*getattr(self, _thread_args_key), **getattr(self, _thread_kwargs_key))
def join(self, *args, **kwargs):
super().join(*args, **kwargs)
return self._return
Некоторые примеры тестов приведены ниже:
import time, random
# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
if not seconds is None:
time.sleep(seconds)
return arg
# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')
# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)
# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
Можете ли вы определить угловой случай, с которым мы можем столкнуться с ТЕСТОМ 4?
Проблема в том, что мы ожидаем, что метод giveMe() вернет None (см. ТЕСТ 2), но мы также ожидаем, что join() вернет None, если время ожидания истекло.
returned is None
означает либо:
(1) это то, что вернул GiveMe(), или
(2) истекло время соединения ()
Этот пример тривиален, так как мы знаем, что метод giveMe() всегда будет возвращать None. Но в реальном случае (когда цель может законно вернуть None или что-то еще), мы бы хотели явно проверить, что произошло.
Ниже описано, как решить этот угловой случай:
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
if my_thread.isAlive():
# returned is None because join() timed out
# this also means that giveMe() is still running in the background
pass
# handle this based on your app's logic
else:
# join() is finished, and so is giveMe()
# BUT we could also be in a race condition, so we need to update returned, just in case
returned = my_thread.join()
Использование очереди:
import threading, queue
def calc_square(num, out_queue1):
l = []
for x in num:
l.append(x*x)
out_queue1.put(l)
arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
Я нашел самый короткий и простой способ сделать это — воспользоваться преимуществами классов Python и их динамических свойств. Вы можете получить текущий поток из контекста вашего порожденного потока, используяthreading.current_thread()
и присвойте возвращаемое значение свойству.
import threading
def some_target_function():
# Your code here.
threading.current_thread().return_value = "Some return value."
your_thread = threading.Thread(target=some_target_function)
your_thread.start()
your_thread.join()
return_value = your_thread.return_value
print(return_value)
Мое решение проблемы - обернуть функцию и поток в классе. Не требует использования пулов, очередей или передачи переменных типа c. Это также не блокирует. Вместо этого вы проверяете статус. Смотрите пример того, как использовать его в конце кода.
import threading
class ThreadWorker():
'''
The basic idea is given a function create an object.
The object can then run the function in a thread.
It provides a wrapper to start it,check its status,and get data out the function.
'''
def __init__(self,func):
self.thread = None
self.data = None
self.func = self.save_data(func)
def save_data(self,func):
'''modify function to save its returned data'''
def new_func(*args, **kwargs):
self.data=func(*args, **kwargs)
return new_func
def start(self,params):
self.data = None
if self.thread is not None:
if self.thread.isAlive():
return 'running' #could raise exception here
#unless thread exists and is alive start or restart it
self.thread = threading.Thread(target=self.func,args=params)
self.thread.start()
return 'started'
def status(self):
if self.thread is None:
return 'not_started'
else:
if self.thread.isAlive():
return 'running'
else:
return 'finished'
def get_results(self):
if self.thread is None:
return 'not_started' #could return exception
else:
if self.thread.isAlive():
return 'running'
else:
return self.data
def add(x,y):
return x +y
add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
Принимая во внимание комментарий @iman к ответу @JakeBiesinger, я предложил, чтобы он имел различное количество потоков:
from multiprocessing.pool import ThreadPool
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
numOfThreads = 3
results = []
pool = ThreadPool(numOfThreads)
for i in range(0, numOfThreads):
results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)
# do some other stuff in the main process
# ...
# ...
results = [r.get() for r in results]
print results
pool.close()
pool.join()
Ура,
Guy.
Основываясь на том, что уже упоминалось, вот более общее решение, которое работает с Python3.
import threading
class ThreadWithReturnValue(threading.Thread):
def __init__(self, *init_args, **init_kwargs):
threading.Thread.__init__(self, *init_args, **init_kwargs)
self._return = None
def run(self):
self._return = self._target(*self._args, **self._kwargs)
def join(self):
threading.Thread.join(self)
return self._return
Применение
th = ThreadWithReturnValue(target=requests.get, args=('http://www.google.com',))
th.start()
response = th.join()
response.status_code # => 200
Я использую эту обертку, которая удобно превращает любую функцию для запуска в Thread
- заботиться о его возвращаемом значении или исключении. Не добавляет Queue
накладные расходы.
def threading_func(f):
"""Decorator for running a function in a thread and handling its return
value or exception"""
def start(*args, **kw):
def run():
try:
th.ret = f(*args, **kw)
except:
th.exc = sys.exc_info()
def get(timeout=None):
th.join(timeout)
if th.exc:
raise th.exc[0], th.exc[1], th.exc[2] # py2
##raise th.exc[1] #py3
return th.ret
th = threading.Thread(None, run)
th.exc = None
th.get = get
th.start()
return th
return start
Примеры использования
def f(x):
return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))
@threading_func
def th_mul(a, b):
return a * b
th = th_mul("text", 2.5)
try:
print(th.get())
except TypeError:
print("exception thrown ok.")
Примечания о threading
модуль
Удобное возвращаемое значение и обработка исключений для многопоточной функции - это частая "Pythonic" необходимость, и она действительно должна уже предлагаться threading
модуль - возможно прямо в стандарте Thread
учебный класс. ThreadPool
имеет слишком много накладных расходов для простых задач - 3 управления потоками, много бюрократии. к несчастью Thread
Первоначально макет был скопирован с Java - что вы видите, например, из все еще бесполезного первого (!) параметра конструктора group
,
join
всегда возвращайся None
Я думаю, вы должны подкласс Thread
обрабатывать коды возврата и так.
Вы можете определить изменяемую область выше области действия многопоточной функции и добавить к ней результат. (Я также изменил код для совместимости с python3)
returns = {}
def foo(bar):
print('hello {0}'.format(bar))
returns[bar] = 'foo'
from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)
Это возвращает {'world!': 'foo'}
Если вы используете функцию ввода в качестве ключа к вашим результатам, каждый уникальный вход гарантированно даст запись в результатах.
Это довольно старый вопрос, но я хотел поделиться простым решением, которое сработало для меня и помогло моему процессу разработки.
Методология этого ответа заключается в том, что «новая» целевая функция,inner
присвоение результата исходной функции (прошедшей через__init__
функция) кresult
атрибут экземпляра оболочки через нечто, называемое закрытием.
Это позволяет классу-оболочке удерживать возвращаемое значение для доступа вызывающих объектов в любое время.
ПРИМЕЧАНИЕ. Этот метод не требует использования каких-либо искаженных методов или частных методовthreading.Thread
класс, хотя функции доходности не рассматривались (OP не упоминал функции доходности).
Наслаждаться!
from threading import Thread as _Thread
class ThreadWrapper:
def __init__(self, target, *args, **kwargs):
self.result = None
self._target = self._build_threaded_fn(target)
self.thread = _Thread(
target=self._target,
*args,
**kwargs
)
def _build_threaded_fn(self, func):
def inner(*args, **kwargs):
self.result = func(*args, **kwargs)
return inner
Кроме того, вы можете запуститьpytest
(при условии, что он у вас установлен) со следующим кодом для демонстрации результатов:
import time
from commons import ThreadWrapper
def test():
def target():
time.sleep(1)
return 'Hello'
wrapper = ThreadWrapper(target=target)
wrapper.thread.start()
r = wrapper.result
assert r is None
time.sleep(2)
r = wrapper.result
assert r == 'Hello'
Вы можете использовать
pool.apply_async()
из
ThreadPool()
вернуть значение изкак показано ниже:
from multiprocessing.pool import ThreadPool
def test(num1, num2):
return num1 + num2
pool = ThreadPool(processes=1) # Here
result = pool.apply_async(test, (2, 3)) # Here
print(result.get()) # 5
И вы также можете использовать
submit()
из
concurrent.futures.ThreadPoolExecutor()
вернуть значение из
test()
как показано ниже:
from concurrent.futures import ThreadPoolExecutor
def test(num1, num2):
return num1 + num2
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 2, 3) # Here
print(future.result()) # 5
И, вместо, вы можете использовать массив , как показано ниже:
from threading import Thread
def test(num1, num2, r):
r[0] = num1 + num2 # Instead of "return"
result = [None] # Here
thread = Thread(target=test, args=(2, 3, result))
thread.start()
thread.join()
print(result[0]) # 5
И вместо
return
, вы также можете использовать очередь
result
как показано ниже:
from threading import Thread
import queue
def test(num1, num2, q):
q.put(num1 + num2) # Instead of "return"
queue = queue.Queue() # Here
thread = Thread(target=test, args=(2, 3, queue))
thread.start()
thread.join()
print(queue.get()) # '5'
Идея GuySoft великолепна, но я думаю, что объект не обязательно должен наследоваться от Thread, и start() может быть удален из интерфейса:
from threading import Thread
import queue
class ThreadWithReturnValue(object):
def __init__(self, target=None, args=(), **kwargs):
self._que = queue.Queue()
self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
args=(self._que, args, kwargs), )
self._t.start()
def join(self):
self._t.join()
return self._que.get()
def foo(bar):
print('hello {0}'.format(bar))
return "foo"
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
print(twrv.join()) # prints foo
Вы можете использовать Пул в качестве пула рабочих процессов, как показано ниже:
from multiprocessing import Pool
def f1(x, y):
return x*y
if __name__ == '__main__':
with Pool(processes=10) as pool:
result = pool.apply(f1, (2, 3))
print(result)
Определите вашу цель для
1) принять аргумент q
2) заменить любые заявления return foo
с q.put(foo); return
так что функция
def func(a):
ans = a * a
return ans
станет
def func(a, q):
ans = a * a
q.put(ans)
return
и тогда вы будете действовать как таковой
from Queue import Queue
from threading import Thread
ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]
threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]
И вы можете использовать функции декораторы / оболочки, чтобы сделать это, чтобы вы могли использовать ваши существующие функции как target
не изменяя их, но следуйте этой базовой схеме.
Вот версия, которую я создал из ответа @Kindall /questions/29890751/kak-poluchit-vozvraschaemoe-znachenie-iz-potoka-v-python/29890764#29890764
В этой версии все, что вам нужно сделать, это ввести команду с аргументами для создания нового потока.
(Я также включил пару тестов)
это было сделано с помощью python 3.8
from threading import Thread
from typing import Any
#def threader(com): # my original Version (Ignore this)
# try:
# threader = Thread(target = com)
# threader.start()
# except Exception as e:
# print(e)
# print('Could not start thread')
def test(plug, plug2, plug3):
print(f"hello {plug}")
print(f'I am the second plug : {plug2}')
print(plug3)
return 'I am the return Value!'
def test2(msg):
return f'I am from the second test: {msg}'
def test3():
print('hello world')
def NewThread(com, Returning: bool, *arguments) -> Any:
"""
Will create a new thread for a function/command.
:param com: Command to be Executed
:param arguments: Arguments to be sent to Command
:param Returning: True/False Will this command need to return anything
"""
class NewThreadWorker(Thread):
def __init__(self, group = None, target = None, name = None, args = (), kwargs = None, *,
daemon = None):
Thread.__init__(self, group, target, name, args, kwargs, daemon = daemon)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self):
Thread.join(self)
return self._return
ntw = NewThreadWorker(target = com, args = (*arguments,))
ntw.start()
if Returning:
return ntw.join()
if __name__ == "__main__":
# threader(test('world'))
print(NewThread(test, True, 'hi', 'test', test2('hi')))
NewThread(test3, True)
Надеюсь, что это кому-то пригодится:)
Как уже упоминалось, многопроцессорный пул намного медленнее, чем базовые потоки. Использование очередей, предложенных в некоторых ответах, является очень эффективной альтернативой. Я использую его со словарями, чтобы иметь возможность запускать множество небольших потоков и восстанавливать множественные ответы, комбинируя их со словарями:
#!/usr/bin/env python3
import threading
# use Queue for python2
import queue
import random
LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]
NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
def randoms(k, q):
result = dict()
result['letter'] = random.choice(LETTERS)
result['number'] = random.choice(NUMBERS)
q.put({k: result})
threads = list()
q = queue.Queue()
results = dict()
for name in ('alpha', 'oscar', 'yankee',):
threads.append( threading.Thread(target=randoms, args=(name, q)) )
threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
results.update(q.get())
print(results)
Ответ Киндалла в Python3
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon)
self._return = None
def run(self):
try:
if self._target:
self._return = self._target(*self._args, **self._kwargs)
finally:
del self._target, self._args, self._kwargs
def join(self,timeout=None):
Thread.join(self,timeout)
return self._return
Если из вызова функции нужно проверить только True или False, я нашел бы более простое решение - обновить глобальный список.
import threading
lists = {"A":"True", "B":"True"}
def myfunc(name: str, mylist):
for i in mylist:
if i == 31:
lists[name] = "False"
return False
else:
print("name {} : {}".format(name, i))
t1 = threading.Thread(target=myfunc, args=("A", [1, 2, 3, 4, 5, 6], ))
t2 = threading.Thread(target=myfunc, args=("B", [11, 21, 31, 41, 51, 61], ))
t1.start()
t2.start()
t1.join()
t2.join()
for value in lists.values():
if value == False:
# Something is suspicious
# Take necessary action
Это более полезно, если вы хотите узнать, вернул ли какой-либо из потоков ложное состояние, чтобы предпринять необходимые действия.
Я не знаю, сработало ли это для вас, ребята, но я решил создать глобальный объект (в основном словари или вложенные массивы), чтобы функция могла получить доступ к объекту и изменить его. Я знаю, что это требует больше ресурсов, но мы не Имея дело с квантовой наукой, я думаю, мы можем дать немного больше оперативной памяти при условии, что потребление оперативной памяти увеличивается линейно с загрузкой процессора. Вот пример кода:
import requests
import json
import string
import random
import threading
import time
dictionary = {}
def get_val1(L):
print('#1')
for n,elem in enumerate(L):
dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val2(L):
print('#2')
for n,elem in enumerate(L):
dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val3(L):
print('#3')
for n,elem in enumerate(L):
dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val4(L):
print('#4')
for n,elem in enumerate(L):
dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
t1 = threading.Thread(target=get_val1,args=(L[0],))
t2 = threading.Thread(target=get_val2,args=(L[1],))
t3 = threading.Thread(target=get_val3,args=(L[2],))
t4 = threading.Thread(target=get_val4,args=(L[3],))
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
Эта программа выполняет 4 потока, каждый из которых возвращает некоторые данные для некоторого текста L[i] для i в L, возвращаемые данные из API сохраняются в словаре. Они могут варьироваться от программы к программе, выгодно это или нет, для небольших Для среднетяжелых вычислительных задач эта мутация объекта работает довольно быстро и использует на несколько процентов больше ресурсов.
Одно из обычных решений - обернуть вашу функцию foo
с декоратором как
result = queue.Queue()
def task_wrapper(*args):
result.put(target(*args))
Тогда весь код может выглядеть так
result = queue.Queue()
def task_wrapper(*args):
result.put(target(*args))
threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]
for t in threads:
t.start()
while(True):
if(len(threading.enumerate()) < max_num):
break
for t in threads:
t.join()
return result
Заметка
Одна важная проблема заключается в том, что возвращаемые значения могут быть неупорядоченными.
(На самом деле, return value
не обязательно сохраняется в queue
, так как вы можете выбрать произвольную потокобезопасную структуру данных)
Очень простой способ сделать это для таких чайников, как я:
import queue
import threading
# creating queue instance
q = queue.Queue()
# creating threading class
class AnyThread():
def __init__ (self):
threading.Thread.__init__(self)
def run(self):
# in this class and function we will put our test target function
test()
t = AnyThread()
# having our test target function
def test():
# do something in this function:
result = 3 + 2
# and put result to a queue instance
q.put(result)
for i in range(3): #calling our threading fucntion 3 times (just for example)
t.run()
output = q.get() # here we get output from queue instance
print(output)
>>> 5
>>> 5
>>> 5
Главное здесь - это queue
модуль. Мы создаем queue.Queue()
экземпляр и включить его в нашу функцию. Мы кормим его своим результатом, который позже мы выходим за пределы потока.
Посмотрите еще один пример с аргументами, переданными нашей тестовой функции:
import queue
import threading
# creating queue instance
q = queue.Queue()
# creating threading class
class AnyThread():
def __init__ (self):
threading.Thread.__init__(self)
def run(self, a, b):
# in this class and function we will put our execution test function
test(a, b)
t = AnyThread()
# having our test target function
def test(a, b):
# do something in this function:
result = a + b
# and put result to a queue instance
q.put(result)
for i in range(3): #calling our threading fucntion 3 times (just for example)
t.run(3+i, 2+i)
output = q.get() # here we get output from queue instance
print(output)
>>> 5
>>> 7
>>> 9
Почему бы просто не использовать глобальную переменную?
import threading
class myThread(threading.Thread):
def __init__(self, ind, lock):
threading.Thread.__init__(self)
self.ind = ind
self.lock = lock
def run(self):
global results
with self.lock:
results.append(self.ind)
results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
t.start()
for t in threads:
t.join()
print(results)
Я знаю, что эта ветка устарела.... но я столкнулся с той же проблемой... Если вы хотите использовать
thread.join()
import threading
class test:
def __init__(self):
self.msg=""
def hello(self,bar):
print('hello {}'.format(bar))
self.msg="foo"
def main(self):
thread = threading.Thread(target=self.hello, args=('world!',))
thread.start()
thread.join()
print(self.msg)
g=test()
g.main()