- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在使用 Sinks.Many<String>
时遇到了我不明白的行为将一些事件通知给多个订阅者:
fun main() {
val sink : Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
val flux = sink.asFlux().log()
val d = flux.subscribe {
println("--> $it")
}
sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
val d2 = flux.subscribe {
println("--2> $it")
}
sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
此代码显示第一个订阅者获得值 1 和 2,第二个订阅者获得 2。到目前为止一切顺利:
11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> 2
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--2> 2
现在,假设第一个订阅者在第一次发射后处理(取消)其订阅,我期待第一个订阅者获得 1,第二个获得 2:
val sink : Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
val flux = sink.asFlux().log()
val d = flux.subscribe {
println("--> $it")
}
sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
d.dispose()
val d2 = flux.subscribe {
println("--2> $it")
}
sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
11:51:48.684 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.685 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - cancel()
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.690 [main] INFO reactor.Flux.EmitterProcessor.1 - onComplete()
然而,当第二个订阅者尝试订阅时,通量被认为已完成。为什么会这样?我需要 Sinks.Many 随时可用,以便在不取消的情况下订阅和取消订阅。
最佳答案
我刚刚遇到了同样的问题。
这是由 autoCancel 默认为 true 引起的。不幸的是onBackpressureBuffer javadoc没有提及它。
此行为继承自 EmitterProcessor.create它被记录的地方。
要将 autoCancel 标志设置为 false,必须使用替代方法 onBackpressureBuffer
Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
关于kotlin - 为什么 Sinks.many().multicast().onBackpressureBuffer() 在订阅者之一取消订阅后完成以及如何避免它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66671636/
我想用 Future 做一个异步工作。 但是下面的 .sink() 闭包 永远不会被调用。 看起来 Future 的实例在它被调用后就被释放了。 Future { promise in
我在我的 aspnet core 应用程序中使用 Serilog 进行日志记录。我需要频繁地将日志事件写入控制台(每秒 300-500 个事件)。我在 docker 容器内运行我的应用程序,并使用 O
为了查看在 foreach() 中运行的函数输出的控制台消息循环我遵循了 this guy 的建议并添加了一个 sink()像这样调用: library(foreach) libra
我正在使用 kafka connect 从 Mongodb -> Elasticsearch 移动数据。 目前,更新的记录作为新文档插入到 Elasticsearch 索引中。但是,我想根据 ID 更
我在 R 中处理以下数据: > str(df) 'data.frame': 369269 obs. of 12 variables: $ bkod : int 110006 110006 1
我正在使用 Tokio 玩异步/等待功能,并在我的 Cargo.toml 中启用了异步/等待功能。 (以及 2018 年版的最新 Rust nightly): tokio = { version =
这个问题在这里已经有了答案: URLSession.shared.dataTaskPublisher not working on IOS 13.3 (3 个回答) 1年前关闭。 这是我的管道: UR
我正在尝试使用flume来使用Twitter Stream API并将该tweet索引到我的elasticsearch中。我将flume.conf设置为使用com.cloudera.flume.sou
我正在尝试将数据从 kafka(最终我们将使用在不同实例上运行的 kafka)发送到 hdfs。我认为 Flume 或某种摄取协议(protocol)对于将数据导入 hdfs 是必要的。所以我们使用c
我怎样才能将输出重定向到某个 txt 文件,但以这样一种方式,以便我可以在逐步生成的同时在控制台中同时看到该输出? 最佳答案 只需使用 sink 的 split=TRUE 参数: sink(file=
我一直在疯狂地尝试让这一切发生,但我就是想不通(初学者)。 正如你所看到的,当你向下滚动时,顶部的头部部分会粘在页面的顶部,但也会溢出一点。这是用 stickyjs 完成的。我也想对头部底部做同样的事
我一直在使用 Serilog.Sinks.Email,我注意到(除了我没有收到电子邮件这一事实),当我执行 Log 语句时没有异常或任何表明失败的信息.即使我为 MailServer 放入垃圾邮件,L
两者的输出 pactl list sink-inputs和 pacmd list-sink-inputs包含一个属性部分: Properties: media.name = "ALSA Pla
我有一个函数f :: ByteString -> String ,并且需要一个 Sink ByteString (ResourceT IO) . 我怎么得到这个? 不幸的是,这些文档不是很有帮助...
我将 RxSwift 与以下类似的东西一起使用 extension Reactive where Base: UIViewController { public var showError:
如何在 Python Spark 结构化流中使用 foreach 来触发输出操作。 query = wordCounts\ .writeStream\ .outputMode('upd
我正在尝试将 R 脚本的错误和警告记录到外部文件中。同时我希望能够在 RStudio 的控制台中看到错误和警告(对开发和调试有用)。我正在尝试使用以下代码: logfile <- file("my_f
我正在使用 R 的 sink() 函数将错误、警告、消息和控制台输出捕获到单个文本文件中。 我想知道是否同时沉没两个 留言 和 输出 类型到单个打开的文件是不好的吗? 我将上述所有内容捕获到一个文件中
我正在测试Serilog.Sinks.Elasticsearch库和ELK堆栈(Elasticsearch&Kibana)的组合,以从我的ASP.NET Core 2.2应用程序中收集日志。 Web应
我基本上是从 Kafka 源读取数据,并将每条消息转储到我的 foreach 处理器(感谢 Jacek 页面提供的简单示例)。 如果这确实有效,我实际上应该在此处的 process 方法中执行一些业务
我是一名优秀的程序员,十分优秀!