Promise.cache Ratpack с несколькими последующими обещаниями в ParallelBatch
Я сталкиваюсь с NullPointerException
в кишках Ratpack при использовании Ratpack's Promise.cache
в сочетании с несколькими последующими обещаниями и ParallelBatch
, и мне не ясно из документации, является ли мое использование неправильным, или это представляет ошибку в Ratpack.
Вот сокращенный тестовый пример, который демонстрирует проблему:
@Test
public void foo() throws Exception {
List<Promise<Integer>> promises = new ArrayList<>();
for (int i = 0; i < 25; i++) {
Promise<Integer> p = Promise.value(12);
p = p.cache();
promises.add(p.map(v -> v + 1));
promises.add(p.map(v -> v + 2));
}
final List<Integer> results = ExecHarness.yieldSingle(c ->
ParallelBatch.of(promises).yield()
).getValueOrThrow();
}
Выполнение этого теста 10000 раз локально приводит к частоте отказов около 10 / 10000, с NullPointerException
это выглядит так:
java.lang.NullPointerException
at ratpack.exec.internal.CachingUpstream.yield(CachingUpstream.java:93)
at ratpack.exec.internal.CachingUpstream.tryDrain(CachingUpstream.java:65)
at ratpack.exec.internal.CachingUpstream.lambda$connect$0(CachingUpstream.java:116)
at ratpack.exec.internal.CachingUpstream$$Lambda$58/1438461739.connect(Unknown Source)
at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
at ratpack.exec.internal.DefaultExecution$$Lambda$33/2092087501.execute(Unknown Source)
at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
at ratpack.exec.internal.DefaultExecController$1$$Lambda$7/1411892748.call(Unknown Source)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory$$Lambda$8/1157058691.run(Unknown Source)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
Не используется cache
в этом тесте проблема исчезает, так как не подписывается на каждое кэшированное обещание дважды.
Мой вопрос: это неправильное использование API Ratpack, или это ошибка в фреймворке? Если первое, можете ли вы указать мне на что-то в документах, что объясняет, почему это использование неправильно?
1 ответ
Несмотря на то, что ваш пример не лучший вариант использования для обещаний кэширования (повторное создание и обещание кэширования, которое содержит одно и то же значение для каждого шага итерации, не имеет особого смысла), вы на самом деле обнаружили ошибку условия гонки в CachingUpstream<T>
учебный класс.
Я провел несколько экспериментов, чтобы выяснить, что происходит, и вот мои выводы. Во-первых, я создал обещание ценности 12
которая обеспечивает пользовательскую (более многословную) реализацию CachingUpstream<T>
объект. Я взял тело Promise.value(12)
и у меня есть переопределить встроенный метод cacheResultIf(Predicate<? super ExecResult<T>> shouldCache)
что по умолчанию возвращает CachingUpstream<T>
пример:
Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
continuation.resume(() -> down.success(12))
)) {
@Override
public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
return transform(up -> {
return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
});
}
};
Далее я создал класс TestCachingUpstream<T>
просто скопировав тело исходного класса, и я добавил несколько вещей, например,
- Я сделал каждый
TestCachingUpstream<T>
наличие внутреннего идентификатора (случайного UUID) для облегчения отслеживания выполнения обещания. - Я добавил несколько подробных сообщений журнала, когда во время выполнения обещания произошли определенные события.
Я не изменил реализацию методов, я просто хотел отследить поток выполнения и сохранить первоначальную реализацию как есть. Мой пользовательский класс выглядел так:
private static class TestCachingUpstream<T> implements Upstream<T> {
private final String id = UUID.randomUUID().toString();
private Upstream<? extends T> upstream;
private final Clock clock;
private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
private final Function<? super ExecResult<T>, Duration> ttlFunc;
private final AtomicBoolean pending = new AtomicBoolean();
private final AtomicBoolean draining = new AtomicBoolean();
private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();
public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
this(upstream, ttl, Clock.systemUTC());
}
@VisibleForTesting
TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
this.upstream = upstream;
this.ttlFunc = ttl;
this.clock = clock;
}
private void tryDrain() {
if (draining.compareAndSet(false, true)) {
try {
TestCachingUpstream.Cached<? extends T> cached = ref.get();
if (needsFetch(cached)) {
if (pending.compareAndSet(false, true)) {
Downstream<? super T> downstream = waiting.poll();
System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
if (downstream == null) {
pending.set(false);
} else {
try {
yield(downstream);
} catch (Throwable e) {
System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
receiveResult(downstream, ExecResult.of(Result.error(e)));
}
}
}
} else {
System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
Downstream<? super T> downstream = waiting.poll();
while (downstream != null) {
downstream.accept(cached.result);
downstream = waiting.poll();
}
}
} finally {
draining.set(false);
}
}
if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
tryDrain();
}
}
private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}
private void yield(final Downstream<? super T> downstream) throws Exception {
System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
upstream.connect(new Downstream<T>() {
public void error(Throwable throwable) {
System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
}
@Override
public void success(T value) {
System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.success(value)));
}
@Override
public void complete() {
System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
receiveResult(downstream, CompleteExecResult.get());
}
});
}
@Override
public void connect(Downstream<? super T> downstream) throws Exception {
TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
if (needsFetch(cached)) {
Promise.<T>async(d -> {
waiting.add(d);
tryDrain();
}).result(downstream::accept);
} else {
downstream.accept(cached.result);
}
}
private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
Duration ttl = Duration.ofSeconds(0);
try {
ttl = ttlFunc.apply(result);
} catch (Throwable e) {
if (result.isError()) {
//noinspection ThrowableResultOfMethodCallIgnored
result.getThrowable().addSuppressed(e);
} else {
result = ExecResult.of(Result.error(e));
}
}
Instant expiresAt;
if (ttl.isNegative()) {
expiresAt = null; // eternal
System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
upstream = null; // release
} else if (ttl.isZero()) {
expiresAt = clock.instant().minus(Duration.ofSeconds(1));
} else {
expiresAt = clock.instant().plus(ttl);
}
ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
pending.set(false);
downstream.accept(result);
tryDrain();
}
static class Cached<T> {
final ExecResult<T> result;
final Instant expireAt;
Cached(ExecResult<T> result, Instant expireAt) {
this.result = result;
this.expireAt = expireAt;
}
}
}
Я сократил количество шагов в цикле for с 25 до 3, чтобы вывод на консоль был более кратким.
Успешное выполнение теста (без условий гонки)
Давайте посмотрим, как выглядит поток правильного выполнения:
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] no pending execution and downstream is not null and cached is null...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] calling yield...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream.connect.success
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream does not need fetching...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] no pending execution and downstream is not null and cached is null...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] calling yield...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream.connect.success
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream does not need fetching...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] no pending execution and downstream is not null and cached is null...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] calling yield...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream.connect.success
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream does not need fetching...
Как видите, каждая итерация приводит к тому, что кэшированное обещание выдает 5 строк журнала консоли.
- когда
tryDrain
метод вызывается в первый раз, результат не кэшируется, и он возвращается кyield(downstream);
вызов метода - призвание
yield(downstream)
завершается успешно иreceiveResult(downstream, ExecResult.of(Result.success(value)));
вызывается изнутриsuccess
Перезвоните Promise.cache()
использует бесконечную дату истечения срока действия с использованием отрицательной продолжительности, и именно поэтомуreceiveResult()
метод релизыupstream
объект, установив его значениеnull
receiveResult()
метод до завершения устанавливает кэшированный результат, используяref
внутренний объект и звонкиtryDrain()
прямо перед выходом из метода.tryDrain()
метод видит ранее кэшированный результат для следующего вызова в кэшированном обещании (p.map(v -> v + 2)
) поэтому он передает кешированный результат непосредственно в нисходящий поток.
И этот сценарий повторяется для всех трех обещаний, созданных внутри цикла for.
Неудачное выполнение теста (состояние гонки)
Запуск теста с теми System.out.printf()
проваливалось несколько раз реже тестирование, в основном потому, что эта операция ввода-вывода потребляет несколько циклов ЦП, а десинхронизированная часть кода имеет еще несколько циклов, чтобы избежать состояния гонки. Однако это все еще происходит, и теперь давайте посмотрим, как выглядит результат неудачного теста:
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] calling yield...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] upstream.connect.success
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling yield... upstream is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] no pending execution and downstream is not null and cached is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] calling yield...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream.connect.success
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream does not need fetching...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] no pending execution and downstream is not null and cached is null...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] calling yield...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream.connect.success
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream does not need fetching...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling receiveResult after catching exception class java.lang.NullPointerException
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] releasing upstream... (ExecResult{complete=false, error=java.lang.NullPointerException, value=null})
java.lang.NullPointerException
at app.AnotherPromiseTest$TestCachingUpstream.yield(AnotherPromiseTest.java:120)
at app.AnotherPromiseTest$TestCachingUpstream.tryDrain(AnotherPromiseTest.java:89)
at app.AnotherPromiseTest$TestCachingUpstream.lambda$connect$0(AnotherPromiseTest.java:146)
at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Это результат неудачного теста - я запускаю его в IntelliJ IDEA и настроил выполнение этого теста так, чтобы он повторялся до отказа. Мне потребовалось некоторое время, чтобы этот тест не удался, но после того, как он несколько раз выполнялся, он, наконец, не прошел итерацию с номером 1500. В этом случае мы можем видеть, что условие гонки произошло с первым обещанием, созданным в цикле for. Вы можете увидеть это после освобождения объекта вверх по течению внутри receiveResult()
метод
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
и звонит tryDrain
перед выходом из метода следующее выполнение кэшированного обещания еще не увидело ранее кэшированного результата, и оно дошло до yield(downstream)
метод снова. После upstream
объект уже был освобожден, установив его значение null
, А также yield(downstream)
Метод ожидает, что вышестоящий объект инициализирован правильно, в противном случае он выбрасывает NPE.
Я пытался отладить метод:
private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}
Это метод, который решает, нужно ли извлекать кэшированное обещание. Однако, когда я добавил какие-либо записи журнала, это начало вызывать StackruError
, Я предполагаю, что в редких случаях cached.expireAt.isBefore(clock.instant())
возвращается false
, так как cached
объект исходит от AtomicReference
поэтому этот объект должен быть правильно передан между выполнением методов.
И вот полный тестовый класс, который я использовал в своих экспериментах:
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.internal.PlatformDependent;
import org.junit.Test;
import ratpack.exec.*;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.util.ParallelBatch;
import ratpack.func.Function;
import ratpack.func.Predicate;
import ratpack.test.exec.ExecHarness;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class AnotherPromiseTest {
@Test
public void foo() throws Exception {
List<Promise<Integer>> promises = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
continuation.resume(() -> down.success(12))
)) {
@Override
public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
return transform(up -> {
return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
});
}
};
p = p.cache();
promises.add(p.map(v -> v + 1));
promises.add(p.map(v -> v + 2));
}
ExecHarness.yieldSingle(c -> ParallelBatch.of(promises).yield()).getValueOrThrow();
}
private static class TestCachingUpstream<T> implements Upstream<T> {
private final String id = UUID.randomUUID().toString();
private Upstream<? extends T> upstream;
private final Clock clock;
private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
private final Function<? super ExecResult<T>, Duration> ttlFunc;
private final AtomicBoolean pending = new AtomicBoolean();
private final AtomicBoolean draining = new AtomicBoolean();
private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();
public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
this(upstream, ttl, Clock.systemUTC());
}
@VisibleForTesting
TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
this.upstream = upstream;
this.ttlFunc = ttl;
this.clock = clock;
}
private void tryDrain() {
if (draining.compareAndSet(false, true)) {
try {
TestCachingUpstream.Cached<? extends T> cached = ref.get();
if (needsFetch(cached)) {
if (pending.compareAndSet(false, true)) {
Downstream<? super T> downstream = waiting.poll();
System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
if (downstream == null) {
pending.set(false);
} else {
try {
yield(downstream);
} catch (Throwable e) {
System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
receiveResult(downstream, ExecResult.of(Result.error(e)));
}
}
}
} else {
System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
Downstream<? super T> downstream = waiting.poll();
while (downstream != null) {
downstream.accept(cached.result);
downstream = waiting.poll();
}
}
} finally {
draining.set(false);
}
}
if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
tryDrain();
}
}
private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}
private void yield(final Downstream<? super T> downstream) throws Exception {
System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
upstream.connect(new Downstream<T>() {
public void error(Throwable throwable) {
System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
}
@Override
public void success(T value) {
System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
receiveResult(downstream, ExecResult.of(Result.success(value)));
}
@Override
public void complete() {
System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
receiveResult(downstream, CompleteExecResult.get());
}
});
}
@Override
public void connect(Downstream<? super T> downstream) throws Exception {
TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
if (needsFetch(cached)) {
Promise.<T>async(d -> {
waiting.add(d);
tryDrain();
}).result(downstream::accept);
} else {
downstream.accept(cached.result);
}
}
private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
Duration ttl = Duration.ofSeconds(0);
try {
ttl = ttlFunc.apply(result);
} catch (Throwable e) {
if (result.isError()) {
//noinspection ThrowableResultOfMethodCallIgnored
result.getThrowable().addSuppressed(e);
} else {
result = ExecResult.of(Result.error(e));
}
}
Instant expiresAt;
if (ttl.isNegative()) {
expiresAt = null; // eternal
System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
upstream = null; // release
} else if (ttl.isZero()) {
expiresAt = clock.instant().minus(Duration.ofSeconds(1));
} else {
expiresAt = clock.instant().plus(ttl);
}
ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
pending.set(false);
downstream.accept(result);
tryDrain();
}
static class Cached<T> {
final ExecResult<T> result;
final Instant expireAt;
Cached(ExecResult<T> result, Instant expireAt) {
this.result = result;
this.expireAt = expireAt;
}
}
}
}
Надеюсь, поможет.