Управляйте одинаковыми запросами с помощью RxJava
Предположим, что у меня есть сборщик, который выбирает изображение по данной ссылке в отдельном потоке. Изображение будет затем кэшироваться в памяти. Как только изображение уже кэшируется, сборщик не будет повторно извлекать ссылку. Сборщик считается наблюдаемым. Там может быть много подписчиков, которые просят сборщик для изображения. После того, как первый подписчик подпишет сборщик, он будет снимать сеть. Однако, если на подписку приходит второй подписчик, сборщик не должен отправлять еще один запрос, пока он уже получал один. После этого, если выборка заканчивается, оба подписчика получат изображение. Прямо сейчас, если придет третий подписчик, сборщик сразу выдаст изображение.
Как я могу реализовать сценарий выше с подходом RxJava? Я ожидаю, что буду использовать какие-то существующие операторы, составлять их гораздо более декларативно и, самое главное, чтобы избежать накладных расходов на синхронизированные, блокирующие и атомарные вещи.
3 ответа
Подсказка в вопросе: "Предположим, что у меня есть средство извлечения, которое извлекает изображение из данной ссылки в отдельном потоке. Изображение будет затем кэшироваться в памяти".
И ответ cache()
оператор:
"запомните последовательность элементов, выпущенных Observable, и передайте такую же последовательность будущим подписчикам"
от: https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators
Итак, следующее Observable
следует получить изображение только один раз, независимо от того, как Subscribers
подписаться на него:
Observable<Bitmap> cachedBitmap = fetchBitmapFrom(url).cache();
РЕДАКТИРОВАТЬ:
Я думаю, что следующий пример доказывает, что вверх по течению Observable
подписывается только один раз, даже если несколько подписок приходят до Observable
испустил что-нибудь. Это также должно быть верно для сетевых запросов.
package com.example;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class SimpleCacheTest {
public static void main(String[] args) {
final Observable<Integer> cachedSomething = getSomething().cache();
System.out.println("before first subscription");
cachedSomething.subscribe(new SimpleLoggingSubscriber<Integer>("1"));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("before second subscription");
cachedSomething.subscribe(new SimpleLoggingSubscriber<Integer>("2"));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("quit");
}
private static class SimpleLoggingSubscriber<T> extends Subscriber<T> {
private final String tag;
public SimpleLoggingSubscriber(final String tag) {
this.tag = tag;
}
@Override
public void onCompleted() {
System.out.println("onCompleted (" + tag + ")");
}
@Override
public void onError(Throwable e) {
System.out.println("onError (" + tag + ")");
}
@Override
public void onNext(T t) {
System.out.println("onNext (" + tag + "): " + t);
}
}
private static Observable<Integer> getSomething() {
return Observable.create(new Observable.OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("going to sleep now...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(1);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
}
Выход:
before first subscription
going to sleep now...
before second subscription
onNext (1): 1
onNext (2): 1
onCompleted (1)
onCompleted (2)
quit
Это может быть выполнено через ConcurrentMap и AsyncSubject:
import java.awt.image.BufferedImage;
import java.io.*;
import java.net.URL;
import java.util.concurrent.*;
import javax.imageio.ImageIO;
import rx.*;
import rx.Scheduler.Worker;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
public class ObservableImageCache {
final ConcurrentMap<String, AsyncSubject<BufferedImage>> image =
new ConcurrentHashMap<>();
public Observable<BufferedImage> get(String url) {
AsyncSubject<BufferedImage> result = image.get(url);
if (result == null) {
result = AsyncSubject.create();
AsyncSubject<BufferedImage> existing = image.putIfAbsent(url, result);
if (existing == null) {
System.out.println("Debug: Downloading " + url);
AsyncSubject<BufferedImage> a = result;
Worker w = Schedulers.io().createWorker();
w.schedule(() -> {
try {
Thread.sleep(500); // for demo
URL u = new URL(url);
try (InputStream openStream = u.openStream()) {
a.onNext(ImageIO.read(openStream));
}
a.onCompleted();
} catch (IOException | InterruptedException ex) {
a.onError(ex);
} finally {
w.unsubscribe();
}
});
} else {
result = existing;
}
}
return result;
}
public static void main(String[] args) throws Exception {
ObservableImageCache cache = new ObservableImageCache();
CountDownLatch cdl = new CountDownLatch(4);
Observable<BufferedImage> img1 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png");
System.out.println("Subscribing for IMG1");
img1.subscribe(e -> System.out.println("IMG1: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
Thread.sleep(500);
Observable<BufferedImage> img2 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png");
System.out.println("Subscribing for IMG2");
img2.subscribe(e -> System.out.println("IMG2: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
Observable<BufferedImage> img3 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png");
Observable<BufferedImage> img4 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png");
Thread.sleep(500);
System.out.println("Subscribing for IMG3");
img3.subscribe(e -> System.out.println("IMG3: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
Thread.sleep(1000);
System.out.println("-> Should be immediate: ");
System.out.println("Subscribing for IMG4");
img4.subscribe(e -> System.out.println("IMG4: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
cdl.await();
}
}
Я использую putIfAbsent ConcurrentMap, чтобы убедиться, что только одна загрузка запущена для нового URL; все остальные получат тот же AsyncSubject, на котором они могут "подождать" и получить данные, как только они будут доступны, и сразу после этого. Обычно вы хотите ограничить количество одновременных загрузок с помощью пользовательского планировщика.
Посмотри на ConnectableObservable
и .replay()
метод.
В настоящее время я использую это мои фрагменты для обработки изменений ориентации:
Создание фрагмента:
ConnectableObservable<MyThing> connectableObservable =
retrofitService.fetchMyThing()
.map(...)
.replay();
connectableObservable.connect(); // this starts the actual network call
Фрагмент onCreateView:
Subscription subscription = connectableObservable
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mything -> dosomething());
То, что происходит, - то, что я делаю только 1 сетевой запрос, и любой подписчик получит (в конечном счете / немедленно) тот ответ.