Адаптация spymemcached Java-клиента GetFuture к Guava ListenableFuture

Я ищу клиента memcached для Java, который позволяет мне выполнять асинхронное получение, предпочтительно с использованием ListenableFuture от Guava.

Один из возможных способов - использование Spymemcached. У memcachedClient есть метод asyncGet который возвращает GetFuture это (так же, как ListenableFuture) является подклассом Future, Оба класса имеют методы для добавления слушателей (увы, разных видов).

Можно ли адаптировать кэшированное с помощью spymemc GetFuture к Listenablefuture? Или уже есть какая-то библиотека Java, которая позволяет мне использовать ListenableFuture Guava с memcached?

3 ответа

Я бы просто использовал гуаву SettableFuture (который реализует ListenableFuture напрямую) и просто добавьте слушателя в GetFuture что вызывает set() на SettableFuture,

Что касается существующей реализации, единственное, что я знаю, это закрытый источник, извините.

Нет, я действительно не нашел ответа, но я приготовил это. Кажется, это работает, но я уверен, что это может быть улучшено. Используйте на свой риск.

import com.google.common.util.concurrent.ListenableFuture;

import net.spy.memcached.internal.GetCompletionListener;
import net.spy.memcached.internal.GetFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Adapts a spymemcached GetFuture to a Guava ListenableFuture. Both are children
 * of Future<V>, but have different versions of listeners for when the future is
 * complete.
 *
 * @param <V> the type of Future.
 *
 */
public class ListenableGetFuture<V> implements ListenableFuture<V>, GetCompletionListener {

    private GetFuture<V> future;

    private volatile List<RunnableExecutorPair> listeners = new ArrayList<>();

    private final static Logger LOGGER = LoggerFactory.getLogger(ListenableGetFuture.class);

    /**
     * Constructor, wraps a spymemcached GetFuture<V> so it becomes a ListenableFuture<V>
     * @param future  The GetFuture<V>, which is the type returned from Spymemcached asyncGet.
     */
    public ListenableGetFuture(GetFuture<V> future) {
        this.future = future;

        // Add this class as a listener to the GetFuture. This is a little confusing, as
        // this class also has a method called addListener, but the thing is they are
        // different kinds of listeners. When the future is completed, it calls the onComplete
        // method of this class, in which we will notify the listeners (the RunnableExecutorPairs).
        future.addListener(this);
    }

    // the ListenableFuture addListener method.
    @Override
    public synchronized void addListener(Runnable runnable, Executor executor) {

        // if we are already done, just respond directly
        if (isDone()){
            executor.execute(runnable);
        } else {
            // otherwise add this listener and we'll get back to you when we're done
            RunnableExecutorPair listener = new RunnableExecutorPair(runnable, executor);
            listeners.add(listener);
        }
    }

    // from Future<V>, pass on to the GetFuture
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return future.cancel(mayInterruptIfRunning);
    }

    // from Future<V>, pass on to the GetFuture
    @Override
    public boolean isCancelled() {
        return future.isCancelled();
    }

    // from Future<V>, pass on to the GetFuture
    @Override
    public boolean isDone() {
        return future.isDone();
    }

    // from Future<V>, pass on to the GetFuture
    @Override
    public V get() throws InterruptedException, ExecutionException, CancellationException {
      return future.get(); // just pass exceptions
    }

    // from Future<V>, pass on to the GetFuture
    @Override
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
          return future.get(timeout, unit); // just pass exceptions
    }

    // from CompletionListener. This means the FutureGet we are wrapping is done, so
    // we call the listeners of this adapter.
    @Override
    public synchronized void onComplete(GetFuture<?> future) throws Exception {
        for(RunnableExecutorPair listener : listeners){
            listener.executor.execute(listener.runnable);
        }
    }

    /**
     * helper class for storing the listeners added in calls to the ListenableFuture
     * addListener method.
     */
    class RunnableExecutorPair {
        Runnable runnable;
        Executor executor;

        RunnableExecutorPair(Runnable runnable, Executor executor) {
            this.runnable = runnable;
            this.executor = executor;
        }
    }
}

Отредактировано: сделало слушателей энергозависимыми и синхронизировало методы addListeners и onComplete, чтобы исправить прерывистые условия гонки.

Не прямой ответ, но в случае, если кто-то пытается адаптировать его в реактор Mono.

static <T> Mono<T> monoFromGetFuture(GetFuture<T> getFuture) {
    return new MonoGetFuture<>(getFuture);
}

final class MonoGetFuture<T> extends Mono<T> implements Fuseable {

    final GetFuture<? extends T> future;

    MonoGetFuture(GetFuture<? extends T> future) {
        this.future = Objects.requireNonNull(future, "future");
    }

    @Override
    public void subscribe(Subscriber<? super T> s) {

        Operators.MonoSubscriber<T, T>
                sds = new Operators.MonoSubscriber<>(s);

        s.onSubscribe(sds);

        if (sds.isCancelled()) {
            return;
        }

        future.addListener(future -> Try.of(future::get)
                                        .onFailure(s::onError)
                                        .filter(Objects::nonNull)
                                        .onSuccess(v -> sds.complete((T) v))
                                        .onFailure(e -> s.onComplete()));
    }
}
Другие вопросы по тегам