Как получить возвращаемое значение из потока в Python?

Как получить значение 'foo' который возвращается из потока?

from threading import Thread

def foo(bar):
    print 'hello {}'.format(bar)
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
ret = thread.join()
print ret

Один очевидный способ сделать это, показанный выше, возвращает None,

31 ответ

Решение

FWIW, multiprocessing Модуль имеет хороший интерфейс для этого с помощью Pool учебный класс. И если вы хотите придерживаться потоков, а не процессов, вы можете просто использовать multiprocessing.pool.ThreadPool класс в качестве замены.

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.

Один из способов, которые я видел, - это передать изменяемый объект, такой как список или словарь, в конструктор потока вместе с индексом или другим идентификатором некоторого вида. Затем поток может сохранить свои результаты в своем выделенном слоте в этом объекте. Например:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

Если вы действительно хотите join() чтобы вернуть возвращаемое значение вызванной функции, вы можете сделать это с помощью Thread подкласс как следующий:

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

Это становится немного волосатым из-за некоторого искажения имени, и это обращается к "частным" структурам данных, которые являются определенными для Thread реализация... но это работает.

Для python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
    def run(self):
        print(type(self._target))
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return

В Python 3.2+ stdlib concurrent.futures модуль предоставляет API более высокого уровня для threading, включая передачу возвращаемых значений или исключений из рабочего потока обратно в основной поток:

import concurrent.futures

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(foo, 'world!')
    return_value = future.result()
    print(return_value)

Ответ Джейка хорош, но если вы не хотите использовать пул потоков (вы не знаете, сколько потоков вам понадобится, но создаете их по мере необходимости), тогда хорошим способом передачи информации между потоками является встроенный Класс Queue.Queue, так как он обеспечивает безопасность потоков.

Я создал следующий декоратор, чтобы он действовал аналогично пулу потоков:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

Тогда вы просто используете его как:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

Декорированная функция создает новый поток при каждом вызове и возвращает объект Thread, содержащий очередь, которая получит результат.

ОБНОВИТЬ

Прошло довольно много времени с тех пор, как я опубликовал этот ответ, но он все еще получает представления, поэтому я подумал, что обновлю его, чтобы отразить способ, которым я делаю это в более новых версиях Python:

Python 3.2 добавлен в concurrent.futures модуль, обеспечивающий высокоуровневый интерфейс для параллельных задач. Это обеспечивает ThreadPoolExecutor а также ProcessPoolExecutor, так что вы можете использовать поток или пул процессов с тем же API.

Одним из преимуществ этого API является то, что отправка задачи Executor возвращает Future объект, который будет дополнен возвращаемым значением вызываемого вами запроса.

Это делает прикрепление queue ненужный объект, что немного упрощает декоратор:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

При этом будет использован исполнитель модуля пула потоков по умолчанию, если он не был передан.

Использование очень похоже на ранее:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

Если вы используете Python 3.4+, одна действительно хорошая особенность использования этого метода (и объектов Future в целом) заключается в том, что возвращаемое будущее можно обернуть, чтобы превратить его в asyncio.Future с asyncio.wrap_future, Это позволяет легко работать с сопрограммами:

result = await asyncio.wrap_future(long_task(10))

Если вам не нужен доступ к основному concurrent.Future Объект, вы можете включить перенос в декоратор:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

Затем, когда вам нужно вытолкнуть интенсивный процессор или блокировать код из потока цикла событий, вы можете поместить его в декорированную функцию:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()

Другое решение, которое не требует изменения существующего кода:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result

Его также можно легко настроить для многопоточной среды:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()
threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result

Большинство ответов, которые я нашел, являются длинными и требуют знакомства с другими модулями или расширенными функциями Python и могут сбить с толку кого-то, если он еще не знаком со всем, о чем говорится в ответе.

Рабочий код для упрощенного подхода:

      import threading, time, random

