Поймать исключение потока в потоке вызывающего в Python
Я очень плохо знаком с Python и многопоточным программированием в целом. По сути, у меня есть скрипт, который будет копировать файлы в другое место. Я хотел бы, чтобы это было помещено в другой поток, чтобы я мог выводить ....
чтобы указать, что скрипт все еще работает.
Проблема, с которой я столкнулся, заключается в том, что если файлы не могут быть скопированы, это вызовет исключение. Это нормально, если работает в главном потоке; однако следующий код не работает:
try:
threadClass = TheThread(param1, param2, etc.)
threadClass.start() ##### **Exception takes place here**
except:
print "Caught an exception"
В самом классе потока я попытался повторно сгенерировать исключение, но оно не работает. Я видел, как люди здесь задают похожие вопросы, но все они, кажется, делают что-то более конкретное, чем то, что я пытаюсь сделать (и я не совсем понимаю предлагаемые решения). Я видел, как люди упоминают об использовании sys.exc_info()
Однако я не знаю, где и как его использовать.
Вся помощь очень ценится!
РЕДАКТИРОВАТЬ: код для класса потока ниже:
class TheThread(threading.Thread):
def __init__(self, sourceFolder, destFolder):
threading.Thread.__init__(self)
self.sourceFolder = sourceFolder
self.destFolder = destFolder
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except:
raise
21 ответ
Проблема в том, что thread_obj.start()
возвращается немедленно. Созданный вами дочерний поток выполняется в своем собственном контексте со своим собственным стеком. Любое исключение происходит в контексте дочернего потока и находится в его собственном стеке. Прямо сейчас я могу думать о том, как передать эту информацию родительскому потоку, используя своего рода передачу сообщений, так что вы можете посмотреть на это.
Попробуйте это для размера:
import sys
import threading
import Queue
class ExcThread(threading.Thread):
def __init__(self, bucket):
threading.Thread.__init__(self)
self.bucket = bucket
def run(self):
try:
raise Exception('An error occured here.')
except Exception:
self.bucket.put(sys.exc_info())
def main():
bucket = Queue.Queue()
thread_obj = ExcThread(bucket)
thread_obj.start()
while True:
try:
exc = bucket.get(block=False)
except Queue.Empty:
pass
else:
exc_type, exc_obj, exc_trace = exc
# deal with the exception
print exc_type, exc_obj
print exc_trace
thread_obj.join(0.1)
if thread_obj.isAlive():
continue
else:
break
if __name__ == '__main__':
main()
На этот вопрос есть много действительно сложных ответов. Я упрощаю это, потому что мне кажется, что этого достаточно для большинства вещей.
from threading import Thread
class PropagatingThread(Thread):
def run(self):
self.exc = None
try:
if hasattr(self, '_Thread__target'):
# Thread uses name mangling prior to Python 3.
self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
else:
self.ret = self._target(*self._args, **self._kwargs)
except BaseException as e:
self.exc = e
def join(self):
super(PropagatingThread, self).join()
if self.exc:
raise self.exc
return self.ret
Если вы уверены, что когда-либо будете работать только на одной или другой версии Python, вы можете уменьшить run()
метод вплоть до только искаженной версии (если вы будете работать только на версиях Python до 3), или только чистой версии (если вы будете работать только на версиях Python, начинающихся с 3).
Пример использования:
def f(*args, **kwargs)
print(args)
print(kwargs)
raise Exception('I suck')
t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()
И вы увидите исключение, возникшее в другом потоке, когда вы присоединитесь.
concurrent.futures
Модуль упрощает работу в отдельных потоках (или процессах) и обрабатывает любые возникающие исключения:
import concurrent.futures
import shutil
def copytree_with_dots(src_path, dst_path):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# Execute the copy on a separate thread,
# creating a future object to track progress.
future = executor.submit(shutil.copytree, src_path, dst_path)
while future.running():
# Print pretty dots here.
pass
# Return the value returned by shutil.copytree(), None.
# Raise any exceptions raised during the copy process.
return future.result()
concurrent.futures
включен в Python 3.2, и доступен в качестве резервного futures
модуль для более ранних версий.
Хотя невозможно напрямую перехватить исключение, созданное в другом потоке, вот код, который позволяет совершенно прозрачно получить что-то очень близкое к этой функциональности. Ваш дочерний поток должен наследовать ExThread
класс вместо threading.Thread
и родительский поток должен вызвать child_thread.join_with_exception()
метод вместо child_thread.join()
в ожидании потока, чтобы закончить свою работу.
Технические подробности этой реализации: когда дочерний поток генерирует исключение, он передается родителю через Queue
и снова выбрасывается в родительский поток. Обратите внимание, что в этом подходе нет ожидания.
#!/usr/bin/env python
import sys
import threading
import Queue
class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()
def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError
def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except BaseException:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)
def wait_for_exc_info(self):
return self.__status_queue.get()
def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]
class MyException(Exception):
pass
class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)
def run_with_exception(self):
thread_name = threading.current_thread().name
raise MyException("An error in thread '{}'.".format(thread_name))
def main():
t = MyThread()
t.start()
try:
t.join_with_exception()
except MyException as ex:
thread_name = threading.current_thread().name
print "Caught a MyException in thread '{}': {}".format(thread_name, ex)
if __name__ == '__main__':
main()
Если в потоке возникает исключение, лучший способ - повторно вызвать его в потоке вызывающего во время join
, Вы можете получить информацию об исключении, которое обрабатывается в данный момент, используя sys.exc_info()
функция. Эта информация может быть просто сохранена как свойство объекта потока до join
называется, в этот момент он может быть повторно поднят.
Обратите внимание, что Queue.Queue
(как предлагается в других ответах) не является необходимым в этом простом случае, когда поток выбрасывает не более 1 исключения и завершается сразу после создания исключения. Мы избегаем условий гонки, просто ожидая завершения потока.
Например, расширить ExcThread
(ниже), переопределение excRun
(вместо run
).
Python 2.x:
import threading
class ExcThread(threading.Thread):
def excRun(self):
pass
def run(self):
self.exc = None
try:
# Possibly throws an exception
self.excRun()
except:
import sys
self.exc = sys.exc_info()
# Save details of the exception thrown but don't rethrow,
# just complete the function
def join(self):
threading.Thread.join(self)
if self.exc:
msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
new_exc = Exception(msg)
raise new_exc.__class__, new_exc, self.exc[2]
Python 3.x:
Форма 3 аргумента для raise
отсутствует в Python 3, поэтому измените последнюю строку на:
raise new_exc.with_traceback(self.exc[2])
concurrent.futures.as_completed
https://docs.python.org/3.7/library/concurrent.futures.html
Следующее решение:
- немедленно возвращается в основной поток при вызове исключения
- не требует дополнительных пользовательских классов, потому что это не нужно:
- явный
Queue
- добавить что-то еще, кроме вашего рабочего потока
- явный
Источник:
#!/usr/bin/env python3
import concurrent.futures
import time
def func_that_raises(do_raise):
for i in range(3):
print(i)
time.sleep(0.1)
if do_raise:
raise Exception()
for i in range(3):
print(i)
time.sleep(0.1)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
futures = []
futures.append(executor.submit(func_that_raises, False))
futures.append(executor.submit(func_that_raises, True))
for future in concurrent.futures.as_completed(futures):
print(repr(future.exception()))
Возможный вывод:
0
0
1
1
2
2
0
Exception()
1
2
None
К сожалению, невозможно уничтожить фьючерсы, чтобы отменить другие, так как один не получается:
concurrent.features
; Python: concurrent.futures Как сделать его отменяемым?threading
: Есть ли способ убить поток?- Cththreads: Kill Thread в библиотеке Pthread
Если вы делаете что-то вроде:
for future in concurrent.futures.as_completed(futures):
if future.exception() is not None:
raise future.exception()
тогда with
ловит его и ждет окончания второго потока, прежде чем продолжить. Следующее ведет себя аналогично:
for future in concurrent.futures.as_completed(futures):
future.result()
поскольку future.result()
повторно вызывает исключение, если оно произошло.
Если вы хотите выйти из всего процесса Python, вы можете сойти с рук os._exit(0)
, но это, вероятно, означает, что вам нужен рефакторинг.
Протестировано на Python 3.6.7, Ubuntu 18.04.
Что я делаю, это простой переопределяющий метод соединения и запуска потока:
class RaisingThread(threading.Thread):
def run(self):
self._exc = None
try:
super().run()
except Exception as e:
self._exc = e
def join(self):
super().join()
if self._exc:
raise self._exc
Используется следующим образом:
def foo():
time.sleep(2)
print('hi, from foo!')
raise Exception('exception from foo')
t = RaisingThread(target=foo)
t.start()
try:
t.join()
except Exception as e:
print(e)
Результат:
hi, from foo!
exception from foo!
В Python 3.8 мы можем использовать threading.excepthook для перехвата неперехваченных исключений во всех дочерних потоках! Например,
threading.excepthook = thread_exception_handler
Референт: /questions/36031886/sysexcepthook-i-mnogopotochnost/55398595#55398595
Это была неприятная небольшая проблема, и я хотел бы добавить свое решение. Некоторые другие решения, которые я нашел (например, async.io), выглядели многообещающе, но также представляли собой черную коробку. Подход цикла очереди / события как бы связывает вас с определенной реализацией. Параллельный исходный код фьючерса, однако, составляет всего около 1000 строк, и его легко понять. Это позволило мне легко решить мою проблему: создавать рабочие потоки ad-hoc без особых настроек и иметь возможность перехватывать исключения в основном потоке.
Мое решение использует API параллельных фьючерсов и API потоков. Это позволяет вам создать работника, который дает вам как поток, так и будущее. Таким образом, вы можете присоединиться к потоку, чтобы дождаться результата:
worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())
... или вы можете позволить работнику просто отправить обратный вызов, когда закончите:
worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))
... или вы можете выполнить цикл до завершения события:
worker = Worker(test)
thread = worker.start()
while True:
print("waiting")
if worker.future.done():
exc = worker.future.exception()
print('exception?', exc)
result = worker.future.result()
print('result', result)
break
time.sleep(0.25)
Вот код:
from concurrent.futures import Future
import threading
import time
class Worker(object):
def __init__(self, fn, args=()):
self.future = Future()
self._fn = fn
self._args = args
def start(self, cb=None):
self._cb = cb
self.future.set_running_or_notify_cancel()
thread = threading.Thread(target=self.run, args=())
thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
thread.start()
return thread
def run(self):
try:
self.future.set_result(self._fn(*self._args))
except BaseException as e:
self.future.set_exception(e)
if(self._cb):
self._cb(self.future.result())
... и тестовая функция:
def test(*args):
print('args are', args)
time.sleep(2)
raise Exception('foo')
Я знаю, что немного опоздал на вечеринку здесь, но у меня была очень похожая проблема, но она включала использование tkinter в качестве графического интерфейса, и основной цикл лишил возможности использовать любое из решений, которые зависят от.join(). Поэтому я адаптировал решение, данное в РЕДАКТИРЕ исходного вопроса, но сделал его более общим, чтобы его было легче понять другим.
Вот новый класс потока в действии:
import threading
import traceback
import logging
class ExceptionThread(threading.Thread):
def __init__(self, *args, **kwargs):
threading.Thread.__init__(self, *args, **kwargs)
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
except Exception:
logging.error(traceback.format_exc())
def test_function_1(input):
raise IndexError(input)
if __name__ == "__main__":
input = 'useful'
t1 = ExceptionThread(target=test_function_1, args=[input])
t1.start()
Конечно, вы всегда можете обработать исключение другим способом, например, распечатать его или вывести на консоль.
Это позволяет вам использовать класс ExceptionThread точно так же, как и класс Thread, без каких-либо специальных изменений.
Я использую эту версию, она минимальная и работает хорошо.
class SafeThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(SafeThread, self).__init__(*args, **kwargs)
self.exception = None
def run(self) -> None:
try:
super(SafeThread, self).run()
except Exception as ex:
self.exception = ex
traceback.print_exc()
def join(self, *args, **kwargs) -> None:
super(SafeThread, self).join(*args, **kwargs)
if self.exception:
raise self.exception
Чтобы использовать его, просто замените
threading.Thread
с участием
SafeThread
например
t = SafeThread(target = some_function, args = (some, args,))
t.start()
# do something else here if you want as the thread runs in the background
t.join()
Подобным образом, как у RickardSjogren без Queue, sys и т. Д., Но также и без некоторых прослушивателей сигналов: выполняйте непосредственно обработчик исключений, который соответствует блоку исключения.
#!/usr/bin/env python3
import threading
class ExceptionThread(threading.Thread):
def __init__(self, callback=None, *args, **kwargs):
"""
Redirect exceptions of thread to an exception handler.
:param callback: function to handle occured exception
:type callback: function(thread, exception)
:param args: arguments for threading.Thread()
:type args: tuple
:param kwargs: keyword arguments for threading.Thread()
:type kwargs: dict
"""
self._callback = callback
super().__init__(*args, **kwargs)
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
except BaseException as e:
if self._callback is None:
raise e
else:
self._callback(self, e)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs, self._callback
Только self._callback и блок исключений в run() являются дополнительными к обычным потокам.
Как новичок в Threading, мне потребовалось много времени, чтобы понять, как реализовать код Матеуша Кобоса (см. Выше). Вот уточненная версия, чтобы помочь понять, как ее использовать.
#!/usr/bin/env python
import sys
import threading
import Queue
class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()
def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError
def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except Exception:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)
def wait_for_exc_info(self):
return self.__status_queue.get()
def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]
class MyException(Exception):
pass
class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)
# This overrides the "run_with_exception" from class "ExThread"
# Note, this is where the actual thread to be run lives. The thread
# to be run could also call a method or be passed in as an object
def run_with_exception(self):
# Code will function until the int
print "sleeping 5 seconds"
import time
for i in 1, 2, 3, 4, 5:
print i
time.sleep(1)
# Thread should break here
int("str")
# I'm honestly not sure why these appear here? So, I removed them.
# Perhaps Mateusz can clarify?
# thread_name = threading.current_thread().name
# raise MyException("An error in thread '{}'.".format(thread_name))
if __name__ == '__main__':
# The code lives in MyThread in this example. So creating the MyThread
# object set the code to be run (but does not start it yet)
t = MyThread()
# This actually starts the thread
t.start()
print
print ("Notice 't.start()' is considered to have completed, although"
" the countdown continues in its new thread. So you code "
"can tinue into new processing.")
# Now that the thread is running, the join allows for monitoring of it
try:
t.join_with_exception()
# should be able to be replace "Exception" with specific error (untested)
except Exception, e:
print
print "Exceptioon was caught and control passed back to the main thread"
print "Do some handling here...or raise a custom exception "
thread_name = threading.current_thread().name
e = ("Caught a MyException in thread: '" +
str(thread_name) +
"' [" + str(e) + "]")
raise Exception(e) # Or custom class of exception, such as MyException
Wrap Thread с хранилищем исключений.
import threading
import sys
class ExcThread(threading.Thread):
def __init__(self, target, args = None):
self.args = args if args else []
self.target = target
self.exc = None
threading.Thread.__init__(self)
def run(self):
try:
self.target(*self.args)
raise Exception('An error occured here.')
except Exception:
self.exc=sys.exc_info()
def main():
def hello(name):
print(!"Hello, {name}!")
thread_obj = ExcThread(target=hello, args=("Jack"))
thread_obj.start()
thread_obj.join()
exc = thread_obj.exc
if exc:
exc_type, exc_obj, exc_trace = exc
print(exc_type, ':',exc_obj, ":", exc_trace)
main()
Использование обнаженных исключений не является хорошей практикой, потому что вы обычно ловите больше, чем ожидаете.
Я бы предложил изменить except
поймать ТОЛЬКО исключение, которое вы хотели бы обработать. Я не думаю, что повышение это имеет желаемый эффект, потому что, когда вы идете, чтобы создать экземпляр TheThread
во внешнем try
Если это вызывает исключение, назначение никогда не произойдет.
Вместо этого вы можете просто предупредить об этом и двигаться дальше, например:
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except OSError, err:
print err
Затем, когда это исключение будет обнаружено, вы можете обработать его там. Затем, когда внешний try
ловит исключение из TheThread
вы знаете, что это будет не тот, с которым вы уже работали, и он поможет вам изолировать процесс.
Мне нравится этот класс:
https://gist.github.com/earonesty/b88d60cb256b71443e42c4f1d949163e
import threading
from typing import Any
class PropagatingThread(threading.Thread):
"""A Threading Class that raises errors it caught, and returns the return value of the target on join."""
def __init__(self, *args, **kwargs):
self._target = None
self._args = ()
self._kwargs = {}
super().__init__(*args, **kwargs)
self.exception = None
self.return_value = None
assert self._target
def run(self):
"""Don't override this if you want the behavior of this class, use target instead."""
try:
if self._target:
self.return_value = self._target(*self._args, **self._kwargs)
except Exception as e:
self.exception = e
finally:
# see super().run() for why this is necessary
del self._target, self._args, self._kwargs
def join(self, timeout=None) -> Any:
super().join(timeout)
if self.exception:
raise self.exception
return self.return_value
Простой способ перехвата исключения потока и обратной связи с методом вызывающего может быть путем передачи словаря или списка worker
метод.
Пример (передача словаря в рабочий метод):
import threading
def my_method(throw_me):
raise Exception(throw_me)
def worker(shared_obj, *args, **kwargs):
try:
shared_obj['target'](*args, **kwargs)
except Exception as err:
shared_obj['err'] = err
shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"
th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()
if shared_obj['err']:
print(">>%s" % shared_obj['err'])
Один метод, который мне нравится, основан на модели наблюдателя. Я определяю класс сигнала, который мой поток использует для выдачи исключений слушателям. Он также может быть использован для возврата значений из потоков. Пример:
import threading
class Signal:
def __init__(self):
self._subscribers = list()
def emit(self, *args, **kwargs):
for func in self._subscribers:
func(*args, **kwargs)
def connect(self, func):
self._subscribers.append(func)
def disconnect(self, func):
try:
self._subscribers.remove(func)
except ValueError:
raise ValueError('Function {0} not removed from {1}'.format(func, self))
class WorkerThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.Exception = Signal()
self.Result = Signal()
def run(self):
if self._Thread__target is not None:
try:
self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
except Exception as e:
self.Exception.emit(e)
else:
self.Result.emit(self._return_value)
if __name__ == '__main__':
import time
def handle_exception(exc):
print exc.message
def handle_result(res):
print res
def a():
time.sleep(1)
raise IOError('a failed')
def b():
time.sleep(2)
return 'b returns'
t = WorkerThread(target=a)
t2 = WorkerThread(target=b)
t.Exception.connect(handle_exception)
t2.Result.connect(handle_result)
t.start()
t2.start()
print 'Threads started'
t.join()
t2.join()
print 'Done'
У меня недостаточно опыта работы с потоками, чтобы утверждать, что это абсолютно безопасный метод. Но это сработало для меня, и мне нравится гибкость.
Я думаю, что другие решения несколько сложны, если единственное, что вы хотите, - это действительно увидеть где-то исключение, вместо того, чтобы быть невнимательным и полностью слепым.
Решение состоит в том, чтобы создать собственный
Thread
который берет регистратор из основного потока и регистрирует любые исключения.
class ThreadWithLoggedException(threading.Thread):
"""
Similar to Thread but will log exceptions to passed logger.
Args:
logger: Logger instance used to log any exception in child thread
Exception is also reachable via <thread>.exception from the main thread.
"""
def __init__(self, *args, **kwargs):
try:
self.logger = kwargs.pop("logger")
except KeyError:
raise Exception("Missing 'logger' in kwargs")
super().__init__(*args, **kwargs)
self.exception = None
def run(self):
try:
if self._target is not None:
self._target(*self._args, **self._kwargs)
except Exception as exception:
thread = threading.current_thread()
self.exception = exception
self.logger.exception(f"Exception in child thread {thread}: {exception}")
finally:
del self._target, self._args, self._kwargs
Пример:
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
def serve():
raise Exception("Earth exploded.")
th = ThreadWithLoggedException(target=serve, logger=logger)
th.start()
Вывод в основном потоке:
Exception in child thread <ThreadWithLoggedException(Thread-1, started 139922384414464)>: Earth exploded.
Traceback (most recent call last):
File "/core/utils.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/myapp.py", line 105, in serve
raise Exception("Earth exploded.")
Exception: Earth exploded.
оберните функцию, которую вы передаете в поток, пробным замыканием:
def exception_printing(f):
def f_(*args):
try:
return f(*args)
except Exception as e:
print(e)
return None
return f_
pygolang предоставляет sync.WorkGroup, который, в частности, распространяет исключение из порожденных рабочих потоков в основной поток. Например:
#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""
from __future__ import print_function
from golang import sync, context
def T1(ctx, *argv):
print('T1: run ... %r' % (argv,))
raise RuntimeError('T1: problem')
def T2(ctx):
print('T2: ran ok')
def main():
wg = sync.WorkGroup(context.background())
wg.go(T1, [1,2,3])
wg.go(T2)
try:
wg.wait()
except Exception as e:
print('Tmain: caught exception: %r\n' %e)
# reraising to see full traceback
raise
if __name__ == '__main__':
main()
при запуске дает следующее:
T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)
Traceback (most recent call last):
File "./x.py", line 28, in <module>
main()
File "./x.py", line 21, in main
wg.wait()
File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
pyerr_reraise(pyerr)
File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
f(pywg._pyctx, *argv, **kw)
File "./x.py", line 10, in T1
raise RuntimeError('T1: problem')
RuntimeError: T1: problem
Исходный код из вопроса будет просто:
wg = sync.WorkGroup(context.background())
def _(ctx):
shul.copytree(sourceFolder, destFolder)
wg.go(_)
# waits for spawned worker to complete and, on error, reraises
# its exception on the main thread.
wg.wait()