Как использовать результаты разных изолятов в основном изоляте?

Я действительно новичок в Дарт, а также в программировании. Я пытаюсь разработать программу командной строки в Dart, используя изоляты. Я хочу сравнить ее производительность с той же программой, но написанной на Java с потоками.

Программа Dart выглядит следующим образом:

  1. main.dart

    import "dart:async";
    import "dart:isolate";
    
    main() {    
        var rPort1 = new ReceivePort();
        var rPort2 = new ReceivePort();
    
        var p1 = 0;
        rPort1.listen((partial) {
            print("p1 ${partial}");
            p1 = partial;
            rPort1.close();
        });
        var p2 = 0;
        rPort2.listen((partial) {
           print("p2 ${partial}");
           p2 = partial;
           rPort2.close();
        });
    
        Isolate.spawnUri(new Uri.file("MyIsolate.dart"), [arg0, ...], rPort1.sendPort);
        Isolate.spawnUri(new Uri.file("MyIsolate.dart"), [arg0, ...], rPort2.sendPort);
    
        var p3 = p1 + p2;
        print("p3 ${p3}");
    }
    
  2. myIsolate.dart

    import "dart:async";
    import "dart:isolate";
    
    main(args, SendPort port) {
        var partial = 0;
        // ... do stuff ...
        // args are used and partial is updated
        port.send(partial);
    }
    

Вывод выглядит так:

p3 0
p1 -0.1168096561671553
p2 0.023709338284264223

Как вы можете видеть, возвращаемые значения каждого изолята приходят после того, как основной изолят завершил свое выполнение. То, что я хочу, это использовать результат изолятов для дальнейшего расчета в основном.

Я не знаю, что мне не хватает. Я уверен, что это что-то очень глупое, но я не могу двигаться дальше по этой проблеме. В Java просто получить значение результата каждого потока, но в Dart я не могу понять, как это сделать изолированно.

Есть идеи?

2 ответа

Вы должны ждать, пока все потоки (из ваших портов) не будут завершены. Один из способов сделать это что-то вроде этого:

import "dart:async";
import "dart:isolate";

main() {
  var rPort1 = new ReceivePort();
  var rPort2 = new ReceivePort();

  // Defining completers which would complete when Streams are finished   
  Completer c1 = new Completer();
  Completer c2 = new Completer();

  var p1 = 0;
  rPort1.listen((partial) {
    print("p1 ${partial}");
    p1 = partial;
    rPort1.close();
  }, onDone: ()=>c1.complete()); // Notice onDone callback here
  var p2 = 0;
  rPort2.listen((partial) {
    print("p2 ${partial}");
    p2 = partial;
    rPort2.close();

  }, onDone: ()=>c2.complete()); // And here

  Isolate.spawnUri(new Uri.file("my_isolate.dart"), [0], rPort1.sendPort);
  Isolate.spawnUri(new Uri.file("my_isolate.dart"), [0], rPort2.sendPort);

  // Waiting for both streams to complete before summing our results
  Future.wait([c1.future,c2.future]).then((_){
    var p3 = p1 + p2;
    print("p3 ${p3}");
  });
}

Для вашей задачи, если вы ждете точных значений, вы можете определить только Futures для этих значений, которые вам нужны, и завершить их, не дожидаясь окончания ваших изолятов (и их потоков).

Для этого просто двигаться c*.complete(<value>) на соответствующий обратный вызов listen(). Нечто подобное (не проверено):

rPort1.listen((partial) {
  print("p1 ${partial}");
  c1.complete(partial);
  rPort1.close();
});
rPort2.listen((partial) {
  print("p2 ${partial}");
  c2.complete(partial);
  rPort2.close();
});

...

Future.wait([c1.future,c2.future]).then((result){
  var p3 = result[0] + result[1];
  print("p3 ${p3}");
});

Если вы хотите что-то ждать в Дартсе, это должно быть будущее. Вы можете преобразовать событие потока или порта в будущее разными способами. Если вы сомневаетесь, вы всегда можете использовать Completer создать будущее из любого другого события. В этом случае это может быть сделано проще, потому что вы просто хотите одно событие из каждого потока, и вы можете использовать Stream.first (или же Stream.last или же Stream.single) для этого.

import "dart:async";
import "dart:isolate";

main() {
  var rPort1 = new ReceivePort();
  var rPort2 = new ReceivePort();

  Future.wait([
      Isolate.spawnUri(new Uri.file("my_isolate.dart"), [0], rPort1.sendPort)
             .then((_) => rPort1.first,
                   onError: (_) => rPort1.close()),
      Isolate.spawnUri(new Uri.file("my_isolate.dart"), [0], rPort2.sendPort)
             .then((_) => rPort2.first,
                   onError: (_) => rPort2.close()),
  ]).then((ps) {
     // Waiting for both streams to complete before summing our results
     var p3 = ps[0] + ps[1];
     print("p3 ${p3}");
  });
}

Вот и я жду spawnUri вернуть Future потому что это может содержать ошибку, если ваш изолят не появлялся правильно.

Вы также можете использовать некоторые вспомогательные функции в изолированном пакете.

import "dart:async";
import "dart:isolate";
import "package:isolate/isolate.dart";

main() async {
  // A SingleResponseChannel has a send-port and a result future,
  // and completes the future with the first port event.
  // Warning: Only closed when event is sent on port!
  // Consider setting a time-out on the channel.
  var c1 = new SingleResponseChannel();
  var c2 = new SingleResponseChannel();
  Isolate.spawnUri(new Uri.file("my_isolate.dart"), [0], c1.port);
  Isolate.spawnUri(new Uri.file("my_isolate.dart"), [0], c2.port);
  var p3 = await c1.result + await c2.result;
  print("p3 ${p3}");
}
Другие вопросы по тегам