class ThreadWithResult(threading.Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
        def function():
            self.result = target(*args, **kwargs)
        super().__init__(group=group, target=function, name=name, daemon=daemon)

def function_to_thread(n):
    count = 0
    while count < 3:
            print(f'still running thread {n}')
            count +=1
            time.sleep(3)
    result = random.random()
    print(f'Return value of thread {n} should be: {result}')
    return result


def main():
    thread1 = ThreadWithResult(target=function_to_thread, args=(1,))
    thread2 = ThreadWithResult(target=function_to_thread, args=(2,))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(thread1.result)
    print(thread2.result)

main()

Объяснение: Я хотел значительно упростить вещи, поэтому я создал класс и унаследовал от него. Вложенная функция function в __init__ вызывает поточную функцию, значение которой мы хотим сохранить, и сохраняет результат как атрибут экземпляра self.result после того, как поток завершит выполнение.

Создание этого экземпляра идентично созданию экземпляра. Передайте функцию, которую хотите запустить в новом потоке, в target аргумент и любые аргументы, которые могут понадобиться вашей функции для args аргумент и любые аргументы ключевого слова для kwargs аргумент.

например

      my_thread = ThreadWithResult(target=my_function, args=(arg1, arg2, arg3))

Я думаю, что это значительно легче понять, чем подавляющее большинство ответов, и этот подход не требует дополнительного импорта! Я включил time и randomмодуль для имитации поведения потока, но они не требуются для достижения функциональности, заданной в исходном вопросе .

Я знаю, что отвечу на этот длинный вопрос после того, как вопрос был задан, но я надеюсь, что это поможет большему количеству людей в будущем!


РЕДАКТИРОВАТЬ : я создал <tcode id="4190888"></tcode>Пакет PyPI, чтобы вы могли получить доступ к тому же коду, указанному выше, и повторно использовать его в проектах ( код GitHub находится здесь ). Пакет PyPI полностью расширяет threading.Thread class, поэтому вы можете установить любые атрибуты, которые вы бы установили на threading.thread на ThreadWithResult класс тоже!

Исходный ответ выше затрагивает основную идею этого подкласса, но для получения дополнительной информации см. Более подробное объяснение (из строки документации модуля) здесь .

Пример быстрого использования:

      pip3 install -U save-thread-result     # MacOS/Linux
pip  install -U save-thread-result     # Windows

python3     # MacOS/Linux
python      # Windows
      from save_thread_result import ThreadWithResult

# As of Release 0.0.3, you can also specify values for
#`group`, `name`, and `daemon` if you want to set those
# values manually.
thread = ThreadWithResult(
    target = my_function,
    args   = (my_function_arg1, my_function_arg2, ...)
    kwargs = (my_function_kwarg1=kwarg1_value, my_function_kwarg2=kwarg2_value, ...)
)

thread.start()
thread.join()
if getattr(thread, 'result', None):
    print(thread.result)
else:
    # thread.result attribute not set - something caused
    # the thread to terminate BEFORE the thread finished
    # executing the function passed in through the
    # `target` argument
    print('ERROR! Something went wrong while executing this thread, and the function you passed in did NOT complete!!')

# seeing help about the class and information about the threading.Thread super class methods and attributes available:
help(ThreadWithResult)

Parris / ответ kindalljoin/return ответ портирован на Python 3:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

Обратите внимание Thread Класс реализован по-разному в Python 3.

Я украл ответ Уиндола и немного его почистил.

Ключевая часть добавляет *args и **kwargs в join() для обработки тайм-аута

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return

ОБНОВЛЕНИЕ ОТВЕТА НИЖЕ

Это мой самый популярный ответ, поэтому я решил обновить код, который будет работать как на py2, так и на py3.

Кроме того, я вижу много ответов на этот вопрос, которые показывают отсутствие понимания в отношении Thread.join(). Некоторые совершенно не справляются с timeout Arg. Но есть также угловой случай, о котором вы должны знать в отношении случаев, когда у вас есть (1) целевая функция, которая может возвращать None и (2) вы также проходите timeout arg to join(). Пожалуйста, смотрите "Тест 4", чтобы понять этот угловой случай.

Класс ThreadWithReturn, который работает с py2 и py3:

import sys
from threading import Thread
from builtins import super    # https://stackru.com/a/30159479

if sys.version_info >= (3, 0):
    _thread_target_key = '_target'
    _thread_args_key = '_args'
    _thread_kwargs_key = '_kwargs'
else:
    _thread_target_key = '_Thread__target'
    _thread_args_key = '_Thread__args'
    _thread_kwargs_key = '_Thread__kwargs'

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        target = getattr(self, _thread_target_key)
        if not target is None:
            self._return = target(*getattr(self, _thread_args_key), **getattr(self, _thread_kwargs_key))

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

Некоторые примеры тестов приведены ниже:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

Можете ли вы определить угловой случай, с которым мы можем столкнуться с ТЕСТОМ 4?

Проблема в том, что мы ожидаем, что метод giveMe() вернет None (см. ТЕСТ 2), но мы также ожидаем, что join() вернет None, если время ожидания истекло.

returned is None означает либо:

(1) это то, что вернул GiveMe(), или

(2) истекло время соединения ()

Этот пример тривиален, так как мы знаем, что метод giveMe() всегда будет возвращать None. Но в реальном случае (когда цель может законно вернуть None или что-то еще), мы бы хотели явно проверить, что произошло.

Ниже описано, как решить этот угловой случай:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()

Использование очереди:

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())

