Как вы можете вызвать обратный вызов эмиттера из отдельного потока C++ в аддоне?

Для контекста я начал с этого вопроса. Мне нужно вызвать обратный вызов для эмиттера в другом потоке. Я сделал минимальный пример, но он emit.Call({cb, result}); Мой первый инстинкт заключается в том, что у меня есть проблемы с продолжительностью жизни env или emit функция.

addon.cpp

#include <napi.h>
#include <iostream>
#include <thread>
#include <memory>
#include <functional>
#include <chrono>

std::shared_ptr<std::thread> thread;
bool running = true;

void generate(Napi::Env& env, Napi::Function& emit)
{
  while(running)
  {
    Napi::Array result = Napi::Array::New(env);

    for(int i = 0; i < 3; ++i)
    {
      result[i] = rand()%100;
    }

    auto cb = Napi::String::New(env, "onFeedData");

    emit.Call({cb, result});

    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
}

Napi::Value Start(const Napi::CallbackInfo& info)
{
  Napi::Env env = info.Env();
  Napi::Function emit = info[0].As<Napi::Function>();

  auto cb = std::bind(generate, env, emit);
  thread = std::make_shared<std::thread>(cb);

  return Napi::String::New(env, "OK");
}

Napi::Value Stop(const Napi::CallbackInfo& info)
{
  Napi::Env env = info.Env();
  Napi::Function emit = info[0].As<Napi::Function>();

  running = false;
  thread->join();

  return Napi::String::New(env, "OK");
}

Napi::Object Init(Napi::Env env, Napi::Object exports)
{
  exports.Set(
      Napi::String::New(env, "Start"),
      Napi::Function::New(env, Start));

  exports.Set(Napi::String::New(env, "Stop"),
      Napi::Function::New(env, Stop));

  return exports;
}

NODE_API_MODULE(addon, Init)

index.js

'use strict'

const EventEmitter = require('events').EventEmitter;
const addon = require('./build/addon.node');

function Main() {
  const emitter = new EventEmitter();

  emitter.on('onFeedData', (evt) => {
    console.log(evt);
  })

  setTimeout(() => {
    addon.Stop( emitter.emit.bind(emitter) );
  }, 5000);

  addon.Start( emitter.emit.bind(emitter) );
}

Main();

3 ответа

Решение

Я не пробовал этого, но я знаю, что в файле node.js v10.6 введены асинхронные поточно-ориентированные вызовы функций, он все еще находится в экспериментальном состоянии с уровнем стабильности 1. Использование также имеет определенные ограничения, вот фрагмент кода из Документация по node.js.

Функции JavaScript обычно могут вызываться только из основного потока нативного дополнения. Если аддон создает дополнительные потоки, функции N-API, для которых требуются napi_env, napi_value или napi_ref, не должны вызываться из этих потоков.

Когда у аддона есть дополнительные потоки, и функции JavaScript должны вызываться на основе обработки, выполняемой этими потоками, эти потоки должны взаимодействовать с основным потоком аддона, чтобы основной поток мог вызывать функцию JavaScript от их имени. API-интерфейсы с поддержкой многопоточных функций обеспечивают простой способ сделать это.

Вы можете получить полную документацию об этом. Асинхронный потокобезопасный вызов функций

Я пробовал много решений, но работает только это.

С помощью https://github.com/mika-fischer/napi-thread-safe-callback вы можете безопасно выполнять обратный вызов из подпотока:

void example_async_work(const CallbackInfo& info)
{
    // Capture callback in main thread
    auto callback = std::make_shared<ThreadSafeCallback>(info[0].As<Function>());
    bool fail = info.Length() > 1;

    // Pass callback to other thread
    std::thread([callback, fail]
    {
        try
        {
            // Do some work to get a result
            if (fail)
                throw std::runtime_error("Failure during async work");
            std::string result = "foo";

            // Call back with result
            callback->call([result](Napi::Env env, std::vector<napi_value>& args)
            {
                // This will run in main thread and needs to construct the
                // arguments for the call
                args = { env.Undefined(), Napi::String::New(env, result) };
            });
        }
        catch (std::exception& e)
        {
            // Call back with error
            callback->callError(e.what());
        }
    }).detach();
}

addon.cc

      #include <napi.h>
static Napi::FunctionReference emit;

Napi::Value EMIT_SET(const Napi::CallbackInfo& info) {
    Napi::Function fn = info[0].As<Napi::Function>();
    emit = Napi::Weak(fn);//Persistent(fn);
    emit.SuppressDestruct();    
    return info.Env().Null();
}

Napi::Value EMIT_CALL(const Napi::CallbackInfo& info) {
    std::string type = info[0].As<Napi::String>();
    uint        arg1 = info[1].As<Number>().Uint32Value();
    std::string arg2 = info[2].As<Napi::String>();
    std::thread([](std::string type, uint arg1,std::string arg2, Napi::ThreadSafeFunction tsfn){
            struct output_data{
                std::string type;
                uint arg1;
                std::string arg2;
            };          
            auto data = new output_data();
            ///---------------
            ///fill output data
            data->type=type;
            data->arg1=arg1;
            data->arg2=arg2;
            std::this_thread::sleep_for(std::chrono::milliseconds(2000));
            ///---------------
            ///output thread result to nodejs
            napi_status status = tsfn.BlockingCall(data,[](Napi::Env env, Napi::Function emit,output_data* data){
                if(data->type=="data 1") emit.Call({Napi::String::New(env, "data 1"), Napi::Number::New(env, 111111), Napi::String::New(env, "to data 1")});
                if(data->type=="data 2") emit.Call({Napi::String::New(env, "data 2"), Napi::Number::New(env, 222222), Napi::String::New(env, "to data 2")});
                delete data;
            });

            if(status != napi_ok) { std::cout << "error!" << "\n"; }
            tsfn.Release();         
        },type,arg1,arg2,Napi::ThreadSafeFunction::New(info.Env(), emit.Value(), "TSFN", 0, 1,[](Napi::Env env, void *finalizeData){},(void *)nullptr)).detach();

    return info.Env().Null();
}

index.js

      const ADDON = require('./EMIT/build/Release/addon');
const emitter = new(require('events'))
ADDON.EMIT_SET(emitter.emit.bind(emitter));

emitter.on('data 1',(arg1,arg2)=>{
    console.log('data1: ',arg1);
    console.log('data1: ',arg2);
})

emitter.on('data 2',(arg1,arg2)=>{
    console.log('data2: ',arg1);
    console.log('data2: ',arg2);
})

ADDON.EMIT_CALL('data 1',111,"aaa");
ADDON.EMIT_CALL('data 2',222,"bbb");

console.log('fin')

выход:

  • плавник
  • данные2: 222222
  • данные2: к данным 2
  • данные1: 111111
  • данные1: к данным 1

см. также, как вернуть данные из потока

Другие вопросы по тегам