Узел FFI Callback из потока C++

Я столкнулся с неприятной проблемой сегодня. Я работаю с node-ffi запустить код C++ в моем электронном приложении. В целом у меня был хороший опыт, но сегодня я начал работать с многопоточностью и столкнулся с некоторыми трудностями. ffi обратный вызов, который я передаю, вызывается из потока просто отлично. Однако, когда я заканчиваю свой цикл и пытаюсь join петля нити к основной нити полностью замораживает электронное приложение.

Полный отказ от ответственности: я довольно новичок в C++ и буду признателен за любые отзывы о моем коде, чтобы улучшить его, особенно за любые красные флажки, о которых вы думаете, я должен знать.

Вот два репозитория, которые демонстрируют ошибку, с которой я столкнулся:
Электронный проект - https://github.com/JakeDluhy/threading-test
C++ DLL - https://github.com/JakeDluhy/ThreadedDll

И вот обзор того, что я делаю:
В моей dll я выставляю функции для начала / окончания сеанса и запуска / остановки потоковой передачи. Они вызывают ссылку на экземпляр класса для фактической реализации функциональности. По сути, это оболочка C вокруг более мощного класса C++.

// ThreadedDll.h
#pragma once

#ifdef __cplusplus
extern "C" {
#endif

#ifdef THREADEDDLL_EXPORTS
#define THREADEDDLL_API __declspec(dllexport)
#else
#define THREADEDDLL_API __declspec(dllimport)
#endif
    THREADEDDLL_API void beginSession(void(*frameReadyCB)());
    THREADEDDLL_API void endSession();

    THREADEDDLL_API void startStreaming();
    THREADEDDLL_API void stopStreaming();
#ifdef __cplusplus
}
#endif

// ThreadedDll.cpp
#include "ThreadedDll.h"
#include "Threader.h"

static Threader *threader = NULL;

void beginSession(void(*frameReadyCB)())
{
    threader = new Threader(frameReadyCB);
}

void endSession()
{
    delete threader;
    threader = NULL;
}

void startStreaming()
{
    if (threader) threader->start();
}

void stopStreaming()
{
    if (threader) threader->stop();
}

Вот что Threader класс выглядит так:

// Threader.h
#pragma once

#include <thread>
#include <atomic>

using std::thread;
using std::atomic;

class Threader
{
public:
    Threader(void(*frameReadyCB)());
    ~Threader();

    void start();
    void stop();
private:
    void renderLoop();

    atomic<bool> isThreading;
    void(*frameReadyCB)();
    thread myThread;
};

// Threader.cpp
#include "Threader.h"

Threader::Threader(void(*frameReadyCB)()) :
    isThreading{ false },
    frameReadyCB{ frameReadyCB }
{
}


Threader::~Threader()
{
    if (myThread.joinable()) myThread.join();
}

void Threader::start()
{
    isThreading = true;

    myThread = thread(&Threader::renderLoop, this);
}

void Threader::stop()
{
    isThreading = false;

    if (myThread.joinable()) myThread.join();
}

void Threader::renderLoop()
{
    while (isThreading) {
        frameReadyCB();
    }
}

А потом вот мой тестовый JavaScript, который использует его:

// ThreadedDll.js
const ffi = require('ffi');
const path = require('path');

const DllPath = path.resolve(__dirname, '../dll/ThreadedDll.dll');
// Map the library functions in the way that FFI expects
const DllMap = {
    'beginSession':     [ 'void', [ 'pointer' ] ],
    'endSession':       [ 'void', [] ],

    'startStreaming':   [ 'void', [] ],
    'stopStreaming':    [ 'void', [] ],
};
// Create the Library using ffi, the DLL, and the Function Table
const DllLib = ffi.Library(DllPath, DllMap);

class ThreadedDll {
    constructor(args) {
        this.frameReadyCB = ffi.Callback('void', [], () => {
            console.log('Frame Ready');
        });

        DllLib.beginSession(this.frameReadyCB);
    }

    startStreaming() {
        DllLib.startStreaming();
    }

    stopStreaming() {
        DllLib.stopStreaming();
    }

    endSession() {
        DllLib.endSession();
    }
}

module.exports = ThreadedDll;

// app.js
const ThreadedDll = require('./ThreadedDll');