Я нашел самый короткий и простой способ сделать это — воспользоваться преимуществами классов Python и их динамических свойств. Вы можете получить текущий поток из контекста вашего порожденного потока, используяthreading.current_thread()и присвойте возвращаемое значение свойству.

      import threading

def some_target_function():
    # Your code here.
    threading.current_thread().return_value = "Some return value."

your_thread = threading.Thread(target=some_target_function)
your_thread.start()
your_thread.join()

return_value = your_thread.return_value
print(return_value)

Мое решение проблемы - обернуть функцию и поток в классе. Не требует использования пулов, очередей или передачи переменных типа c. Это также не блокирует. Вместо этого вы проверяете статус. Смотрите пример того, как использовать его в конце кода.

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()

Принимая во внимание комментарий @iman к ответу @JakeBiesinger, я предложил, чтобы он имел различное количество потоков:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

Ура,

Guy.

Основываясь на том, что уже упоминалось, вот более общее решение, которое работает с Python3.

import threading

class ThreadWithReturnValue(threading.Thread):
    def __init__(self, *init_args, **init_kwargs):
        threading.Thread.__init__(self, *init_args, **init_kwargs)
        self._return = None
    def run(self):
        self._return = self._target(*self._args, **self._kwargs)
    def join(self):
        threading.Thread.join(self)
        return self._return

Применение

        th = ThreadWithReturnValue(target=requests.get, args=('http://www.google.com',))
        th.start()
        response = th.join()
        response.status_code  # => 200

Я использую эту обертку, которая удобно превращает любую функцию для запуска в Thread - заботиться о его возвращаемом значении или исключении. Не добавляет Queue накладные расходы.

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

Примеры использования

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

Примечания о threading модуль

Удобное возвращаемое значение и обработка исключений для многопоточной функции - это частая "Pythonic" необходимость, и она действительно должна уже предлагаться threading модуль - возможно прямо в стандарте Thread учебный класс. ThreadPool имеет слишком много накладных расходов для простых задач - 3 управления потоками, много бюрократии. к несчастью ThreadПервоначально макет был скопирован с Java - что вы видите, например, из все еще бесполезного первого (!) параметра конструктора group,

join всегда возвращайся NoneЯ думаю, вы должны подкласс Thread обрабатывать коды возврата и так.

Вы можете определить изменяемую область выше области действия многопоточной функции и добавить к ней результат. (Я также изменил код для совместимости с python3)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

Это возвращает {'world!': 'foo'}

Если вы используете функцию ввода в качестве ключа к вашим результатам, каждый уникальный вход гарантированно даст запись в результатах.

Это довольно старый вопрос, но я хотел поделиться простым решением, которое сработало для меня и помогло моему процессу разработки.

Методология этого ответа заключается в том, что «новая» целевая функция,innerприсвоение результата исходной функции (прошедшей через__init__функция) кresultатрибут экземпляра оболочки через нечто, называемое закрытием.

Это позволяет классу-оболочке удерживать возвращаемое значение для доступа вызывающих объектов в любое время.

ПРИМЕЧАНИЕ. Этот метод не требует использования каких-либо искаженных методов или частных методовthreading.Threadкласс, хотя функции доходности не рассматривались (OP не упоминал функции доходности).

Наслаждаться!

      from threading import Thread as _Thread


class ThreadWrapper:
    def __init__(self, target, *args, **kwargs):
        self.result = None
        self._target = self._build_threaded_fn(target)
        self.thread = _Thread(
            target=self._target,
            *args,
            **kwargs
        )

    def _build_threaded_fn(self, func):
        def inner(*args, **kwargs):
            self.result = func(*args, **kwargs)
        return inner

Кроме того, вы можете запуститьpytest(при условии, что он у вас установлен) со следующим кодом для демонстрации результатов:

      import time
from commons import ThreadWrapper


def test():

    def target():
        time.sleep(1)
        return 'Hello'

    wrapper = ThreadWrapper(target=target)
    wrapper.thread.start()

    r = wrapper.result
    assert r is None

    time.sleep(2)

    r = wrapper.result
    assert r == 'Hello'

