Ленивый `SignalProducer`, который извлекает больше данных асинхронно, когда все данные были использованы

Давайте представим, что мы можем получать фиксированное количество сообщений асинхронно (один запрос, содержащий N элементов)

func fetchMessages(max: UInt, from: Offset) ->  SignalProducer<Message,NoError> 

Теперь я хотел бы превратить это в неограниченное SignalProducer это будет лениво fetchMessages когда предыдущий поток завершается.

func stream(from: Offset) -> SignalProducer<Message, NoError> {
    // challenge is to implement this function
}

Первоначальная идея, которая могла бы работать, но которая по-прежнему требовала бы предварительного вычисления всех диапазонов, заключалась бы в обобщении следующего кода.

func lazyFetchFrom(from: Offset) -> SignalProducer<Message,NoError> {
        return SignalProducer<Message,NoError> { (observer, disposable) in
            fetchMessages(from).start(observer)
        }
    }

    let lazyStream =
        fetchMessages(1000, from)
            .concat(lazyFetchFrom(from + 1000))
            .concat(lazyFetchFrom(from + 2000))
            .... // could probably be done generically using a flatMap

Теперь я хотел бы пойти еще дальше и оценить следующий вызов lazyFetchFrom после использования предыдущих значений. Это возможно?

Спасибо

PS: чтобы быть ясным, моя главная задача - создать какое-то противодавление, чтобы производитель не производил слишком быстро по сравнению с потребителем

Изменить: вот моя последняя попытка реализации некоторого противодавления. Однако, когда мы наблюдаем за сигналом, противодавление исчезает, и все в очереди в памяти

1 ответ

- (RACSignal *)allContentFromId:(NSInteger)contentId afterDate:(NSDate *)date fetchSize:(NSInteger)fetchSize {
    RACSignal *signalNextPagination = [self nextPaginationAllContentFromId:contentId afterDate:date fetchSize:fetchSize];

    //in signalNextPagination will be send next fetch size data and send complete only after downloaded all data
    //so we used aggregate

    return [signalNextPagination aggregateWithStart:@[] reduce:^id(NSArray *before, NSArray *next) {
        return [before arrayByAddingObjectsFromArray:next];
    }];
}


- (RACSignal *)nextPaginationAllContentFromId:(NSInteger)contentId afterDate:(NSDate *)date fetchSize:(NSInteger)fetchSize {

    //command will be send one fetch request
    //after recv data in command need try size fetch
    //if size eq fetch size, so need repeat command with new offset

    RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^RACSignal *(NSDate *date) {
        return [self requestContentFromId:contentId afterDate:date limit:fetchSize];
    }];
    command.allowsConcurrentExecution = YES;

    RACSignal *download = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [[command.executionSignals flattenMap:^RACStream *(id value) {
            return value;
        }] subscribeNext:^(NSArray *datas) {
            [subscriber sendNext:datas];
            if ([datas count] == fetchSize) {
                NSDate *date = [[datas firstObject] pubDate];
                [command execute:date];
            } else {
                [subscriber sendCompleted];
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
            [subscriber sendCompleted];
        }];

        [command execute:date];
        return nil;
    }];

    return download;
}
Другие вопросы по тегам