Swift Combine: Как создать одного издателя из списка издателей?

Используя новую платформу Combine от Apple, я хочу сделать несколько запросов от каждого элемента в списке. Тогда я хочу получить один результат от сокращения всех ответов. В основном я хочу перейти от списка издателей к одному издателю, который содержит список ответов.

Я пытался составить список издателей, но я не знаю, как сократить этот список до одного издателя. И я попытался создать издателя, содержащего список, но я не могу отобразить список издателей.

Пожалуйста, посмотрите на функцию "createIngredients"

    func createIngredient(ingredient: Ingredient) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
        return apollo.performPub(mutation: CreateIngredientMutation(name: ingredient.name, optionalProduct: ingredient.productId, quantity: ingredient.quantity, unit: ingredient.unit))
        .eraseToAnyPublisher()
    }

    func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
        // first attempt
        let results = ingredients
            .map(createIngredient)
        // results = [AnyPublisher<CreateIngredientMutation.Data, Error>]

        // second attempt
        return Publishers.Just(ingredients)
            .eraseToAnyPublisher()
            .flatMap { (list: [Ingredient]) -> Publisher<[CreateIngredientMutation.Data], Error> in
                return list.map(createIngredient) // [AnyPublisher<CreateIngredientMutation.Data, Error>]
        }
    }

Я не уверен, как взять массив издателей и преобразовать его в издателя, содержащего массив.

Значение результата типа "[AnyPublisher]" не соответствует типу результата закрытия "Издатель"

5 ответов

Решение

По сути, в вашей конкретной ситуации вы смотрите на что-то вроде этого:

    func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
        let publisherOfPublishers = Publishers.Sequence<[AnyPublisher<CreateIngredientMutation.Data, Error>], Error>(sequence: ingredients.map(createIngredient))
        return publisherOfPublishers.flatMap { $0 }.collect(ingredients.count).eraseToAnyPublisher()
    }

Это зависит от того, что каждый из ваших внутренних издателей всегда дает ровно один результат - так что об этом нужно знать.

Более общий ответ, с помощью которого вы можете протестировать его с помощью среды EntwineTest:

import XCTest
import Combine
import EntwineTest

final class MyTests: XCTestCase {

    func testCreateArrayFromArrayOfPublishers() {

        typealias SimplePublisher = Publishers.Just<Int>

        // we'll create our 'list of publishers' here
        let publishers: [SimplePublisher] = [
            .init(1),
            .init(2),
            .init(3),
        ]

        // we'll turn our publishers into a sequence of
        // publishers, a publisher of publishers if you will
        let publisherOfPublishers = Publishers.Sequence<[SimplePublisher], Never>(sequence: publishers)

        // we flatten our publisher of publishers into a single merged stream
        // via `flatMap` then we `collect` exactly three results (we know we're
        // waiting for as many results as we have original publishers), then
        // return the resulting publisher
        let finalPublisher = publisherOfPublishers.flatMap{ $0 }.collect(publishers.count)

        // Let's test what we expect to happen, will happen.
        // We'll create a scheduler to run our test on
        let testScheduler = TestScheduler()

        // Then we'll start a test. Our test will subscribe to our publisher
        // at a virtual time of 200, and cancel the subscription at 900
        let testableSubscriber = testScheduler.start { finalPublisher }

        // we're expecting that, immediately upon subscription, our results will
        // arrive. This is because we're using `just` type publishers which
        // dispatch their contents as soon as they're subscribed to
        XCTAssertEqual(testableSubscriber.sequence, [
            (200, .subscription),            // we're expecting to subscribe at 200
            (200, .input([1, 2, 3])),        // then receive an array of results immediately
            (200, .completion(.finished)),   // the `collect` operator finishes immediately after completion
        ])
    }
}

Я думаю что Publishers.MergeManyздесь может помочь. В вашем примере вы можете использовать это так:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    let publishers = ingredients.map(createIngredient(ingredient:))
    return Publishers.MergeMany(publishers).eraseToAnyPublisher()
}

Это даст вам издателя, который отправит вам отдельные значения Output.

Однако, если вы хотите Output в массиве сразу после завершения всех ваших издателей, вы можете использовать collect() с участием MergeMany:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    let publishers = ingredients.map(createIngredient(ingredient:))
    return Publishers.MergeMany(publishers).collect().eraseToAnyPublisher()
}

