Адаптация spymemcached Java-клиента GetFuture к Guava ListenableFuture
Я ищу клиента memcached для Java, который позволяет мне выполнять асинхронное получение, предпочтительно с использованием ListenableFuture от Guava.
Один из возможных способов - использование Spymemcached. У memcachedClient есть метод asyncGet
который возвращает GetFuture
это (так же, как ListenableFuture
) является подклассом Future
, Оба класса имеют методы для добавления слушателей (увы, разных видов).
Можно ли адаптировать кэшированное с помощью spymemc GetFuture к Listenablefuture? Или уже есть какая-то библиотека Java, которая позволяет мне использовать ListenableFuture Guava с memcached?
3 ответа
Я бы просто использовал гуаву SettableFuture
(который реализует ListenableFuture
напрямую) и просто добавьте слушателя в GetFuture
что вызывает set()
на SettableFuture
,
Что касается существующей реализации, единственное, что я знаю, это закрытый источник, извините.
Нет, я действительно не нашел ответа, но я приготовил это. Кажется, это работает, но я уверен, что это может быть улучшено. Используйте на свой риск.
import com.google.common.util.concurrent.ListenableFuture;
import net.spy.memcached.internal.GetCompletionListener;
import net.spy.memcached.internal.GetFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Adapts a spymemcached GetFuture to a Guava ListenableFuture. Both are children
* of Future<V>, but have different versions of listeners for when the future is
* complete.
*
* @param <V> the type of Future.
*
*/
public class ListenableGetFuture<V> implements ListenableFuture<V>, GetCompletionListener {
private GetFuture<V> future;
private volatile List<RunnableExecutorPair> listeners = new ArrayList<>();
private final static Logger LOGGER = LoggerFactory.getLogger(ListenableGetFuture.class);
/**
* Constructor, wraps a spymemcached GetFuture<V> so it becomes a ListenableFuture<V>
* @param future The GetFuture<V>, which is the type returned from Spymemcached asyncGet.
*/
public ListenableGetFuture(GetFuture<V> future) {
this.future = future;
// Add this class as a listener to the GetFuture. This is a little confusing, as
// this class also has a method called addListener, but the thing is they are
// different kinds of listeners. When the future is completed, it calls the onComplete
// method of this class, in which we will notify the listeners (the RunnableExecutorPairs).
future.addListener(this);
}
// the ListenableFuture addListener method.
@Override
public synchronized void addListener(Runnable runnable, Executor executor) {
// if we are already done, just respond directly
if (isDone()){
executor.execute(runnable);
} else {
// otherwise add this listener and we'll get back to you when we're done
RunnableExecutorPair listener = new RunnableExecutorPair(runnable, executor);
listeners.add(listener);
}
}
// from Future<V>, pass on to the GetFuture
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}
// from Future<V>, pass on to the GetFuture
@Override
public boolean isCancelled() {
return future.isCancelled();
}
// from Future<V>, pass on to the GetFuture
@Override
public boolean isDone() {
return future.isDone();
}
// from Future<V>, pass on to the GetFuture
@Override
public V get() throws InterruptedException, ExecutionException, CancellationException {
return future.get(); // just pass exceptions
}
// from Future<V>, pass on to the GetFuture
@Override
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return future.get(timeout, unit); // just pass exceptions
}
// from CompletionListener. This means the FutureGet we are wrapping is done, so
// we call the listeners of this adapter.
@Override
public synchronized void onComplete(GetFuture<?> future) throws Exception {
for(RunnableExecutorPair listener : listeners){
listener.executor.execute(listener.runnable);
}
}
/**
* helper class for storing the listeners added in calls to the ListenableFuture
* addListener method.
*/
class RunnableExecutorPair {
Runnable runnable;
Executor executor;
RunnableExecutorPair(Runnable runnable, Executor executor) {
this.runnable = runnable;
this.executor = executor;
}
}
}
Отредактировано: сделало слушателей энергозависимыми и синхронизировало методы addListeners и onComplete, чтобы исправить прерывистые условия гонки.
Не прямой ответ, но в случае, если кто-то пытается адаптировать его в реактор Mono.
static <T> Mono<T> monoFromGetFuture(GetFuture<T> getFuture) {
return new MonoGetFuture<>(getFuture);
}
final class MonoGetFuture<T> extends Mono<T> implements Fuseable {
final GetFuture<? extends T> future;
MonoGetFuture(GetFuture<? extends T> future) {
this.future = Objects.requireNonNull(future, "future");
}
@Override
public void subscribe(Subscriber<? super T> s) {
Operators.MonoSubscriber<T, T>
sds = new Operators.MonoSubscriber<>(s);
s.onSubscribe(sds);
if (sds.isCancelled()) {
return;
}
future.addListener(future -> Try.of(future::get)
.onFailure(s::onError)
.filter(Objects::nonNull)
.onSuccess(v -> sds.complete((T) v))
.onFailure(e -> s.onComplete()));
}
}