Интеграционное тестирование Spring SseEmitters

Я искал подсказки о том, как лучше всего протестировать методы Spring MVC Controller, которые возвращают SseEmitters. Я подошел довольно коротко, но у меня есть метод проб и ошибок, который проверяет асинхронное многопоточное поведение. Ниже приведен пример кода, просто чтобы продемонстрировать концепцию, может быть опечатка или два:

Класс контроллера:

@Autowired
Publisher<MyResponse> responsePublisher;

@RequestMapping("/mypath")
public SseEmitter index() throws IOException {
    SseEmitter emitter = new SseEmitter();
    Observable<MyResponse> responseObservable = RxReactiveStreams.toObservable(responsePublisher);

    responseObservable.subscribe(
            response -> {
                try {
                    emitter.send(response);
               } catch (IOException ex) {
                    emitter.completeWithError(ex);
               }
            },
            error -> {
                emitter.completeWithError(error);
            },
            emitter::complete
    );

    return emitter;
}

Тестовый класс:

//A threaded dummy publisher to demonstrate async properties.
//Sends 2 responses with a 250ms pause in between.
protected static class MockPublisher implements Publisher<MyResponse> {
    @Override
    public void subscribe(Subscriber<? super MyResponse> subscriber) {
        new Thread() {
            @Override
            public void run() {
                try {
                    subscriber.onNext(response1);
                    Thread.sleep(250);
                    subscriber.onNext(response2);
                } catch (InterruptedException ex) {
                }
                subscriber.onComplete();
            }
        }.start();
    }
}

//Assume @Configuration that autowires the above mock publisher in the controller.

//Tests the output of the controller method.
@Test
public void testSseEmitter() throws Exception {
    String path = "http://localhost/mypath/";
    String expectedContent = "data:" + response1.toString() + "\n\n" +
                             "data:" + response2.toString() + "\n\n");

    //Trial-and-Error attempts at testing this SseEmitter mechanism have yielded the following:
    //- Returning an SseEmitter triggers 'asyncStarted'
    //- Calling 'asyncResult' forces the test to wait for the process to complete
    //- However, there is no actual 'asyncResult' to test.  Instead, the content is checked for the published data.
    mockMvc.perform(get(path).contentType(MediaType.ALL))
        .andExpect(status().isOk())
        .andExpect(request().asyncStarted())
        .andExpect(request().asyncResult(nullValue()))
        .andExpect(header().string("Content-Type", "text/event-stream"))
        .andExpect(content().string(expectedContent))
}

Как отмечено в комментариях, вызывается asyncResult(), чтобы издатель завершил свою работу и отправил оба ответа до завершения теста. Без этого проверка содержимого не выполняется, поскольку в содержимом присутствует только один ответ. Однако фактического результата для проверки нет, поэтому asyncResult имеет значение null.

Мой конкретный вопрос заключается в том, есть ли лучший, более точный способ заставить тест ждать завершения асинхронного процесса, а не метод klugie для ожидания несуществующего asyncResult. Мой более широкий вопрос заключается в том, существуют ли другие libs или Spring-методы, которые лучше подходят для этого по сравнению с этими асинхронными функциями. Спасибо!

0 ответов

Это более общий ответ, поскольку он предназначен для тестирования SseEmitter, который будет работать вечно, но отключится от потока SSE по истечении заданного времени ожидания.

Что касается подхода, отличного от MVC, как прокомментировал в OP @ErinDrummond, вы можете захотеть изучить WebFlux.

Это минимальный пример. Можно добавить заголовки к запросу, различные сопоставители или, возможно, поработать над выводом потока отдельно.

Он устанавливает отложенный поток для отключения от потока SSE, который позволит выполнять утверждения.

@Autowired
MockMvc mockMvc;

private static final ScheduledExecutorService execService = Executors.newScheduledThreadPool(1);

@Test
public void testSseEmitter(String streamURI, long timeout, TimeUnit timeUnit){
    MvcResult result = mockMvc.perform(get(streamURI)
            .andExpect(request().asyncStarted()).andReturn();

    MockAsyncContext asyncContext = (MockAsyncContext) result.getRequest().getAsyncContext();
    execService.schedule(() -> {
        for (AsyncListener listener : asyncContext.getListeners())
            try {
                listener.onTimeout(null);
            } catch (IOException e) {
                e.printStackTrace();
            }
    }, timeout, timeUnit);

    result.getAsyncResult();

    // assertions, e.g. response body as string contains "xyz"
    mvc.perform(asyncDispatch(result)).andExpect(content().string(containsString("xyz");
}
Другие вопросы по тегам