Подписаться и отказаться от подписки на каждый отправленный элемент

Я использую lib реактивной локации.

Мой вариант использования состоит в том, что у меня есть поток объектов, излучаемых из наблюдаемой. Эти предметы могут быть выброшены потенциально каждые несколько часов. Как только элемент выпущен, я хочу получить местоположение и с помощью zipWith(насколько я понимаю) испускает объект, содержащий местоположение.

Проблема в том, что, поскольку объекты будут излучаться только раз в несколько часов, я не могу поддерживать наблюдаемое место горячим, так как это приведет к разрядке батареи.

Поэтому мне нужно следующее: как только объект будет передан в поток, подпишитесь на наблюдаемое местоположение, как только получите местоположение, отмените подписку на наблюдаемое местоположение. Это должно быть сделано постоянно.

Насколько я понимаю этот трансформатор позаботится о отписке

public <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
    return new Observable.Transformer<T, T>() {

        @Override
        public Observable<T> call(Observable<T> tObservable) {
            final BehaviorSubject subject = BehaviorSubject.create();
            Observable source = tObservable.doOnNext(new Action1<T>() {
                @Override
                public void call(T t) {
                    subject.onNext(t);
                }
            });
            return Observable
                    .merge(source.takeUntil(subject), subject)
                    .take(1);
        }

    };
}

Но как мне подписаться снова, как только новый объект будет отправлен в поток?

1 ответ

Решение

Похоже, что вам нужно, чтобы объединить исходные элементы с текущим местоположением, когда они излучаются. Здесь не нужно ничего особенного. Просто используйте flatMap() на каждом из исходных предметов, чтобы объединить его с местоположением.

source.flatMap(item ->
        locationProvider
                .getLastKnownLocation()
                .map(location -> new ItemWithLocation<>(item, location))
);

class ItemWithLocation<T> {
    private final T item;
    private final Location location;

    public ItemWithLocation(T item, Location location) {
        this.item = item;
        this.location = location;
    }

    public T getItem() {
        return item;
    }

    public Location getLocation() {
        return location;
    }
}

РЕДАКТИРОВАТЬ: Обновлено со вторым примером. Следующие будут подписываться на обновления местоположения, пока не будет достигнута определенная точность, а затем объединить его с вашим исходным элементом. Ключевым моментом здесь является использование first(), Используя его, вы сможете отписаться от провайдера местоположения, когда вы получите местоположение, удовлетворяющее вашим потребностям.

LocationRequest request = 
        LocationRequest
            .create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);

source.flatMap(item ->
        locationProvider
                .getUpdatedLocation(request)
                .first(location -> location.getAccuracy() < 5.0f)
                .map(location -> new ItemWithLocation<>(item, location))
);
Другие вопросы по тегам