Использование функции elapsed() на Mono?

Я пытаюсь получить время выполнения для чтения из Redis в реактивном программировании, при поиске документов я могу видеть, что elapsed() Метод будет выполнять тот же и реализованный код, как показано ниже.

Flux.fromIterable(getActions(httpHeaders))
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(actionFact -> methodToReadFromCache(actionFact))
                .sequential();

public Mono<ActionFact> methodToReadFromCache(actionFact) {
    return Mono.fromCallable(() -> getKey(actionFact))
                .flatMap(cacheKey ->
                  redisOperations.hasKey(key)
                                .flatMap(aBoolean -> {
                                    if (aBoolean) {
                                        return redisOperations.opsForValue().get(cacheKey);
                                    }
                                    return authzService.getRolePermissions(actionFact)
                                            .flatMap(policySetResponse ->
                                                    //save in cache
                                            );
                                })
                                .elapsed()
                                .flatMap(lambda -> {
                                    LOG.info("cache/service processing key:{}, time:{}", key, lambda.getT1());
                                    return Mono.just(lambda.getT2());
                                });

Выход:

cache/service processing key:KEY1, time:3 
cache/service processing key:KEY2, time:4 
cache/service processing key:KEY3, time:18 
cache/service processing key:KEY4, time:34 
cache/service processing key:KEY5, time:46 
cache/service processing key:KEY6, time:57 
cache/service processing key:KEY7, time:70 
cache/service processing key:KEY8, time:81 
cache/service processing key:KEY9, time:91 
cache/service processing key:KEY10, time:103
cache/service processing key:KEY11, time:112
cache/service processing key:KEY12, time:121
cache/service processing key:KEY13, time:134
cache/service processing key:KEY14, time:146
cache/service processing key:KEY15, time:159

Я ожидаю, что время, необходимое для каждого запроса кеша, будет <5 миллисекунд, как первый и второй запрос, но не так. Есть ли elapsed() добавить текущее время выборки кумулятивному? Насколько я понимаю, каждый элемент, испускаемый из потока является независимым?

2 ответа

Mono#elapsed измеряет время между тем, когда Mono подписан на и момент Mono испускает предмет (onNext).

Что вызывает подписку и запуск таймера, в вашем случае это внешнее распараллеливание flatMap что вызывает methodToReadFromCache,

Что вызывает onNext и, следовательно, что приурочено является комбинацией hasKey и часть if/else (redisOperations.opsForValue().get(cacheKey) против authzService).

Внешний flatMap должен как минимум столько же таймеров, сколько есть процессоров, так как мы в параллельном режиме.

Но тот факт, что время искажено, может указывать на то, что что-то либо блокирует, либо имеет ограниченные возможности. Например, может ли быть, что redisTemplate может обрабатывать только несколько ключей одновременно?

Согласно документации

Я хочу связать выбросы со временем (Tuple2<Long, T>) измеряется...

  • с момента подписки: прошло

  • с незапамятных времен (ну, компьютерное время): отметка времени

elapsed измеряется время с момента подписки. Таким образом, вы подписываетесь, и он начинает излучать, время будет увеличиваться, чем дольше вы подписались на свой сервис.

официальные документы

Другие вопросы по тегам