Вы можете использовать pool.apply_async()из ThreadPool()вернуть значение изкак показано ниже:

      from multiprocessing.pool import ThreadPool

def test(num1, num2):
    return num1 + num2

pool = ThreadPool(processes=1) # Here
result = pool.apply_async(test, (2, 3)) # Here
print(result.get()) # 5

И вы также можете использовать submit()из concurrent.futures.ThreadPoolExecutor()вернуть значение из test()как показано ниже:

      from concurrent.futures import ThreadPoolExecutor

def test(num1, num2):
    return num1 + num2

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test, 2, 3) # Here
print(future.result()) # 5

И, вместо, вы можете использовать массив , как показано ниже:

      from threading import Thread

def test(num1, num2, r):
    r[0] = num1 + num2 # Instead of "return"

result = [None] # Here

thread = Thread(target=test, args=(2, 3, result))
thread.start()
thread.join()
print(result[0]) # 5

И вместо return, вы также можете использовать очередь resultкак показано ниже:

      from threading import Thread
import queue

def test(num1, num2, q):
    q.put(num1 + num2) # Instead of "return" 

queue = queue.Queue() # Here

thread = Thread(target=test, args=(2, 3, queue))
thread.start()
thread.join()
print(queue.get()) # '5'

Идея GuySoft великолепна, но я думаю, что объект не обязательно должен наследоваться от Thread, и start() может быть удален из интерфейса:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo

Вы можете использовать Пул в качестве пула рабочих процессов, как показано ниже:

from multiprocessing import Pool


def f1(x, y):
    return x*y


if __name__ == '__main__':
    with Pool(processes=10) as pool:
        result = pool.apply(f1, (2, 3))
        print(result)

Определите вашу цель для
1) принять аргумент q
2) заменить любые заявления return foo с q.put(foo); return

так что функция

def func(a):
    ans = a * a
    return ans

станет

def func(a, q):
    ans = a * a
    q.put(ans)
    return

и тогда вы будете действовать как таковой

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

И вы можете использовать функции декораторы / оболочки, чтобы сделать это, чтобы вы могли использовать ваши существующие функции как target не изменяя их, но следуйте этой базовой схеме.

Вот версия, которую я создал из ответа @Kindall /questions/29890751/kak-poluchit-vozvraschaemoe-znachenie-iz-potoka-v-python/29890764#29890764

В этой версии все, что вам нужно сделать, это ввести команду с аргументами для создания нового потока.

(Я также включил пару тестов)

это было сделано с помощью python 3.8


from threading import Thread
from typing import Any


#def threader(com):  # my original Version (Ignore this)
#    try:
#        threader = Thread(target = com)
#        threader.start()
#    except Exception as e:
#        print(e)
#        print('Could not start thread')


def test(plug, plug2, plug3):
    print(f"hello {plug}")
    print(f'I am the second plug : {plug2}')
    print(plug3)
    return 'I am the return Value!'


def test2(msg):
    return f'I am from the second test: {msg}'


def test3():
    print('hello world')


def NewThread(com, Returning: bool, *arguments) -> Any:
    """
    Will create a new thread for a function/command.

    :param com: Command to be Executed
    :param arguments: Arguments to be sent to Command
    :param Returning: True/False Will this command need to return anything
    """
    
    class NewThreadWorker(Thread):
        def __init__(self, group = None, target = None, name = None, args = (), kwargs = None, *,
                     daemon = None):
            Thread.__init__(self, group, target, name, args, kwargs, daemon = daemon)
            
            self._return = None
        
        def run(self):
            if self._target is not None:
                self._return = self._target(*self._args, **self._kwargs)
        
        def join(self):
            Thread.join(self)
            return self._return
    
    ntw = NewThreadWorker(target = com, args = (*arguments,))
    ntw.start()
    if Returning:
        return ntw.join()


if __name__ == "__main__":
    # threader(test('world'))
    print(NewThread(test, True, 'hi', 'test', test2('hi')))
    NewThread(test3, True)

Надеюсь, что это кому-то пригодится:)

Как уже упоминалось, многопроцессорный пул намного медленнее, чем базовые потоки. Использование очередей, предложенных в некоторых ответах, является очень эффективной альтернативой. Я использую его со словарями, чтобы иметь возможность запускать множество небольших потоков и восстанавливать множественные ответы, комбинируя их со словарями:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)

Ответ Киндалла в Python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon)
        self._return = None 

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return

Если из вызова функции нужно проверить только True или False, я нашел бы более простое решение - обновить глобальный список.

