Observable.Generate в RxJava?

В.NET Reactive Extensions есть удобный метод генерации последовательностей с использованием corecursion, который называется Observable.Generate.

Есть ли в RxJava метод аналогов, позволяющий генерировать данные с помощью corecursion? Если нет, то может ли это быть реализовано на основе существующих методов?

1 ответ

Решение

Это не точное совпадение, но мы имеем SyncOnSubscribe (а также AsyncOnSubscriber) которые могут генерировать значения, например:

@Test
public void testRange() {
    final int start = 1;
    final int count = 4000;
    OnSubscribe<Integer> os = SyncOnSubscribe.createStateful(new Func0<Integer>(){
        @Override
        public Integer call() {
            return start;
        }}, 
        new Func2<Integer, Observer<? super Integer>, Integer>() {
            @Override
            public Integer call(Integer state, Observer<? super Integer> subscriber) {
                subscriber.onNext(state);
                if (state == count) {
                    subscriber.onCompleted();
                }
                return state + 1;
            }
        });

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);
    InOrder inOrder = inOrder(o);

    Observable.create(os).subscribe(o);

    verify(o, never()).onError(any(TestException.class));
    inOrder.verify(o, times(count)).onNext(any(Integer.class));
    inOrder.verify(o).onCompleted();
    inOrder.verifyNoMoreInteractions();
}
Другие вопросы по тегам