RxJava Eventbus и обработка ошибок
Я пытаюсь создать событийную шину с помощью rxjava, в которую я добавляю некоторые команды, которые будут продолжаться даже при возникновении ошибок. Я изучал onErrorFlatMap, но он больше не существует, и я пока не могу материализоваться и дематериализоваться.
Это то, что у меня уже есть:
Мой маленький тест
public class EventBusTest {
private EventBus eventBus = new EventBus();
@Test
public void test() {
//listeners in service layer
eventBus.on(createOf(BeanA.class))
.subscribe(this::handleBeanAInService1);
eventBus.on(createOf(BeanA.class))
.doOnNext(BeanA::validateBusinessLogic)
.subscribe(this::handleBeanAInService2);
eventBus.on(createOf(BeanB.class))
.subscribe(this::handleBeanBInService);
//incoming requests in rest layer
List<String> incomingCalls = new ArrayList<>();
Collections.addAll(incomingCalls, "firstRequest", "secondRequestErrorOnValidate", "thirdRequest", "fourthRequestErrorInService", "fifthRequest");
incomingCalls.stream().forEach(this::incomingCallsEgHttpCalls);
}
public void incomingCallsEgHttpCalls(String string) {
Observable.just(string)
.map(aName -> new BeanA(aName))
.doOnNext(bean -> eventBus.post(new CreateCommand(bean)))
.subscribe(bean -> System.out.println("\tReturn ok to client: " + bean.getName()), error -> System.out.println("\tReturning error to client: " + string + "; " + error.getMessage()));
}
public void handleBeanAInService1(BeanA beanA) {
System.out.println("BeanAService1 handling BeanA " + beanA.getName());
if(beanA.getName().contains("ErrorInService")) {
throw new RuntimeException("service exception for " + beanA.getName());
}
eventBus.post(new CreateCommand(new BeanB(beanA.getName())));
}
public void handleBeanAInService2(BeanA beanA) {
System.out.println("BeanAService2 handling BeanA " + beanA.getName());
}
public void handleBeanBInService(BeanB beanB) {
System.out.println("BeanBService handling BeanB " + beanB.getName());
}
}
EventBus
@Named
public class EventBus {
private PublishSubject<Object> publishSubject = PublishSubject.create();
public EventBus post(Object object) {
//publishSubject.onNext(object); => does not work, OnErrorNotImplementedExceptions if eventbus observers throw errors in validate
//To prevent OnErrorNotImplementedException
try {
publishSubject.onNext(object);
} catch (OnErrorNotImplementedException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new RuntimeException(cause);
}
return this;
}
public Observable<Object> observe() {
//I also tried
//return publishSubject.onErrorResumeNext(publishSubject);
return publishSubject;
}
public <F, T> Observable<T> on(Observable.Transformer<F, T> onCommand) {
return onCommand.call((Observable<F>) publishSubject);
}
}
CreateCommand
public class CreateCommand {
private Object object;
public CreateCommand(Object object) {
this.object = object;
}
public Class<?> type() {
return object.getClass();
}
public <T> T value() {
return (T) object;
}
public static <F, T> Observable.Transformer<F, T> createOf(Class<T> clazz) {
return observable -> observable
.ofType(CreateCommand.class)
.filter(createCommand -> clazz.isInstance(createCommand.object))
.map(createCommand -> clazz.cast(createCommand.object));
}
}
BeanA
public class BeanA {
private String name;
public BeanA(String name) {
this.name = name;
}
public String getName() {
return name;
}
public static void validateBusinessLogic(BeanA beanA) {
if (beanA.getName().contains("ErrorOnValidate")) {
throw new RuntimeException("validate exception for " + beanA.getName());
}
}
}
BeanB
public class BeanB {
private String name;
public BeanB(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
Фактический вывод
BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
Return ok to client: fifthRequest
Требуемый вывод
BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
BeanAService2 handling BeanA thirdRequest
Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
BeanAService2 handling BeanA fourthRequestErrorInService
Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
BeanAService1 handling BeanA fifthRequest
BeanBService handling BeanB fifthRequest
BeanAService2 handling BeanA fifthRequest
Return ok to client: fifthRequest
Есть идеи как это решить? Я видел проблему в rxjava, касающуюся примеров реализации EventBus, но не нашел ее.
Я также хотел бы, чтобы моим службам не нужно было выполнять какую-то конкретную обработку ошибок rxjava.
Или есть другой способ передачи событий нескольким другим наблюдаемым, а затем повторное сжатие результатов?
1 ответ
Ошибки в RxJava являются терминальными событиями, которые разрушают наблюдаемые потоки (если они не захвачены / не возобновлены операторами onErrorXXX и onException). Если вы хотите, чтобы потоки оставались живыми, вам нужно заключить ошибку в какой-либо другой тип значения (например, Уведомление) и развернуть его в соответствующих местах.
Вот пример гистограммы, которая показывает реализацию eventbus поверх конструкций RxJava. Обратите внимание на следующие вещи:
- Вам может потребоваться сериализовать доступ к теме в случае, если метод post() шины событий может быть вызван из нескольких потоков.
- RxJava иногда не возвращает назад исключения, выданные из onNext, но только в самом начале потока, завершая весь поток, поэтому вам нужен оператор, который ограничивает такие ошибки ниже определенного уровня, следовательно, оператор ClientErrorBounceBack в примере.