Ленивый `SignalProducer`, который извлекает больше данных асинхронно, когда все данные были использованы
Давайте представим, что мы можем получать фиксированное количество сообщений асинхронно (один запрос, содержащий N элементов)
func fetchMessages(max: UInt, from: Offset) -> SignalProducer<Message,NoError>
Теперь я хотел бы превратить это в неограниченное SignalProducer
это будет лениво fetchMessages
когда предыдущий поток завершается.
func stream(from: Offset) -> SignalProducer<Message, NoError> {
// challenge is to implement this function
}
Первоначальная идея, которая могла бы работать, но которая по-прежнему требовала бы предварительного вычисления всех диапазонов, заключалась бы в обобщении следующего кода.
func lazyFetchFrom(from: Offset) -> SignalProducer<Message,NoError> {
return SignalProducer<Message,NoError> { (observer, disposable) in
fetchMessages(from).start(observer)
}
}
let lazyStream =
fetchMessages(1000, from)
.concat(lazyFetchFrom(from + 1000))
.concat(lazyFetchFrom(from + 2000))
.... // could probably be done generically using a flatMap
Теперь я хотел бы пойти еще дальше и оценить следующий вызов lazyFetchFrom после использования предыдущих значений. Это возможно?
Спасибо
PS: чтобы быть ясным, моя главная задача - создать какое-то противодавление, чтобы производитель не производил слишком быстро по сравнению с потребителем
Изменить: вот моя последняя попытка реализации некоторого противодавления. Однако, когда мы наблюдаем за сигналом, противодавление исчезает, и все в очереди в памяти
1 ответ
- (RACSignal *)allContentFromId:(NSInteger)contentId afterDate:(NSDate *)date fetchSize:(NSInteger)fetchSize {
RACSignal *signalNextPagination = [self nextPaginationAllContentFromId:contentId afterDate:date fetchSize:fetchSize];
//in signalNextPagination will be send next fetch size data and send complete only after downloaded all data
//so we used aggregate
return [signalNextPagination aggregateWithStart:@[] reduce:^id(NSArray *before, NSArray *next) {
return [before arrayByAddingObjectsFromArray:next];
}];
}
- (RACSignal *)nextPaginationAllContentFromId:(NSInteger)contentId afterDate:(NSDate *)date fetchSize:(NSInteger)fetchSize {
//command will be send one fetch request
//after recv data in command need try size fetch
//if size eq fetch size, so need repeat command with new offset
RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^RACSignal *(NSDate *date) {
return [self requestContentFromId:contentId afterDate:date limit:fetchSize];
}];
command.allowsConcurrentExecution = YES;
RACSignal *download = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[[command.executionSignals flattenMap:^RACStream *(id value) {
return value;
}] subscribeNext:^(NSArray *datas) {
[subscriber sendNext:datas];
if ([datas count] == fetchSize) {
NSDate *date = [[datas firstObject] pubDate];
[command execute:date];
} else {
[subscriber sendCompleted];
}
} error:^(NSError *error) {
[subscriber sendError:error];
[subscriber sendCompleted];
}];
[command execute:date];
return nil;
}];
return download;
}