Связанные запросы Spring WebClient, выполняемые для синхронных API
Я знаю, что это слишком много контекста, но, пожалуйста, потерпите меня.
Мы создаем службу на основе Spring Boot, которая в основном считывает некоторые данные из внешнего синхронного API через HTTP-вызовы и, в зависимости от некоторых конкретных флагов в данных, которые мы читаем из внешней системы, запускает некоторые дополнительные операции. Подробнее:
- Внешняя система предоставляет информацию в виде строк в формате JSON, сгруппированных в последовательности. Мы читаем последовательности постепенно с интервалом последовательности. Пример запроса на интервал последовательности: https: // external_host / api / contactchannels? FromSeq=0&toSeq=100. Мы заранее знаем максимальный порядковый номер. Так, например, если максимальный номер последовательности равен 80000, мы выполним 800 (последовательно) таких запросов (при условии, если интервал последовательности равен 100). Такой запрос отвечает типом содержимого text/plain и телом с количеством строк (это не фиксированное число, может быть от 0 до нескольких тысяч строк). Пример ответа, содержащий 3 строки:
{"id": "440C9931-DDF5-43A1-B217-3D81BFF27E4E", "contactId": "A764A2B9-172F-48E5-AD16-D20797F516E3", "permissionType": "MARKETING_PERMISSION", "status": "ACTIVE"}
{"id": "27E9A769-DE6A-4CD2-B9FD-D28683A5C726", "contactId": "C2D90BD1-4027-4A5C-A91A-E3ACABC8A9D2", "permissionType": "EMAIL", "status": "ACTIVE"}
{"id": "41B792A6-C701-4E03-AB73-59E1E21C75E3", "contactId": "91D2F68B-DC95-45CD-9A5F-65C4BAE67661", "permissionType": "EMAIL", "status": "PENDING"}
Каждая строка описывает каналы связи объекта.
- Для каждого выполняемого нами инкрементного запроса (как указано выше) мы обрабатываем ответ, отфильтровывая все строки, кроме тех, которые имеют определенный тип разрешения ("EMAIL") и определенный статус ("PENDING"). В приведенном выше примере мы говорим только об этой строке в обработке:
{"id": "41B792A6-C701-4E03-AB73-59E1E21C75E3", "contactId": "91D2F68B-DC95-45CD-9A5F-65C4BAE67661", "permissionType": "EMAIL", "status": "PENDING"}
Для каждой такой строки мы берем идентификатор контакта и выполняем поиск по другой конечной точке http, которая отвечает типом содержимого application/json и сущностью, называемой "контактные данные".
- Мы берем определенное свойство из ответа JSON из запроса 2 и выполняем с ним некоторые дополнительные операции.
PS. Каждый раз, когда мы завершаем обработку всех последовательностей из интервала последовательности, мы сохраняем контрольную точку toSeq в нашем локальном хранилище данных, чтобы в случае сбоя возобновить работу с последней точки.
Как мы это делаем сейчас:
//retrieve from our database the last read sequence
Long maxSequence = entitySyncRepository.getMaxSequence();
//get the last sequence generated by the external system
Long newMaxSequence = entityApi.getMaxSequence();
//for each (from, to) tuples where maxSequence <= from < to <= newMaxSequence we are performing this method:
public void getAll(
@Nullable Long fromSeq,
@Nullable Long toSeq,
@NonNull Consumer<String> consumer
) {
URI uri = buildReplicationUri(fromSeq, toSeq);
// uri is basically https://external_host/api/contactchannels?fromSeq={fromSeq}&toSeq={fromSeq + sequenceInterval}
processMultilineStream(uri, consumer);
}
, где параметр- потребитель представляет дополнительное действие, которое мы должны выполнить в точке номер 3 (когда обрабатывается последовательность from->to). Метод ** processMultilineStream** выполняет этот связанный вызов веб-клиента:
protected void processMultilineStream(Consumer<String> lineConsumer) {
this.webClient.get()
.uri(uri)
//perform the call to the sequence URI (1)
.retrieve()
.bodyToFlux(String.class)
//map each line from request 1's response to a ContactChannel entity
.map(EntitySynchroniser::lineToContactChannelMapper)
//filter out all contact channels and leave only one that have certain characteristics
.filter(EntitySynchroniser::isPendingAndEmail)
//for each contact channel remained, get the contact data id and perform request to another endpoint in order to get the email (request number 2)
.flatMap(response1 -> getEmailWithStatusPending(response1.getContactData().getId()))
.map(EntitySynchroniser::contactDataToEmailMapper)
.doOnNext(emailAsString -> {
//for each row find, apply the consumer that does the additional processing (step 3)
lineConsumer.accept(emailAsString);
})
//wait for the incremental request to emit the last signal
.blockLast();
}
Метод getEmailWithStatusPending возвращает Mono, выполняя впоследствии функции retrieve() и bodyToMono(String.class).
Мы думаем, что, возможно, есть лучший способ написать это (возможно, мы могли бы лучше использовать возможности Flux для выполнения этих дополнительных запросов - Flux.generate или что-то в этом роде). Кто-нибудь сталкивался с таким вариантом использования (несколько встроенных действий с использованием вызовов webClient)? Любые идеи поощряются.