gpt4 book ai didi

ios - 快速组合 : Buffer upstream values and emit them at a steady rate?

转载 作者:行者123 更新时间:2023-12-02 02:34:24 24 4
gpt4 key购买 nike

在 iOS 13 中使用新的 Combine 框架。

假设我有一个上游发布者以非常不规则的速率发送值 - 有时几秒钟或几分钟可能没有任何值,然后一串值可能会同时通过。我想创建一个自定义发布者订阅上游值,缓冲它们并在它们进入时以常规的已知节奏发出它们,但如果它们都已经用尽,则不发布任何内容。

举个具体的例子:

  • t = 0 到 5000 毫秒:未发布上游值
  • t = 5001ms:上游发布“a”
  • t = 5002ms:上游发布“b”
  • t = 5003ms:上游发布“c”
  • t = 5004ms 到 10000ms:没有发布上游值
  • t = 10001ms:上游发布“d”

  • 我订阅上游的发布者将每 1 秒产生一次值:
  • t = 0 到 5000 毫秒:未发布值
  • t = 5001ms:发布“a”
  • t = 6001ms:发布“b”
  • t = 7001ms:发布“c”
  • t = 7001ms 到 10001ms:未发布值
  • t = 10001ms:发布“d”

  • Combine 中现有的出版商或运营商似乎都没有完全按照我的意愿行事。
  • throttle debounce 将简单地以特定节奏对上游值进行采样并删除丢失的值(例如,如果节奏为 1000 毫秒,则只会发布“a”)
  • delay 将为每个值添加相同的延迟,但不会将它们分开(例如,如果我的延迟是 1000 毫秒,它将在 6001 毫秒发布“a”,在 6002 毫秒发布“b”,在 6003 毫秒发布“c”)
  • buffer 看起来很有希望,但我不太清楚如何使用它——如何强制它按需从缓冲区中发布一个值。当我将水槽连接到 buffer 时它似乎只是立即发布所有值,根本没有缓冲。

  • 我考虑过使用某种组合运算符,例如 zipmergecombineLatest并将它与 Timer 发布者结合起来,这可能是正确的方法,但我无法弄清楚如何配置它以提供我想要的行为。

    编辑

    这是一个大理石图,希望能说明我的目标:
    Upstream Publisher:
    -A-B-C-------------------D-E-F--------|>

    My Custom Operator:
    -A----B----C-------------D----E----F--|>

    编辑 2:单元测试

    这是一个应该通过的单元测试 modulatedPublisher (我想要的缓冲发布者)按需要工作。它并不完美,但它会在收到事件时存储事件(包括收到的时间),然后比较事件之间的时间间隔,确保它们不小于所需的间隔。
    func testCustomPublisher() {
    let expectation = XCTestExpectation(description: "async")
    var events = [Event]()

    let passthroughSubject = PassthroughSubject<Int, Never>()
    let cancellable = passthroughSubject
    .modulatedPublisher(interval: 1.0)
    .sink { value in
    events.append(Event(value: value, date: Date()))
    print("value received: \(value) at \(self.dateFormatter.string(from:Date()))")
    }

    // WHEN I send 3 events, wait 6 seconds, and send 3 more events
    passthroughSubject.send(1)
    passthroughSubject.send(2)
    passthroughSubject.send(3)

    DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(6000)) {
    passthroughSubject.send(4)
    passthroughSubject.send(5)
    passthroughSubject.send(6)

    DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(4000)) {

    // THEN I expect the stored events to be no closer together in time than the interval of 1.0s
    for i in 1 ..< events.count {
    let interval = events[i].date.timeIntervalSince(events[i-1].date)
    print("Interval: \(interval)")

    // There's some small error in the interval but it should be about 1 second since I'm using a 1s modulated publisher.
    XCTAssertTrue(interval > 0.99)
    }
    expectation.fulfill()
    }
    }

    wait(for: [expectation], timeout: 15)
    }

    我得到的最接近的是使用 zip ,像这样:
    public extension Publisher where Self.Failure == Never {
    func modulatedPublisher(interval: TimeInterval) -> AnyPublisher<Output, Never> {
    let timerBuffer = Timer
    .publish(every: interval, on: .main, in: .common)
    .autoconnect()

    return timerBuffer
    .zip(self, { $1 }) // should emit one input element ($1) every timer tick
    .eraseToAnyPublisher()
    }
    }

    这正确地协调了前三个事件(1、2 和 3),但不协调后三个事件(4、5 和 6)。输出:
    value received: 1 at 3:54:07.0007
    value received: 2 at 3:54:08.0008
    value received: 3 at 3:54:09.0009
    value received: 4 at 3:54:12.0012
    value received: 5 at 3:54:12.0012
    value received: 6 at 3:54:12.0012

    我相信这是因为 zip有一定的内部缓冲能力。前三个上游事件被缓冲并在 Timer 的节奏上发出,但是在 6 秒的等待期间,Timer 的事件被缓冲 - 当第二个设置上游事件被触发时,队列中已经有 Timer 事件等待,所以它们'配对并立即发射。

    最佳答案

    编辑
    与下面概述的原始方法相比,还有一种更简单的方法,它不需要起搏器,而是使用 flatMap(maxPublishers: .max(1)) 创建的背压。 .flatMap发送请求 1,直到它返回的发布者(我们可以延迟)完成。我们需要一个 Buffer上游发布者缓冲值。

    // for demo purposes, this subject sends a Date:
    let subject = PassthroughSubject<Date, Never>()
    let interval = 1.0

    let pub = subject
    .buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest)
    .flatMap(maxPublishers: .max(1)) {
    Just($0)
    .delay(for: .seconds(interval), scheduler: DispatchQueue.main)
    }

    原创
    我知道这是一个老问题,但我认为有一个更简单的方法来实现这个,所以我想我会分享。
    这个想法类似于 .zipTimer , 除了而不是 Timer , 你会 .zip与先前发送的值的延时“滴答”,这可以通过 CurrentValueSubject 来实现. CurrentValueSubject需要而不是 PassthroughSubject为了播种第一个“滴答声”。
    // for demo purposes, this subject sends a Date:
    let subject = PassthroughSubject<Date, Never>()

    let pacer = CurrentValueSubject<Void, Never>(())
    let interval = 1.0

    let pub = subject.zip(pacer)
    .flatMap { v in
    Just(v.0) // extract the original value
    .delay(for: .seconds(interval), scheduler: DispatchQueue.main)
    .handleEvents(receiveOutput: { _ in
    pacer.send() // send the pacer "tick" after the interval
    })
    }
    发生的事情是 .zip起搏器上的门,它仅在先前发送的值延迟后到达。
    如果下一个值早于允许的间隔,它会等待起搏器。
    但是,如果下一个值稍后出现,那么起搏器已经有一个新值可以立即提供,因此不会有延迟。

    如果您像在测试用例中那样使用它:
    let c = pub.sink { print("\($0): \(Date())") }

    subject.send(Date())
    subject.send(Date())
    subject.send(Date())

    DispatchQueue.main.asyncAfter(deadline: .now() + 1.0) {
    subject.send(Date())
    subject.send(Date())
    }

    DispatchQueue.main.asyncAfter(deadline: .now() + 10.0) {
    subject.send(Date())
    subject.send(Date())
    }
    结果将是这样的:
    2020-06-23 19:15:21 +0000: 2020-06-23 19:15:21 +0000
    2020-06-23 19:15:21 +0000: 2020-06-23 19:15:22 +0000
    2020-06-23 19:15:21 +0000: 2020-06-23 19:15:23 +0000
    2020-06-23 19:15:22 +0000: 2020-06-23 19:15:24 +0000
    2020-06-23 19:15:22 +0000: 2020-06-23 19:15:25 +0000
    2020-06-23 19:15:32 +0000: 2020-06-23 19:15:32 +0000
    2020-06-23 19:15:32 +0000: 2020-06-23 19:15:33 +0000

    关于ios - 快速组合 : Buffer upstream values and emit them at a steady rate?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59367202/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com