Просмотр спама AsyncStream, где AsyncPublisher не
Я сталкиваюсь с поведением с AsyncStream, которое я не совсем понимаю.
Когда у меня есть актер с опубликованной переменной, я могу «подписаться» на него через AsyncPublisher, и он ведет себя так, как ожидалось, обновляясь только при изменении значения. Если я создаю AsyncStream с синхронным контекстом (но с потенциальной проблемой сохранения задачи), он также ведет себя так, как ожидалось.
Странность возникает, когда я пытаюсь обернуть этого издателя в AsyncStream с асинхронным контекстом. Кажется, он начинает спамить представление обновлением за цикл, а НЕ только тогда, когда есть изменение.
Что мне не хватает в AsyncStream.init(unfolding:oncancel:), который вызывает такое поведение?
https://developer.apple.com/documentation/swift/asyncstream/init(развертывание:oncancel:)?
import Foundation
import SwiftUI
actor TestService {
static let shared = TestService()
@MainActor @Published var counter:Int = 0
@MainActor public func updateCounter(by delta:Int) async {
counter = counter + delta
}
public func asyncStream() -> AsyncStream<Int> {
return AsyncStream.init(unfolding: unfolding, onCancel: onCancel)
//() async -> _?
func unfolding() async -> Int? {
for await n in $counter.values {
//print("\(location)")
return n
}
return nil
}
//optional
@Sendable func onCancel() -> Void {
print("confirm counter got canceled")
}
}
public func syncStream() -> AsyncStream<Int> {
AsyncStream { continuation in
let streamTask = Task {
for await n in $counter.values {
continuation.yield(n)
}
}
continuation.onTermination = { @Sendable _ in
streamTask.cancel()
print("StreamTask Canceled")
}
}
}
}
struct ContentView: View {
var body: some View {
VStack {
TestActorButton()
HStack {
//TestActorViewA() //<-- uncomment at your own risk.
TestActorViewB()
TestActorViewC()
}
}
.padding()
}
}
struct TestActorButton:View {
var counter = TestService.shared
var body: some View {
Button("increment counter") {
Task { await counter.updateCounter(by: 2) }
}
}
}
struct TestActorViewA:View {
var counter = TestService.shared
@State var counterVal:Int = 0
var body: some View {
Text("\(counterVal)")
.task {
//Fires constantly.
for await value in await counter.asyncStream() {
print("View A Value: \(value)")
counterVal = value
}
}
}
}
struct TestActorViewB:View {
var counter = TestService.shared
@State var counterVal:Int = 0
var body: some View {
Text("\(counterVal)")
.task {
//Behaves like one would expect. Fires once per change.
for await value in await counter.$counter.values {
print("View B Value: \(value)")
counterVal = value
}
}
}
}
struct TestActorViewC:View {
var counter = TestService.shared
@State var counterVal:Int = 0
var body: some View {
Text("\(counterVal)")
.task {
//Also only fires on update
for await value in await counter.syncStream() {
print("View C Value: \(value)")
counterVal = value
}
}
}
}
1 ответ
Реальное решение для упаковки издателя, по-видимому, состоит в том, чтобы придерживаться инициализатора синхронного контекста и отменять его собственную задачу:
public func stream() -> AsyncStream<Int> {
AsyncStream { continuation in
let streamTask = Task {
for await n in $counter.values {
//do hard work to transform n
continuation.yield(n)
}
}
continuation.onTermination = { @Sendable _ in
streamTask.cancel()
print("StreamTask Canceled")
}
}
}
Из того, что я могу сказать, инициализатор стиля «разворачивания» для AsyncStream просто не подходит для упаковки AsyncPublisher. Функция «разворачивания» будет «извлекать» опубликованное значение из потока, поэтому поток будет просто продолжать выталкивать значения из этого бесконечного колодца.
Похоже, что инициализатор стиля «разворачивания» лучше всего использовать при обработке конечного (но потенциально очень большого) списка элементов или при создании значений с нуля... что-то вроде:
struct NumberQueuer {
let numbers:[Int]
public func queueStream() -> AsyncStream<Int> {
var iterator = AsyncArray(values: numbers).makeAsyncIterator()
print("Queue called")
return AsyncStream.init(unfolding: unfolding, onCancel: onCancel)
//() async -> _?
func unfolding() async -> Int? {
do {
if let item = try await iterator.next() {
return item
}
} catch let error {
print(error.localizedDescription)
}
return nil
}
//optional
@Sendable func onCancel() -> Void {
print("confirm NumberQueue got canceled")
}
}
}
public struct AsyncArray<Element>: AsyncSequence, AsyncIteratorProtocol {
let values:[Element]
let delay:TimeInterval
var currentIndex = -1
public init(values: [Element], delay:TimeInterval = 1) {
self.values = values
self.delay = delay
}
public mutating func next() async throws -> Element? {
currentIndex += 1
guard currentIndex < values.count else {
return nil
}
try await Task.sleep(nanoseconds: UInt64(delay * 1E09))
return values[currentIndex]
}
public func makeAsyncIterator() -> AsyncArray {
self
}
}
Можно заставить тип развертывания работать с @Published, создав буферный массив, который многократно проверяется. Переменная больше не должна быть @Published. Этот подход имеет много проблем, но его можно заставить работать. Если интересно, я поместил его в репозиторий с кучей других примеров AsyncStream. https://github.com/carlynorama/StreamPublisherTests
Эта статья очень помогла разобраться с этим: https://www.raywenderlich.com/34044359-asyncsequence-asyncstream-tutorial-for-ios
Как и это видео: https://www.youtube.com/watch?v=UwwKJLrg_0U