Promise.cache Ratpack с несколькими последующими обещаниями в ParallelBatch

Я сталкиваюсь с NullPointerException в кишках Ratpack при использовании Ratpack's Promise.cache в сочетании с несколькими последующими обещаниями и ParallelBatch, и мне не ясно из документации, является ли мое использование неправильным, или это представляет ошибку в Ratpack.

Вот сокращенный тестовый пример, который демонстрирует проблему:

@Test
public void foo() throws Exception {
    List<Promise<Integer>> promises = new ArrayList<>();

    for (int i = 0; i < 25; i++) {
        Promise<Integer> p = Promise.value(12);
        p = p.cache();
        promises.add(p.map(v -> v + 1));
        promises.add(p.map(v -> v + 2));
    }

    final List<Integer> results = ExecHarness.yieldSingle(c ->
            ParallelBatch.of(promises).yield()
    ).getValueOrThrow();
}

Выполнение этого теста 10000 раз локально приводит к частоте отказов около 10 / 10000, с NullPointerException это выглядит так:

java.lang.NullPointerException
    at ratpack.exec.internal.CachingUpstream.yield(CachingUpstream.java:93)
    at ratpack.exec.internal.CachingUpstream.tryDrain(CachingUpstream.java:65)
    at ratpack.exec.internal.CachingUpstream.lambda$connect$0(CachingUpstream.java:116)
    at ratpack.exec.internal.CachingUpstream$$Lambda$58/1438461739.connect(Unknown Source)
    at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
    at ratpack.exec.internal.DefaultExecution$$Lambda$33/2092087501.execute(Unknown Source)
    at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
    at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
    at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
    at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
    at ratpack.exec.internal.DefaultExecController$1$$Lambda$7/1411892748.call(Unknown Source)
    at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory$$Lambda$8/1157058691.run(Unknown Source)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745)

Не используется cache в этом тесте проблема исчезает, так как не подписывается на каждое кэшированное обещание дважды.

Мой вопрос: это неправильное использование API Ratpack, или это ошибка в фреймворке? Если первое, можете ли вы указать мне на что-то в документах, что объясняет, почему это использование неправильно?

1 ответ

Решение

Несмотря на то, что ваш пример не лучший вариант использования для обещаний кэширования (повторное создание и обещание кэширования, которое содержит одно и то же значение для каждого шага итерации, не имеет особого смысла), вы на самом деле обнаружили ошибку условия гонки в CachingUpstream<T> учебный класс.

Я провел несколько экспериментов, чтобы выяснить, что происходит, и вот мои выводы. Во-первых, я создал обещание ценности 12 которая обеспечивает пользовательскую (более многословную) реализацию CachingUpstream<T> объект. Я взял тело Promise.value(12) и у меня есть переопределить встроенный метод cacheResultIf(Predicate<? super ExecResult<T>> shouldCache) что по умолчанию возвращает CachingUpstream<T> пример:

Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
        continuation.resume(() -> down.success(12))
)) {
    @Override
    public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
        return transform(up -> {
            return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
        });
    }
};

Далее я создал класс TestCachingUpstream<T> просто скопировав тело исходного класса, и я добавил несколько вещей, например,

  • Я сделал каждый TestCachingUpstream<T> наличие внутреннего идентификатора (случайного UUID) для облегчения отслеживания выполнения обещания.
  • Я добавил несколько подробных сообщений журнала, когда во время выполнения обещания произошли определенные события.

Я не изменил реализацию методов, я просто хотел отследить поток выполнения и сохранить первоначальную реализацию как есть. Мой пользовательский класс выглядел так:

private static class TestCachingUpstream<T> implements Upstream<T> {
    private final String id = UUID.randomUUID().toString();

    private Upstream<? extends T> upstream;

    private final Clock clock;
    private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
    private final Function<? super ExecResult<T>, Duration> ttlFunc;

    private final AtomicBoolean pending = new AtomicBoolean();
    private final AtomicBoolean draining = new AtomicBoolean();
    private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