import threading

lists = {"A":"True", "B":"True"}

def myfunc(name: str, mylist):
    for i in mylist:
        if i == 31:
            lists[name] = "False"
            return False
        else:
            print("name {} : {}".format(name, i))

t1 = threading.Thread(target=myfunc, args=("A", [1, 2, 3, 4, 5, 6], ))
t2 = threading.Thread(target=myfunc, args=("B", [11, 21, 31, 41, 51, 61], ))
t1.start()
t2.start()
t1.join()
t2.join()

for value in lists.values():
    if value == False:
        # Something is suspicious 
        # Take necessary action 

Это более полезно, если вы хотите узнать, вернул ли какой-либо из потоков ложное состояние, чтобы предпринять необходимые действия.

Я не знаю, сработало ли это для вас, ребята, но я решил создать глобальный объект (в основном словари или вложенные массивы), чтобы функция могла получить доступ к объекту и изменить его. Я знаю, что это требует больше ресурсов, но мы не Имея дело с квантовой наукой, я думаю, мы можем дать немного больше оперативной памяти при условии, что потребление оперативной памяти увеличивается линейно с загрузкой процессора. Вот пример кода:

      import requests 
import json 
import string 
import random 
import threading
import time 
dictionary = {} 
def get_val1(L): 
    print('#1')
    for n,elem in enumerate(L):
        dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val2(L): 
    print('#2')
    for n,elem in enumerate(L):
        dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val3(L): 
    print('#3')
    for n,elem in enumerate(L):
        dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val4(L): 
    print('#4')
    for n,elem in enumerate(L):
        dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
t1 = threading.Thread(target=get_val1,args=(L[0],)) 
t2 = threading.Thread(target=get_val2,args=(L[1],)) 
t3 = threading.Thread(target=get_val3,args=(L[2],))
t4 = threading.Thread(target=get_val4,args=(L[3],))

t1.start()
t2.start()
t3.start()
t4.start()

t1.join()
t2.join()
t3.join()
t4.join()

Эта программа выполняет 4 потока, каждый из которых возвращает некоторые данные для некоторого текста L[i] для i в L, возвращаемые данные из API сохраняются в словаре. Они могут варьироваться от программы к программе, выгодно это или нет, для небольших Для среднетяжелых вычислительных задач эта мутация объекта работает довольно быстро и использует на несколько процентов больше ресурсов.

Одно из обычных решений - обернуть вашу функцию foo с декоратором как

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

Тогда весь код может выглядеть так

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

Заметка

Одна важная проблема заключается в том, что возвращаемые значения могут быть неупорядоченными. (На самом деле, return value не обязательно сохраняется в queue, так как вы можете выбрать произвольную потокобезопасную структуру данных)

Очень простой способ сделать это для таких чайников, как я:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self):
        # in this class and function we will put our test target function
        test()

t = AnyThread()

# having our test target function
def test():
    # do something in this function:
    result = 3 + 2
    # and put result to a queue instance
    q.put(result)

for i in range(3): #calling our threading fucntion 3 times (just for example)
    t.run()
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 5
>>> 5

Главное здесь - это queue модуль. Мы создаем queue.Queue() экземпляр и включить его в нашу функцию. Мы кормим его своим результатом, который позже мы выходим за пределы потока.

Посмотрите еще один пример с аргументами, переданными нашей тестовой функции:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self, a, b):
        # in this class and function we will put our execution test function
        test(a, b)

t = AnyThread()

# having our test target function
def test(a, b):
    # do something in this function:
    result = a + b
    # and put result to a queue instance
    q.put(result)

for i in range(3): #calling our threading fucntion 3 times (just for example)
    t.run(3+i, 2+i)
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 7
>>> 9

Почему бы просто не использовать глобальную переменную?

import threading


class myThread(threading.Thread):
    def __init__(self, ind, lock):
        threading.Thread.__init__(self)
        self.ind = ind
        self.lock = lock

    def run(self):
        global results
        with self.lock:
            results.append(self.ind)



results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)

Я знаю, что эта ветка устарела.... но я столкнулся с той же проблемой... Если вы хотите использовать thread.join()

import threading

class test:

    def __init__(self):
        self.msg=""

    def hello(self,bar):
        print('hello {}'.format(bar))
        self.msg="foo"


    def main(self):
        thread = threading.Thread(target=self.hello, args=('world!',))
        thread.start()
        thread.join()
        print(self.msg)

g=test()
g.main()
Другие вопросы по тегам