Могут ли подписчики обрабатывать исключения, создаваемые в потоках дротиков, без закрытия потока?

Краткий пример того, что я не понимаю:

Stream<int> getNumbersWithException() async* {
  for (var i = 0; i < 10; i++) {
    yield i;
    if (i == 3) throw Exception();
  }
}

С использованием:

getNumbersWithException()
    .handleError((x) => print('Exception caught for $x'))
    .listen((event) {
  print('Observed: $event');
});

Это остановится на 3 с выводом:

Observed: 0
Observed: 1
Observed: 2
Observed: 3
Exception caught for Exception: foo

Из документации (https://dart.dev/tutorials/language/streams) и ( https://api.dart.dev/stable/2.9.1/dart-async/Stream/handleError.html) это как ожидается, поскольку возникшие исключения автоматически закроют поток.

  1. Означает ли это, что правильный способ обработки исключений в потоке, чтобы подписки могли быть долговременными в таком событии, - это обработка исключения внутри самого потока? Что это невозможно сделать извне?
  2. То же самое и с трансляционными потоками?
  3. Если я думаю об этом неправильно, каковы некоторые подсказки, чтобы начать думать правильно?

В настоящее время я считаю потоки источником асинхронных событий данных, которые иногда могут быть ошибочными. Судя по документации и примерам, все выглядит аккуратно, но я думаю, что желание обрабатывать ошибки и в противном случае продолжать наблюдение за потоком данных является нормальным вариантом использования. Мне сложно написать код для этого. Но, возможно, я ошибаюсь. Будем очень признательны за любые идеи.


Изменить: я могу добавить, что пробовал разные вещи, например, использовать преобразователь потока, с тем же результатом:

var transformer = StreamTransformer<int, dynamic>.fromHandlers(
  handleData: (data, sink) => sink.add(data),
  handleError: (error, stackTrace, sink) =>
      print('Exception caught for $error'),
  handleDone: (sink) => sink.close(),
);
getNumbersWithException().transform(transformer).listen((data) {
  print('Observed: $data');
});

Также, listen() имеет необязательный аргумент cancelOnError это выглядит многообещающе, но по умолчанию false, так что здесь нет сигары.

2 ответа

Генераторный метод

Stream<int> getNumbersWithException() async* {
  for (var i = 0; i < 10; i++) {
    yield i;
    if (i == 3) throw Exception();
  }
}

завершится, когда вы создадите исключение. Вthrowработает нормально, исключение напрямую не добавляется в поток. Таким образом, он распространяется через цикл и тело метода до тех пор, пока все тело метода не завершится сгенерированным исключением. В этот момент в поток добавляется необработанное исключение, а затем поток закрывается, потому что тело завершено.

Итак, проблема не в обработке, а в генерации потока. Вы действительно должны обрабатывать ошибку локально, чтобы избежать завершения тела, генерирующего поток.

Вы не можете добавить более одной ошибки в поток, используя throw в async*, и ошибка будет последним, что делает поток.

Хакерский метод, позволяющий фактически выдать более одной ошибки, должен вызвать исключение:

  if (i == 3) yield* () async* { throw Exception(); }();
  // or:      yield* Stream.fromFuture(Future.error(Exception());

Это вызовет исключение непосредственно в сгенерированный поток, не вызывая его локально и не завершая тело метода генератора.

Использоватьyield* Stream.error()вместо того, чтобы выдавать ошибку.

Пример:

      Stream<int> getStream() async* {
  for (var i = 0; i < 5; i++) {
    yield i;
    if (i == 2 || i == 3) {
      yield* Stream.error('Custom error at index $i');
    }
  }
}

void main(List<String> arguments) {
  var stream = getStream();
  stream.listen((event) => print('Data: $event'),
      onDone: () => print('Done'), onError: (err) => print('Error: $err'));
}

Выход:

      Data: 0
Data: 1
Data: 2
Error: Custom error at index 2
Data: 3
Error: Custom error at index 3
Data: 4
Done
Другие вопросы по тегам