И любой из приведенных выше примеров вы можете упростить до одной строки, если хотите, то есть:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    Publishers.MergeMany(ingredients.map(createIngredient(ingredient:))).eraseToAnyPublisher()
}

Вы также можете определить свой собственный merge() метод расширения на Sequence и используйте это, чтобы немного упростить код:

extension Sequence where Element: Publisher {
    func merge() -> Publishers.MergeMany<Element> {
        Publishers.MergeMany(self)
    }
}

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    ingredients.map(createIngredient).merge().eraseToAnyPublisher()
}

Чтобы добавить ответ Tricky, вот решение, которое сохраняет порядок элементов в массиве. Он передает индекс для каждого элемента по всей цепочке и сортирует собранный массив по индексу.

Сложность должна быть O(n log n) из-за сортировки.

      import Combine

extension Publishers {

    private struct EnumeratedElement<T> {
        let index: Int
        let element: T

        init(index: Int, element: T) {
            self.index = index
            self.element = element
        }

        init(_ enumeratedSequence: EnumeratedSequence<[T]>.Iterator.Element) {
            index = enumeratedSequence.offset
            element = enumeratedSequence.element
        }
    }

    static func mergeMappedRetainingOrder<InputType, OutputType>(
        _ inputArray: [InputType],
        mapTransform: (InputType) -> AnyPublisher<OutputType, Error>
    ) -> AnyPublisher<[OutputType], Error> {

        let enumeratedInputArray = inputArray.enumerated().map(EnumeratedElement.init)

        let enumeratedMapTransform: (EnumeratedElement<InputType>) -> AnyPublisher<EnumeratedElement<OutputType>, Error> = { enumeratedInput in
            mapTransform(enumeratedInput.element)
                .map { EnumeratedElement(index: enumeratedInput.index, element: $0)}
                .eraseToAnyPublisher()
        }

        let sortEnumeratedOutputArrayByIndex: ([EnumeratedElement<OutputType>]) -> [EnumeratedElement<OutputType>] = { enumeratedOutputArray in
            enumeratedOutputArray.sorted { $0.index < $1.index }
        }

        let transformToNonEnumeratedArray: ([EnumeratedElement<OutputType>]) -> [OutputType] = {
            $0.map { $0.element }
        }

        return Publishers.MergeMany(enumeratedInputArray.map(enumeratedMapTransform))
            .collect()
            .map(sortEnumeratedOutputArrayByIndex)
            .map(transformToNonEnumeratedArray)
            .eraseToAnyPublisher()
    }
}

Модульный тест для решения:

      import XCTest
import Combine

final class PublishersExtensionsTests: XCTestCase {

    // MARK: - Private properties

    private var cancellables = Set<AnyCancellable>()

    // MARK: - Tests

    func test_mergeMappedRetainingOrder() {
        let expectation = expectation(description: "mergeMappedRetainingOrder publisher")

        let numbers = (1...100).map { _ in Int.random(in: 1...3) }

        let mapTransform: (Int) -> AnyPublisher<Int, Error> = {
            let delayTimeInterval = RunLoop.SchedulerTimeType.Stride(Double($0))
            return Just($0)
                .delay(for: delayTimeInterval, scheduler: RunLoop.main)
                .setFailureType(to: Error.self)
                .eraseToAnyPublisher()
        }

        let resultNumbersPublisher = Publishers.mergeMappedRetainingOrder(numbers, mapTransform: mapTransform)

        resultNumbersPublisher.sink(receiveCompletion: { _ in }, receiveValue: { resultNumbers in
            XCTAssertTrue(numbers == resultNumbers)
            expectation.fulfill()
         }).store(in: &cancellables)

        waitForExpectations(timeout: 5)
    }
}

Вы можете сделать это в одну строку:

      .flatMap(Publishers.Sequence.init(sequence:))

Если порядок важен, попробуйте что-нибудь подобное:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    // first attempt
    let results = ingredients
            .map(createIngredient)
    // results = [AnyPublisher<CreateIngredientMutation.Data, Error>]

    var resultPublisher = Empty<CreateIngredientMutation.Data, Error>

    for result in results {
        resultPublisher = resultPublisher.append(result)
    }

    return resultPublisher.collect()
}
Другие вопросы по тегам