Не удается записать данные в ElasticSearch с помощью AbstractReactiveElasticsearchConfiguration

Я пытаюсь записать данные в свой локальный контейнер Docker Elasticsearch (7.4.2), для простоты я использовал AbstractReactiveElasticsearchConfiguration, предоставленный Spring, также переопределяя функцию entityMapper. Я построил свой репозиторий, расширяя ReactiveElasticsearchRepository. Затем, в конце концов, я использовал свой репозиторий с автоматическим подключением для saveAll() моей коллекции элементов, содержащих данные. Однако Elasticsearch не записывает никаких данных. Также у меня есть контроллер REST, который запускает весь мой процесс, в основном ничего не возвращая, DeferredResult>

Метод REST из моего ApiDelegateImpl

  @Override
  public DeferredResult<ResponseEntity<Void>> openUsageExporterStartPost() {

    final DeferredResult<ResponseEntity<Void>> deferredResult = new DeferredResult<>();

    ForkJoinPool.commonPool().execute(() -> {

          try {
            openUsageExporterAdapter.startExport();
            deferredResult.setResult(ResponseEntity.accepted().build());

          } catch (Exception e) {
            deferredResult.setErrorResult(e);
          }
        }
    );

    return deferredResult;
  }

Моя конфигурация Elasticsearch

@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {

  @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
  private String elasticSearchEndpoint;

  @Bean
  @Override
  public EntityMapper entityMapper() {

    final ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(), new DefaultConversionService());
    entityMapper.setConversions(elasticsearchCustomConversions());
    return entityMapper;
  }

  @Override
  public ReactiveElasticsearchClient reactiveElasticsearchClient() {
    ClientConfiguration clientConfiguration = ClientConfiguration.builder()
        .connectedTo(elasticSearchEndpoint)
        .build();

    return ReactiveRestClients.create(clientConfiguration);
  }
}

Мой репозиторий

public interface OpenUsageRepository extends ReactiveElasticsearchRepository<OpenUsage, Long> {

}

Мой DTO

@Data
@Document(indexName = "open_usages", type = "open_usages")
@TypeAlias("OpenUsage")
public class OpenUsage {

  @Field(name = "id")
  @Id
  private Long id;

  ......
}

Моя реализация адаптера

  @Autowired
  private final OpenUsageRepository openUsageRepository;

  ...transform entity into OpenUsage...

  public void doSomething(final List<OpenUsage> openUsages){
   openUsageRepository.saveAll(openUsages)
  }

И наконец мой IT-тест

@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestPropertySource(locations = {"classpath:application-it.properties"})
@ContextConfiguration(initializers = OpenUsageExporterApplicationIT.Initializer.class)
class OpenUsageExporterApplicationIT {


  @LocalServerPort
  private int port;

  private final static String STARTCALL = "http://localhost:%s/open-usage-exporter/start/";

  @Container
  private static ElasticsearchContainer container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.8.4").withExposedPorts(9200);

  static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    @Override
    public void initialize(final ConfigurableApplicationContext configurableApplicationContext) {

      final List<String> pairs = new ArrayList<>();

      pairs.add("spring.data.elasticsearch.client.reactive.endpoints=" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
      pairs.add("spring.elasticsearch.rest.uris=http://" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
      TestPropertyValues.of(pairs).applyTo(configurableApplicationContext);
    }
  }

  @Test
  void testExportToES() throws IOException, InterruptedException {

    final List<OpenUsageEntity> openUsageEntities = dbPreparator.insertTestData();
    assertTrue(openUsageEntities.size() > 0);

    final String result = executeRestCall(STARTCALL);

    // Awaitility here tells me nothing is in ElasticSearch :(

  }

  private String executeRestCall(final String urlTemplate) throws IOException {

    final String url = String.format(urlTemplate, port);

    final HttpUriRequest request = new HttpPost(url);
    final HttpResponse response = HttpClientBuilder.create().build().execute(request);

    // Get the result.
    return EntityUtils.toString(response.getEntity());
  }
}

1 ответ

public void doSomething(final List<OpenUsage> openUsages){
 openUsageRepository.saveAll(openUsages)
}

В конце отсутствует точка с запятой, поэтому он не должен компилироваться.

Но я предполагаю, что это всего лишь опечатка, а на самом деле стоит точка с запятой.

Тем не мение, saveAll() возвращает Flux. ЭтаFlux это просто рецепт для сохранения ваших данных, и он не выполняется до тех пор, пока subscribe() называется кем-то (или что-то вроде blockLast()). Вы просто бросаете этоFlux прочь, поэтому сохранение никогда не выполняется.

Как это исправить? Один из вариантов - добавить.blockLast() вызов:

openUsageRepository.saveAll(openUsages).blockLast();

Но это сохранит данные блокирующим образом, эффективно подавляя реактивность.

Другой вариант - если код, который вы вызываетеsaveAll()от опор реактивность - это просто вернутьFlux вернулся saveAll(), но, как ваш doSomething() имеет void тип возврата, это сомнительно.

Не видно, как твоя startExport() подключается к doSomething()тем не мение. Но похоже, что ваш "вызывающий код" не использует понятия реактивности, поэтому реальным решением было бы либо переписать вызывающий код, чтобы использовать реактивность (получитьPublisher а также subscribe() на нем, затем дождитесь прибытия данных) или вернитесь к использованию API блокировки (ElasticsearchRepository вместо того ReactiveElasticsearchRepository).

Другие вопросы по тегам