Не удается записать данные в ElasticSearch с помощью AbstractReactiveElasticsearchConfiguration
Я пытаюсь записать данные в свой локальный контейнер Docker Elasticsearch (7.4.2), для простоты я использовал AbstractReactiveElasticsearchConfiguration, предоставленный Spring, также переопределяя функцию entityMapper. Я построил свой репозиторий, расширяя ReactiveElasticsearchRepository. Затем, в конце концов, я использовал свой репозиторий с автоматическим подключением для saveAll() моей коллекции элементов, содержащих данные. Однако Elasticsearch не записывает никаких данных. Также у меня есть контроллер REST, который запускает весь мой процесс, в основном ничего не возвращая, DeferredResult>
Метод REST из моего ApiDelegateImpl
@Override
public DeferredResult<ResponseEntity<Void>> openUsageExporterStartPost() {
final DeferredResult<ResponseEntity<Void>> deferredResult = new DeferredResult<>();
ForkJoinPool.commonPool().execute(() -> {
try {
openUsageExporterAdapter.startExport();
deferredResult.setResult(ResponseEntity.accepted().build());
} catch (Exception e) {
deferredResult.setErrorResult(e);
}
}
);
return deferredResult;
}
Моя конфигурация Elasticsearch
@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {
@Value("${spring.data.elasticsearch.client.reactive.endpoints}")
private String elasticSearchEndpoint;
@Bean
@Override
public EntityMapper entityMapper() {
final ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(), new DefaultConversionService());
entityMapper.setConversions(elasticsearchCustomConversions());
return entityMapper;
}
@Override
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(elasticSearchEndpoint)
.build();
return ReactiveRestClients.create(clientConfiguration);
}
}
Мой репозиторий
public interface OpenUsageRepository extends ReactiveElasticsearchRepository<OpenUsage, Long> {
}
Мой DTO
@Data
@Document(indexName = "open_usages", type = "open_usages")
@TypeAlias("OpenUsage")
public class OpenUsage {
@Field(name = "id")
@Id
private Long id;
......
}
Моя реализация адаптера
@Autowired
private final OpenUsageRepository openUsageRepository;
...transform entity into OpenUsage...
public void doSomething(final List<OpenUsage> openUsages){
openUsageRepository.saveAll(openUsages)
}
И наконец мой IT-тест
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestPropertySource(locations = {"classpath:application-it.properties"})
@ContextConfiguration(initializers = OpenUsageExporterApplicationIT.Initializer.class)
class OpenUsageExporterApplicationIT {
@LocalServerPort
private int port;
private final static String STARTCALL = "http://localhost:%s/open-usage-exporter/start/";
@Container
private static ElasticsearchContainer container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.8.4").withExposedPorts(9200);
static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(final ConfigurableApplicationContext configurableApplicationContext) {
final List<String> pairs = new ArrayList<>();
pairs.add("spring.data.elasticsearch.client.reactive.endpoints=" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
pairs.add("spring.elasticsearch.rest.uris=http://" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
TestPropertyValues.of(pairs).applyTo(configurableApplicationContext);
}
}
@Test
void testExportToES() throws IOException, InterruptedException {
final List<OpenUsageEntity> openUsageEntities = dbPreparator.insertTestData();
assertTrue(openUsageEntities.size() > 0);
final String result = executeRestCall(STARTCALL);
// Awaitility here tells me nothing is in ElasticSearch :(
}
private String executeRestCall(final String urlTemplate) throws IOException {
final String url = String.format(urlTemplate, port);
final HttpUriRequest request = new HttpPost(url);
final HttpResponse response = HttpClientBuilder.create().build().execute(request);
// Get the result.
return EntityUtils.toString(response.getEntity());
}
}
1 ответ
public void doSomething(final List<OpenUsage> openUsages){
openUsageRepository.saveAll(openUsages)
}
В конце отсутствует точка с запятой, поэтому он не должен компилироваться.
Но я предполагаю, что это всего лишь опечатка, а на самом деле стоит точка с запятой.
Тем не мение, saveAll()
возвращает Flux
. ЭтаFlux
это просто рецепт для сохранения ваших данных, и он не выполняется до тех пор, пока subscribe()
называется кем-то (или что-то вроде blockLast()
). Вы просто бросаете этоFlux
прочь, поэтому сохранение никогда не выполняется.
Как это исправить? Один из вариантов - добавить.blockLast()
вызов:
openUsageRepository.saveAll(openUsages).blockLast();
Но это сохранит данные блокирующим образом, эффективно подавляя реактивность.
Другой вариант - если код, который вы вызываетеsaveAll()
от опор реактивность - это просто вернутьFlux
вернулся saveAll()
, но, как ваш doSomething()
имеет void
тип возврата, это сомнительно.
Не видно, как твоя startExport()
подключается к doSomething()
тем не мение. Но похоже, что ваш "вызывающий код" не использует понятия реактивности, поэтому реальным решением было бы либо переписать вызывающий код, чтобы использовать реактивность (получитьPublisher
а также subscribe()
на нем, затем дождитесь прибытия данных) или вернитесь к использованию API блокировки (ElasticsearchRepository
вместо того ReactiveElasticsearchRepository
).