Dart: обрабатывать входящие HTTP-запросы параллельно

Я пытаюсь написать HTTP-сервер в Dart, который может обрабатывать несколько запросов параллельно. До сих пор мне не удавалось достичь "параллельной" части.

Вот что я попробовал сначала:

import 'dart:io';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.start();
      while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
      request.response.statusCode = HttpStatus.OK;
      request.response.write(stopwatch.elapsedMilliseconds.toString());
      request.response.close().catchError(print);
    });
  });
}

По каждому запросу он занят работой в течение одной секунды, а затем завершается. Я сделал так, чтобы запросы обрабатывались таким образом, чтобы его время было предсказуемым, и чтобы я мог легко увидеть эффект запроса в диспетчере задач Windows (ядро ЦП переходит на 100% использование).

Я могу сказать, что это не обрабатывает запросы параллельно, потому что:

  1. Если я загружаю несколько вкладок браузера в http://example:8080/ и затем обновите их все, вкладки загружаются одна за другой в последовательности, приблизительно 1 секунда между каждой.

  2. Если я использую инструмент тестирования нагрузки с этими настройками... wrk -d 10 -c 8 -t 8 http://example:8080/... он выполняет от 5 до 8 запросов за 10 секунд, которые я ему дал. Если бы сервер использовал все мои 8 ядер, я бы ожидал число, близкое к 80 запросам.

  3. Когда я открываю диспетчер задач Windows во время теста wrk, я замечаю, что только одно из моих ядер работает почти на 100%, а остальные почти бездействуют.

Итак, я попытался использовать изоляты, надеясь вручную создать новый изолят / поток для каждого запроса:

import 'dart:io';
import 'dart:isolate';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      spawnFunction(handleRequest).send(request);
    });
  });
}

handleRequest() {
  port.receive((HttpRequest request, SendPort sender) {
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.start();
    while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
    request.response.statusCode = HttpStatus.OK;
    request.response.write(stopwatch.elapsedMilliseconds.toString());
    request.response.close().catchError(print);
  });
}

Это не работает вообще. Мне не нравится, что я пытаюсь отправить HttpRequest как сообщение в изолят. Вот ошибка:

#0      _SendPortImpl._sendInternal (dart:isolate-patch/isolate_patch.dart:122:3)
#1      _SendPortImpl._sendNow (dart:isolate-patch/isolate_patch.dart:95:18)
#2      _SendPortImpl.send (dart:isolate-patch/isolate_patch.dart:91:18)
#3      main.<anonymous closure>.<anonymous closure> (file:///C:/Development/dartbenchmark/simple2.dart:7:40)
#4      _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#5      _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#6      _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#7      _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#8      _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#9      StreamController.add (dart:async/stream_controller.dart:10:35)
#10     _HttpServer._handleRequest (http_impl.dart:1261:20)
#11     _HttpConnection._HttpConnection.<anonymous closure> (http_impl.dart:1188:33)
#12     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#13     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#14     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#15     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#16     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#17     StreamController.add (dart:async/stream_controller.dart:10:35)
#18     _HttpParser._doParse (http_parser.dart:415:26)
#19     _HttpParser._parse (http_parser.dart:161:15)
#20     _HttpParser._onData._onData (http_parser.dart:509:11)
#21     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#22     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#23     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#24     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#25     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#26     StreamController.add (dart:async/stream_controller.dart:10:35)
#27     _Socket._onData._onData (dart:io-patch/socket_patch.dart:726:42)
#28     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#29     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#30     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#31     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#32     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#33     StreamController.add (dart:async/stream_controller.dart:10:35)
#34     _RawSocket._RawSocket.<anonymous closure> (dart:io-patch/socket_patch.dart:452:52)
#35     _NativeSocket.multiplex (dart:io-patch/socket_patch.dart:253:18)
#36     _NativeSocket.connectToEventHandler.<anonymous closure> (dart:io-patch/socket_patch.dart:338:54)
#37     _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

