Как преждевременно уничтожить потоки std::async до того, как они будут * завершены * без * использования std::atomic_bool?

У меня есть функция, которая принимает обратный вызов, и использовал его для работы на 10 отдельных потоков. Однако часто бывает так, что не вся работа необходима. Например, если желаемый результат получен в третьем потоке, он должен прекратить всю работу над оставшимися живыми потоками.

Этот ответ предполагает, что это невозможно, если у вас нет функций обратного вызова std::atomic_bool аргумент, который указывает, должна ли функция завершаться преждевременно.

Это решение не работает для меня. Рабочие раскручиваются внутри базового класса, и весь смысл этого базового класса состоит в том, чтобы абстрагировать детали многопоточности. Как я могу это сделать? Я ожидаю, что мне придется бросить std::async для чего-то более сложного.

#include <iostream>
#include <future>
#include <vector>

class ABC{
public:
    std::vector<std::future<int> > m_results;
    ABC() {};
    ~ABC(){};
    virtual int callback(int a) = 0;
    void doStuffWithCallBack();
};


void ABC::doStuffWithCallBack(){

    // start working
    for(int i = 0; i < 10; ++i)
        m_results.push_back(std::async(&ABC::callback, this, i));

    // analyze results and cancel all threads when you get the 1
    for(int j = 0; j < 10; ++j){

        double foo = m_results[j].get();

        if ( foo == 1){
            break;  // but threads continue running
        }

    }
    std::cout << m_results[9].get() << " <- this shouldn't have ever been computed\n";
}

class Derived : public ABC {
public:
    Derived() : ABC() {};
    ~Derived() {};
    int callback(int a){
        std::cout << a << "!\n";
        if (a == 3)
            return 1;
        else
            return 0;
    };
};

int main(int argc, char **argv)
{

    Derived myObj;
    myObj.doStuffWithCallBack();

    return 0;
}

1 ответ

Решение

Я просто скажу, что это, вероятно, не должно быть частью "нормальной" программы, поскольку это может привести к утечке ресурсов и / или оставить вашу программу в нестабильном состоянии, но в интересах науки...

Если у вас есть контроль над циклом потока, и вы не возражаете против использования функций платформы, вы можете добавить исключение в поток. С помощью posix вы можете использовать сигналы для этого, в Windows вам придется использовать SetThreadContext(). Хотя исключение, как правило, разматывает стек и вызывает деструкторы, ваш поток может находиться в системном вызове или в другом "безопасном месте, не исключающем исключение", когда возникает исключение.

Отказ от ответственности: на данный момент у меня только Linux, поэтому я не тестировал код Windows.

#if defined(_WIN32)
#   define ITS_WINDOWS
#else
#   define ITS_POSIX
#endif


#if defined(ITS_POSIX)
#include <signal.h>
#endif

void throw_exception() throw(std::string())
{
    throw std::string();
}

void init_exceptions()
{
    volatile int i = 0;
    if (i)
        throw_exception();
}

bool abort_thread(std::thread &t)
{

#if defined(ITS_WINDOWS)

    bool bSuccess = false;
    HANDLE h = t.native_handle();
    if (INVALID_HANDLE_VALUE == h)
        return false;

    if (INFINITE == SuspendThread(h))
        return false;

    CONTEXT ctx;
    ctx.ContextFlags = CONTEXT_CONTROL;
    if (GetThreadContext(h, &ctx))
    {
#if defined( _WIN64 )
        ctx.Rip = (DWORD)(DWORD_PTR)throw_exception;
#else
        ctx.Eip = (DWORD)(DWORD_PTR)throw_exception;
#endif

        bSuccess = SetThreadContext(h, &ctx) ? true : false;
    }

    ResumeThread(h);

    return bSuccess;

#elif defined(ITS_POSIX)

    pthread_kill(t.native_handle(), SIGUSR2);

#endif

    return false;
}


#if defined(ITS_POSIX)

void worker_thread_sig(int sig)
{
    if(SIGUSR2 == sig)
        throw std::string();
}

#endif

void init_threads()
{
#if defined(ITS_POSIX)

    struct sigaction sa;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sa.sa_handler = worker_thread_sig;
    sigaction(SIGUSR2, &sa, 0);

#endif
}

class tracker
{
public:
    tracker() { printf("tracker()\n"); }
    ~tracker() { printf("~tracker()\n"); }
};

int main(int argc, char *argv[])
{
    init_threads();

    printf("main: starting thread...\n");
    std::thread t([]()
    {
        try
        {
            tracker a;

            init_exceptions();

            printf("thread: started...\n");
            std::this_thread::sleep_for(std::chrono::minutes(1000));
            printf("thread: stopping...\n");
        }
        catch(std::string s)
        {
            printf("thread: exception caught...\n");
        }
    });

    printf("main: sleeping...\n");
    std::this_thread::sleep_for(std::chrono::seconds(2));

    printf("main: aborting...\n");
    abort_thread(t);

    printf("main: joining...\n");
    t.join();

    printf("main: exiting...\n");

    return 0;
}

Выход:

main: starting thread...
main: sleeping...
tracker()
thread: started...
main: aborting...
main: joining...
~tracker()
thread: exception caught...
main: exiting...
Другие вопросы по тегам