gpt4 book ai didi

swift - 为什么 `Publishers.Map`会急切的消费上游值呢?

转载 作者:行者123 更新时间:2023-12-04 13:15:13 25 4
gpt4 key购买 nike

假设我有一个自定义订阅者请求一个订阅值,然后在收到前一个值三秒后请求一个附加值:

class MySubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never

private var subscription: Subscription?

func receive(subscription: Subscription) {
print("Subscribed")

self.subscription = subscription
subscription.request(.max(1))
}

func receive(_ input: Int) -> Subscribers.Demand {
print("Value: \(input)")

DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
self.subscription?.request(.max(1))
}

return .none
}

func receive(completion: Subscribers.Completion<Never>) {
print("Complete")
subscription = nil
}
}

如果我使用它来订阅一个无限范围的发布者,背压处理得很好,发布者每次等待 3 秒,直到它收到下一个发送值的请求:

(1...).publisher.subscribe(MySubscriber())

// Prints values infinitely with ~3 seconds between each:
//
// Subscribed
// Value: 1
// Value: 2
// Value: 3
// ...

但是如果我添加一个 map 运算符,那么 MySubscriber 甚至都不会收到订阅; map 似乎在收到其订阅后同步请求了 Demand.Unlimited,并且当 map 试图耗尽无限范围时,应用会无限旋转:

(1...).publisher
.map { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())

// The `map` transform is executed infinitely with no delay:
//
// Map: 1
// Map: 2
// Map: 3
// ...

我的问题是,为什么 map 会这样?我本以为 map 只是将其下游需求传递给上游。由于 map 应该用于转换而不是副作用,所以我不明白其当前行为的用例是什么。

编辑

我实现了一个 map 版本来展示我认为它应该如何工作:

extension Publishers {
struct MapLazily<Upstream: Publisher, Output>: Publisher {
typealias Failure = Upstream.Failure

let upstream: Upstream
let transform: (Upstream.Output) -> Output

init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
self.upstream = upstream
self.transform = transform
}

public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
let mapSubscriber = Subscribers.LazyMapSubscriber(downstream: subscriber, transform: transform)
upstream.receive(subscriber: mapSubscriber)
}
}
}

extension Subscribers {
class LazyMapSubscriber<Input, DownstreamSubscriber: Subscriber>: Subscriber {
let downstream: DownstreamSubscriber
let transform: (Input) -> DownstreamSubscriber.Input

init(downstream: DownstreamSubscriber, transform: @escaping (Input) -> DownstreamSubscriber.Input) {
self.downstream = downstream
self.transform = transform
}

func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}

func receive(_ input: Input) -> Subscribers.Demand {
downstream.receive(transform(input))
}

func receive(completion: Subscribers.Completion<DownstreamSubscriber.Failure>) {
downstream.receive(completion: completion)
}
}
}

extension Publisher {
func mapLazily<Transformed>(transform: @escaping (Output) -> Transformed) -> AnyPublisher<Transformed, Failure> {
Publishers.MapLazily(upstream: self, transform: transform).eraseToAnyPublisher()
}
}

使用此运算符,MySubscriber 立即接收订阅并且仅在有需求时才执行 mapLazily 转换:

(1...).publisher
.mapLazily { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())

// Only transforms the values when they are demanded by the downstream subscriber every 3 seconds:
//
// Subscribed
// Map: 1
// Value: 2
// Map: 2
// Value: 4
// Map: 3
// Value: 6
// Map: 4
// Value: 8

我的猜测是为 Publishers.Sequence 定义的 map 的特定重载正在使用某种快捷方式来提高性能。这打破了无限序列,但即使对于有限序列,无论下游需求如何,急切地耗尽序列都会打乱我的直觉。在我看来,以下代码:

(1...3).publisher
.map { value in
print("Map: \(value)")
return value * 2
}
.subscribe(MySubscriber())

应该打印:

Subscribed
Map: 1
Value: 2
Map: 2
Value: 4
Map: 3
Value: 6
Complete

而是打印:

Map: 1
Map: 2
Map: 3
Subscribed
Value: 2
Value: 4
Value: 6
Complete

最佳答案

这是一个不涉及任何自定义订阅者的更简单的测试:

(1...).publisher
//.map { $0 }
.flatMap(maxPublishers: .max(1)) {
(i:Int) -> AnyPublisher<Int,Never> in
Just<Int>(i)
.delay(for: 3, scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
.sink { print($0) }
.store(in: &storage)

它按预期工作,但是如果你取消对 .map 的注释,你将一无所获,因为 .map 运算符在不发布任何内容的情况下累积无限上游值。

根据您的假设,即 map 以某种方式针对前面的序列发布者进行了优化,我尝试了以下解决方法:

(1...).publisher.eraseToAnyPublisher()
.map { $0 }
// ...

果然,它解决了问题!通过对 map 运算符隐藏序列发布者,我们阻止了优化。

关于swift - 为什么 `Publishers.Map`会急切的消费上游值呢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61143246/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com