Unhandled exception:
Illegal argument(s): Illegal argument in isolate message : (object is a closure)
#0      _throwDelayed.<anonymous closure> (dart:async/stream_impl.dart:22:5)
#1      _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:15:17)
#2      _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:25:9)
#3      Timer.run.<anonymous closure> (dart:async/timer.dart:17:21)
#4      Timer.run.<anonymous closure> (dart:async/timer.dart:25:13)
#5      Timer.Timer.<anonymous closure> (dart:async-patch/timer_patch.dart:9:15)
#6      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:99:28)
#7      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:107:7)
#8      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:115:23)
#9      _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

Используемые версии:

  • Dart Editor версия 0.5.9_r22879
  • Dart SDK версия 0.5.9.0_r22879

Можно ли обрабатывать эти запросы параллельно со всеми доступными ядрами моей машины, используя Dart?

2 ответа

Решение

Я написал библиотеку dart-isoserver, чтобы сделать это некоторое время назад. Сейчас он сильно сгнил, но вы можете увидеть подход.

https://code.google.com/p/dart-isoserver/

Я использовал прокси HttpRequest и HttpResponse через изолирующие порты, поскольку вы не можете отправить их напрямую. Это сработало, хотя было несколько предостережений:

  1. Ввод / вывод по запросу и ответу шел через основной изолят, так что эта часть не была параллельной. Другая работа, выполненная в рабочем изоляте, не блокировала главный изолят. Что действительно должно произойти, так это то, что сокетное соединение должно передаваться между изолятами.
  2. Исключения в изоляте повредят весь сервер. spawnFunction() теперь имеет необработанный параметр обработчика исключений, так что это несколько исправимо, но spawnUri() - нет. dart-isoserver использовал spawnUri() для реализации горячей загрузки, поэтому его нужно будет удалить.
  3. Изоляты запускаются немного медленно, и вам, вероятно, не нужен один на соединение для тысяч случаев одновременного использования соединений, на которые ориентированы nginx и node.js. Изолированный пул с рабочими очередями, вероятно, будет работать лучше, хотя это исключит хорошую функцию, которую вы могли бы использовать для блокирования ввода-вывода на рабочем месте.

Заметка о вашем первом примере кода. Как вы заметили, это точно не будет работать параллельно, потому что Dart однопоточный. Никакой код Dart в одном и том же изоляте никогда не запускается одновременно.

Вам нужно:

  1. Задавать shared: true в HttpServer.bind
  2. Создайте несколько изолятов для параллельной обработки входящих запросов.

Вот базовый минимальный сервер Dart, который распределяет входящие запросы по 6 изоляциям:

import 'dart:io';
import 'dart:isolate';

void main() async {
  for (var i = 1; i < 6; i++) {
    Isolate.spawn(_startServer, []);
  }

  // Bind one server in current Isolate
  _startServer();

  print('Serving at http://localhost:8080/');
  await ProcessSignal.sigterm.watch().first;
}

void _startServer([List args]) async {
  final server = await HttpServer.bind(
    InternetAddress.loopbackIPv4,
    8080,
    shared: true, // This is the magic sauce
  );

  await for (final request in server) {
    _handleRequest(request);
  }
}

void _handleRequest(HttpRequest request) async {
  // Fake delay
  await Future.delayed(const Duration(seconds: 2));

  request.response.write('Hello, world!');
  await request.response.close();
}

Даже с текущими ограничениями HttpServer можно использовать несколько ядер, запустив несколько процессов сервера за обратным прокси-сервером, таким как Apache или Nginx. Внутри Dart вы также можете разветвлять дочерние процессы, чтобы разделить сложные вычислительные задачи.

Хорошее место для начала было бы прочитать о масштабировании node.js, так как он также использует однопоточную архитектуру для каждого процесса.

Изменить: Ответ устарел, теперь можно разделять запросы между изолятами, что позволяет процессу Dart использовать несколько ядер.

См. Документы для ServerSocket.bind (общий доступ).

"Необязательный аргумент shared указывает, возможны ли дополнительные привязки к одному и тому же адресу, порту и комбинации v6Only из одного и того же процесса Dart. Если shared имеет значение true и выполняются дополнительные привязки, то входящие соединения будут распределены между этим набором ServerSockets. способ использования этого состоит в том, чтобы иметь количество изолятов, между которыми распределяются входящие соединения."

Другие вопросы по тегам