    public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
        this(upstream, ttl, Clock.systemUTC());
    }

    @VisibleForTesting
    TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
        this.upstream = upstream;
        this.ttlFunc = ttl;
        this.clock = clock;
    }

    private void tryDrain() {
        if (draining.compareAndSet(false, true)) {
            try {
                TestCachingUpstream.Cached<? extends T> cached = ref.get();
                if (needsFetch(cached)) {
                    if (pending.compareAndSet(false, true)) {
                        Downstream<? super T> downstream = waiting.poll();

                        System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);

                        if (downstream == null) {
                            pending.set(false);
                        } else {
                            try {
                                yield(downstream);
                            } catch (Throwable e) {
                                System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                receiveResult(downstream, ExecResult.of(Result.error(e)));
                            }
                        }
                    }
                } else {
                    System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                    Downstream<? super T> downstream = waiting.poll();
                    while (downstream != null) {
                        downstream.accept(cached.result);
                        downstream = waiting.poll();
                    }
                }
            } finally {
                draining.set(false);
            }
        }

        if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
            tryDrain();
        }
    }

    private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
        return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
    }

    private void yield(final Downstream<? super T> downstream) throws Exception {
        System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
        upstream.connect(new Downstream<T>() {
            public void error(Throwable throwable) {
                System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
            }

            @Override
            public void success(T value) {
                System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                receiveResult(downstream, ExecResult.of(Result.success(value)));
            }

            @Override
            public void complete() {
                System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                receiveResult(downstream, CompleteExecResult.get());
            }
        });
    }

    @Override
    public void connect(Downstream<? super T> downstream) throws Exception {
        TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
        if (needsFetch(cached)) {
            Promise.<T>async(d -> {
                waiting.add(d);
                tryDrain();
            }).result(downstream::accept);
        } else {
            downstream.accept(cached.result);
        }
    }

    private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
        Duration ttl = Duration.ofSeconds(0);
        try {
            ttl = ttlFunc.apply(result);
        } catch (Throwable e) {
            if (result.isError()) {
                //noinspection ThrowableResultOfMethodCallIgnored
                result.getThrowable().addSuppressed(e);
            } else {
                result = ExecResult.of(Result.error(e));
            }
        }

        Instant expiresAt;
        if (ttl.isNegative()) {
            expiresAt = null; // eternal
            System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
            upstream = null; // release
        } else if (ttl.isZero()) {
            expiresAt = clock.instant().minus(Duration.ofSeconds(1));
        } else {
            expiresAt = clock.instant().plus(ttl);
        }

        ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
        pending.set(false);

        downstream.accept(result);

        tryDrain();
    }

    static class Cached<T> {
        final ExecResult<T> result;
        final Instant expireAt;

        Cached(ExecResult<T> result, Instant expireAt) {
            this.result = result;
            this.expireAt = expireAt;
        }
    }
}

Я сократил количество шагов в цикле for с 25 до 3, чтобы вывод на консоль был более кратким.

Успешное выполнение теста (без условий гонки)

Давайте посмотрим, как выглядит поток правильного выполнения:

[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] no pending execution and downstream is not null and cached is null...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] calling yield...  
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream.connect.success
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream does not need fetching...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] no pending execution and downstream is not null and cached is null...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] calling yield...  
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream.connect.success
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream does not need fetching...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] no pending execution and downstream is not null and cached is null...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] calling yield...  
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream.connect.success
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream does not need fetching...

Как видите, каждая итерация приводит к тому, что кэшированное обещание выдает 5 строк журнала консоли.

  • когда tryDrain метод вызывается в первый раз, результат не кэшируется, и он возвращается к yield(downstream); вызов метода
  • призвание yield(downstream) завершается успешно и receiveResult(downstream, ExecResult.of(Result.success(value))); вызывается изнутри success Перезвоните
  • Promise.cache() использует бесконечную дату истечения срока действия с использованием отрицательной продолжительности, и именно поэтому receiveResult() метод релизы upstream объект, установив его значение null
  • receiveResult() метод до завершения устанавливает кэшированный результат, используя ref внутренний объект и звонки tryDrain() прямо перед выходом из метода.
  • tryDrain() метод видит ранее кэшированный результат для следующего вызова в кэшированном обещании (p.map(v -> v + 2)) поэтому он передает кешированный результат непосредственно в нисходящий поток.

И этот сценарий повторяется для всех трех обещаний, созданных внутри цикла for.

Неудачное выполнение теста (состояние гонки)

Запуск теста с теми System.out.printf() проваливалось несколько раз реже тестирование, в основном потому, что эта операция ввода-вывода потребляет несколько циклов ЦП, а десинхронизированная часть кода имеет еще несколько циклов, чтобы избежать состояния гонки. Однако это все еще происходит, и теперь давайте посмотрим, как выглядит результат неудачного теста:

[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] calling yield...  
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] upstream.connect.success
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling yield... upstream is null... 
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] no pending execution and downstream is not null and cached is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] calling yield...  
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream.connect.success
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream does not need fetching...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] no pending execution and downstream is not null and cached is null...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] calling yield...  
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream.connect.success
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream does not need fetching...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling receiveResult after catching exception class java.lang.NullPointerException
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] releasing upstream... (ExecResult{complete=false, error=java.lang.NullPointerException, value=null}) 

java.lang.NullPointerException
    at app.AnotherPromiseTest$TestCachingUpstream.yield(AnotherPromiseTest.java:120)
    at app.AnotherPromiseTest$TestCachingUpstream.tryDrain(AnotherPromiseTest.java:89)
    at app.AnotherPromiseTest$TestCachingUpstream.lambda$connect$0(AnotherPromiseTest.java:146)
    at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
    at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
    at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
    at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
    at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
    at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Это результат неудачного теста - я запускаю его в IntelliJ IDEA и настроил выполнение этого теста так, чтобы он повторялся до отказа. Мне потребовалось некоторое время, чтобы этот тест не удался, но после того, как он несколько раз выполнялся, он, наконец, не прошел итерацию с номером 1500. В этом случае мы можем видеть, что условие гонки произошло с первым обещанием, созданным в цикле for. Вы можете увидеть это после освобождения объекта вверх по течению внутри receiveResult() метод