setTimeout(() => {
    const threaded = new ThreadedDll();
    console.log('start stream');
    threaded.startStreaming();

    setTimeout(() => {
        console.log('stop stream');
        threaded.stopStreaming();
        console.log('end session');
        threaded.endSession();
    }, 1000);
}, 2000);

И именно в app.js запускается основной электронный процесс. Я бы ожидал увидеть

start stream
Frame Ready (3800)
stop stream
end session

Но это показывает, нет end session, Однако, если я уберу строку frameReadyCB() в С ++ это работает как положено. Так что каким-то образом ссылка на обратный вызов ffi портит многопоточную среду. Хотел бы получить некоторые мысли по этому поводу. Спасибо!

1 ответ

проблема

Ваше приложение заблокировано. В вашем примере у вас есть два потока:

  1. thread-1 - создается при запуске $ npm start, а также
  2. нить-2 - создана в Threader::start(),

В теме-2 звоните frameReadyCB(), который собирается заблокировать поток, пока он не завершится. Предыдущий ответ показывает, что обратный вызов будет выполнен в потоке-1.

К сожалению, поток-1 уже занят вторым setTimeout, вызывая stopStreaming(), Threader::stop пытается присоединиться к потоку-2, блокируя, пока поток-2 не закончил.

Теперь вы зашли в тупик. Поток-2 ожидает, пока поток-1 выполнит обратный вызов, а поток-1 ожидает, пока поток-2 завершит выполнение. Они оба ждут друг друга.

Решение через node-ffi

Кажется, node-ffi обрабатывает обратные вызовы, выполняющиеся в отдельном потоке, когда поток создается через node-ffi, используя async(), Таким образом, вы можете удалить потоки из вашей библиотеки C++ и вместо этого вызвать DllLib.startStreaming.async(() => {}) из вашей библиотеки узлов.

Решение через C++

Чтобы решить эту проблему, вы должны убедиться, что никогда не пытаетесь присоединиться к потоку 2, пока он ожидает frameReadyCB() завершить. Вы можете сделать это, используя мьютекс. Кроме того, вы должны убедиться, что не ожидаете блокировки мьютекса, пока поток 2 ожидает frameReadyCB(), Единственный способ сделать это - создать еще один поток, чтобы остановить потоковую передачу. Пример ниже делает это с помощью node-ffi async, хотя это можно сделать в библиотеке C++, чтобы скрыть это от вашей библиотеки узлов.

// Threader.h
#pragma once

#include <thread>
#include <atomic>

using std::thread;
using std::atomic;
using std::mutex;

class Threader
{
public:
    Threader(void(*frameReadyCB)());
    ~Threader();

    void start();
    void stop();
private:
    void renderLoop();

    atomic<bool> isThreading;
    void(*frameReadyCB)();
    thread myThread;
    mutex mtx;
};
// Threader.cpp
#include "Threader.h"

Threader::Threader(void(*frameReadyCB)()) :
    isThreading{ false },
    frameReadyCB{ frameReadyCB }
{
}


Threader::~Threader()
{
    stop();
}

void Threader::start()
{
    isThreading = true;

    myThread = thread(&Threader::renderLoop, this);
}

void Threader::stop()
{
    isThreading = false;

    mtx.lock();
    if (myThread.joinable()) myThread.join();
    mtx.unlock();
}

void Threader::renderLoop()
{
    while (isThreading) {
        mtx.lock();
        frameReadyCB();
        mtx.unlock();
    }
}
// ThreadedDll.js
const ffi = require('ffi');
const path = require('path');

const DllPath = path.resolve(__dirname, '../dll/ThreadedDll.dll');
// Map the library functions in the way that FFI expects
const DllMap = {
    'beginSession':     [ 'void', [ 'pointer' ] ],
    'endSession':       [ 'void', [] ],

    'startStreaming':   [ 'void', [] ],
    'stopStreaming':    [ 'void', [] ],
};
// Create the Library using ffi, the DLL, and the Function Table
const DllLib = ffi.Library(DllPath, DllMap);

class ThreadedDll {
    constructor(args) {
        this.frameReadyCB = ffi.Callback('void', [], () => {
            console.log('Frame Ready');
        });

        DllLib.beginSession(this.frameReadyCB);
    }

    startStreaming() {
        DllLib.startStreaming();
    }

    stopStreaming() {
        DllLib.stopStreaming.async(() => {});
    }

    endSession() {
        DllLib.endSession.async(() => {});
    }
}

module.exports = ThreadedDll;
Другие вопросы по тегам