gpt4 book ai didi

swift - 如何限制 Combine 中的 flatMap 并发性仍然处理所有源事件?

转载 作者:行者123 更新时间:2023-12-05 02:00:51 26 4
gpt4 key购买 nike

如果我指定 maxPublishers 参数,那么第一个 maxPublishers 事件之后的源事件将不会被平面映射。虽然我只想限制并发。即在一些第一批 maxPublishers 平面 map 发布器完成后继续处理下一个事件。

Publishers.Merge(
addImageRequestSubject
.flatMap(maxPublishers: .max(3)) { self.compressImage($0) }
.compactMap { $0 }
.flatMap(maxPublishers: .max(3)) { self.addImage($0) },
addVideoRequestSubject
.flatMap(maxPublishers: .max(3)) { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

我还尝试在 OperationQueue 的帮助下限制并发。但是 maxConcurrentOperationCount 似乎没有效果。

Publishers.Merge(
addImageRequestSubject
.receive(on: imageCompressionQueue)
.flatMap { self.compressImage($0) }
.compactMap { $0 }
.receive(on: mediaAddingQueue)
.flatMap { self.addImage($0) },
addVideoRequestSubject
.receive(on: mediaAddingQueue)
.flatMap { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

private lazy var imageCompressionQueue: OperationQueue = {
var queue = OperationQueue()
queue.maxConcurrentOperationCount = 3

return queue
}()

private lazy var mediaAddingQueue: OperationQueue = {
var queue = OperationQueue()
queue.maxConcurrentOperationCount = 3

return queue
}()

平面 map 发布者是这样看的:

func compressImage(_ image: UIImage) -> Future<Data?, Never> {
Future { promise in
DispatchQueue.global().async {
let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
promise(Result.success(result))
}
}
}

最佳答案

您已经非常漂亮地进入了 .buffer 运算符的用例。其目的是通过累积否则会被丢弃的值来补偿 .flatMap 背压。

我将通过一个完全人为的例子来说明:

class ViewController: UIViewController {
let sub = PassthroughSubject<Int,Never>()
var storage = Set<AnyCancellable>()
var timer : Timer!
override func viewDidLoad() {
super.viewDidLoad()
sub
.flatMap(maxPublishers:.max(3)) { i in
return Just(i)
.delay(for: 3, scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
.sink { print($0) }
.store(in: &storage)

var count = 0
self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) {
_ in
count += 1
self.sub.send(count)
}
}
}

所以,我们的发布者每秒都会发出一个递增的整数,但是我们的 flatMap.max(3) 并且需要 3 秒来重新发布一个值。结果是我们开始遗漏值:

1
2
3
5
6
7
9
10
11
...

解决办法是在flatMap前放一个buffer。它需要足够大以保存任何遗漏值足够长的时间以便它们被请求:

        sub
.buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
.flatMap(maxPublishers:.max(3)) { i in

结果是所有数值实际上都到达了sink。当然,在现实生活中,如果缓冲区不够大以补偿发布者的值(value)排放率与背压 flatMap 的值(value)排放率之间的差异,我们可能仍然会丢失值(value)

关于swift - 如何限制 Combine 中的 flatMap 并发性仍然处理所有源事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66982854/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com