[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 

и звонит tryDrain перед выходом из метода следующее выполнение кэшированного обещания еще не увидело ранее кэшированного результата, и оно дошло до yield(downstream) метод снова. После upstream объект уже был освобожден, установив его значение null, А также yield(downstream) Метод ожидает, что вышестоящий объект инициализирован правильно, в противном случае он выбрасывает NPE.

Я пытался отладить метод:

private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
    return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}

Это метод, который решает, нужно ли извлекать кэшированное обещание. Однако, когда я добавил какие-либо записи журнала, это начало вызывать StackruError, Я предполагаю, что в редких случаях cached.expireAt.isBefore(clock.instant()) возвращается false, так как cached объект исходит от AtomicReference поэтому этот объект должен быть правильно передан между выполнением методов.

И вот полный тестовый класс, который я использовал в своих экспериментах:

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.internal.PlatformDependent;
import org.junit.Test;
import ratpack.exec.*;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.util.ParallelBatch;
import ratpack.func.Function;
import ratpack.func.Predicate;
import ratpack.test.exec.ExecHarness;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class AnotherPromiseTest {

    @Test
    public void foo() throws Exception {
        List<Promise<Integer>> promises = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
                    continuation.resume(() -> down.success(12))
            )) {
                @Override
                public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
                    return transform(up -> {
                        return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
                    });
                }
            };

            p = p.cache();
            promises.add(p.map(v -> v + 1));
            promises.add(p.map(v -> v + 2));
        }

        ExecHarness.yieldSingle(c -> ParallelBatch.of(promises).yield()).getValueOrThrow();
    }

    private static class TestCachingUpstream<T> implements Upstream<T> {
        private final String id = UUID.randomUUID().toString();

        private Upstream<? extends T> upstream;

        private final Clock clock;
        private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
        private final Function<? super ExecResult<T>, Duration> ttlFunc;

        private final AtomicBoolean pending = new AtomicBoolean();
        private final AtomicBoolean draining = new AtomicBoolean();
        private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

        public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
            this(upstream, ttl, Clock.systemUTC());
        }

        @VisibleForTesting
        TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
            this.upstream = upstream;
            this.ttlFunc = ttl;
            this.clock = clock;
        }

        private void tryDrain() {
            if (draining.compareAndSet(false, true)) {
                try {
                    TestCachingUpstream.Cached<? extends T> cached = ref.get();
                    if (needsFetch(cached)) {
                        if (pending.compareAndSet(false, true)) {
                            Downstream<? super T> downstream = waiting.poll();

                            System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);

                            if (downstream == null) {
                                pending.set(false);
                            } else {
                                try {
                                    yield(downstream);
                                } catch (Throwable e) {
                                    System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                    receiveResult(downstream, ExecResult.of(Result.error(e)));
                                }
                            }
                        }
                    } else {
                        System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                        Downstream<? super T> downstream = waiting.poll();
                        while (downstream != null) {
                            downstream.accept(cached.result);
                            downstream = waiting.poll();
                        }
                    }
                } finally {
                    draining.set(false);
                }
            }

            if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
                tryDrain();
            }
        }

        private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
            return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
        }

        private void yield(final Downstream<? super T> downstream) throws Exception {
            System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
            upstream.connect(new Downstream<T>() {
                public void error(Throwable throwable) {
                    System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
                }

                @Override
                public void success(T value) {
                    System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.success(value)));
                }

                @Override
                public void complete() {
                    System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, CompleteExecResult.get());
                }
            });
        }

        @Override
        public void connect(Downstream<? super T> downstream) throws Exception {
            TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
            if (needsFetch(cached)) {
                Promise.<T>async(d -> {
                    waiting.add(d);
                    tryDrain();
                }).result(downstream::accept);
            } else {
                downstream.accept(cached.result);
            }
        }

        private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
            Duration ttl = Duration.ofSeconds(0);
            try {
                ttl = ttlFunc.apply(result);
            } catch (Throwable e) {
                if (result.isError()) {
                    //noinspection ThrowableResultOfMethodCallIgnored
                    result.getThrowable().addSuppressed(e);
                } else {
                    result = ExecResult.of(Result.error(e));
                }
            }

            Instant expiresAt;
            if (ttl.isNegative()) {
                expiresAt = null; // eternal
                System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
                upstream = null; // release
            } else if (ttl.isZero()) {
                expiresAt = clock.instant().minus(Duration.ofSeconds(1));
            } else {
                expiresAt = clock.instant().plus(ttl);
            }

            ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
            pending.set(false);

            downstream.accept(result);

            tryDrain();
        }

        static class Cached<T> {
            final ExecResult<T> result;
            final Instant expireAt;

            Cached(ExecResult<T> result, Instant expireAt) {
                this.result = result;
                this.expireAt = expireAt;
            }
        }
    }
}

Надеюсь, поможет.

Другие вопросы по тегам