StreamRetryTemplate для Spring Cloud Streams не повторяется в интеграционных тестах
Мы используем Spring Cloud Streams, которые слушают тему Кафки и вызывают службу отдыха. Мы также реализуем пользовательский StreamRetryTemplate, чтобы указать, какие ошибки мы считаем восстанавливаемыми, а какие нет. Я не могу получить согласованные результаты между тем, как он работает во время выполнения и как он работает в интеграционных тестах.
Я проверил в режиме отладки, что исключение генерируется правильно и что RetryTemplate вводится правильно, но, похоже, он не используется в моих интеграционных тестах.
@EnableBinding(Sink::class)
class MyListener(private val myService: Service) {
@StreamListener(Sink.Input)
fun consume(@Payload msg: MyMessage) = myService.process(msg)
@SteamRetryTemplate
fun getRetryTemplate() = RetryTemplate()
}
Когда я запускаю это приложение и myService выдает исключение, я ожидаю, что оно будет повторено, и это происходит идеально. Но когда я пишу интеграционные тесты с сервером Wiremock, и myService выдает исключение, он не повторяется. У меня есть операторы assert, чтобы проверить, сколько раз моя конечная точка Wiremock поражена.
Я что-то упускаю специально для повторных попыток работать в интеграционных тестах?
1 ответ
Вы используете тестовое связующее или встроенный брокер кафки? Связующее для испытаний довольно ограничено; использование встроенного брокера является предпочтительным для полного тестирования интеграции.
См. Тестирование приложений весной для документации Apache Kafka.
РЕДАКТИРОВАТЬ
@SpringBootApplication
@EnableBinding(Sink.class)
public class So55855151Application {
public static void main(String[] args) {
SpringApplication.run(So55855151Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
throw new RuntimeException("fail");
}
@StreamRetryTemplate
public RetryTemplate retrier() {
return new RetryTemplate();
}
}
spring.cloud.stream.bindings.input.group=input
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class So55855151ApplicationTests {
@Autowired
private KafkaTemplate<byte[], byte[]> template;
@Autowired
private RetryTemplate retrier;
@Test
public void test() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
this.retrier.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
System.out.println("open");
latch.countDown();
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("close");
latch.countDown();
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("onError: " + throwable);
latch.countDown();
}
});
this.template.send("input", "test".getBytes());
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
}