- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 Reactor Kafka 应用程序,它无限期地使用来自主题的消息。我需要公开一个健康检查 REST 端点,它可以指示此过程的健康状况——主要是想知道 Kafka 接收器通量序列是否已终止,以便可以采取一些措施来启动它。有没有办法知道通量的当前状态(完成/终止等)?应用是Spring Webflux + Reactor Kafka。
编辑 1 - doOnTerminate/doFinally 不执行
Flux.range(1, 5)
.flatMap(record -> Mono.just(record)
.map(i -> {
throw new OutOfMemoryError("Forcing exception for " + i);
})
.doOnNext(i -> System.out.println("doOnNext: " + i))
.doOnError(e -> System.err.println(e))
.onErrorResume(e -> Mono.empty()))
.doFinally(signalType -> System.err.println("doFinally: Terminating with Signal type: " + signalType))
.doOnTerminate(()-> System.err.println("doOnTerminate: executed"))
.subscribe();
"C:\Program Files\Java\jdk1.8.0_211\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.2.4\lib\idea_rt.jar=52295:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.2.4\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_211\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_211\jre\lib\rt.jar;C:\Users\akoul680\intellij-workspace\basics\target\classes;C:\Users\akoul680\.m2\repository\com\zaxxer\HikariCP\3.4.1\HikariCP-3.4.1.jar;C:\Users\akoul680\.m2\repository\org\apache\kafka\kafka-clients\2.2.0\kafka-clients-2.2.0.jar;C:\Users\akoul680\.m2\repository\com\github\luben\zstd-jni\1.3.8-1\zstd-jni-1.3.8-1.jar;C:\Users\akoul680\.m2\repository\org\lz4\lz4-java\1.5.0\lz4-java-1.5.0.jar;C:\Users\akoul680\.m2\repository\org\xerial\snappy\snappy-java\1.1.7.2\snappy-java-1.1.7.2.jar;C:\Users\akoul680\.m2\repository\org\apache\avro\avro\1.9.0\avro-1.9.0.jar;C:\Users\akoul680\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.9.8\jackson-core-2.9.8.jar;C:\Users\akoul680\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.9.8\jackson-databind-2.9.8.jar;C:\Users\akoul680\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;C:\Users\akoul680\.m2\repository\org\apache\commons\commons-compress\1.18\commons-compress-1.18.jar;C:\Users\akoul680\.m2\repository\com\codahale\metrics\metrics-core\3.0.2\metrics-core-3.0.2.jar;C:\Users\akoul680\.m2\repository\org\junit\jupiter\junit-jupiter-api\5.3.2\junit-jupiter-api-5.3.2.jar;C:\Users\akoul680\.m2\repository\org\apiguardian\apiguardian-api\1.0.0\apiguardian-api-1.0.0.jar;C:\Users\akoul680\.m2\repository\org\opentest4j\opentest4j\1.1.1\opentest4j-1.1.1.jar;C:\Users\akoul680\.m2\repository\org\junit\platform\junit-platform-commons\1.3.2\junit-platform-commons-1.3.2.jar;C:\Users\akoul680\.m2\repository\org\slf4j\slf4j-api\1.7.26\slf4j-api-1.7.26.jar;C:\Users\akoul680\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\akoul680\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\akoul680\.m2\repository\io\projectreactor\reactor-core\3.4.10\reactor-core-3.4.10.jar;C:\Users\akoul680\.m2\repository\org\reactivestreams\reactive-streams\1.0.3\reactive-streams-1.0.3.jar;C:\Users\akoul680\.m2\repository\io\projectreactor\reactor-test\3.4.10\reactor-test-3.4.10.jar;C:\Users\akoul680\.m2\repository\commons-net\commons-net\3.6\commons-net-3.6.jar;C:\Users\akoul680\.m2\repository\com\box\box-java-sdk\2.32.0\box-java-sdk-2.32.0.jar;C:\Users\akoul680\.m2\repository\com\eclipsesource\minimal-json\minimal-json\0.9.1\minimal-json-0.9.1.jar;C:\Users\akoul680\.m2\repository\org\bitbucket\b_c\jose4j\0.4.4\jose4j-0.4.4.jar;C:\Users\akoul680\.m2\repository\org\bouncycastle\bcprov-jdk15on\1.52\bcprov-jdk15on-1.52.jar;C:\Users\akoul680\.m2\repository\com\jcraft\jsch\0.1.55\jsch-0.1.55.jar;C:\Users\akoul680\.m2\repository\org\apache\commons\commons-vfs2\2.4\commons-vfs2-2.4.jar;C:\Users\akoul680\.m2\repository\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;C:\Users\akoul680\.m2\repository\org\bouncycastle\bcpkix-jdk15on\1.52\bcpkix-jdk15on-1.52.jar;C:\Users\akoul680\intellij-workspace\basics\lib\db2jcc4.jar" lrn.chapter14.ErrorHandling
2021-10-12T09:53:34,344 main r.util.Loggers - Using Slf4j logging framework
Exception in thread "main" java.lang.OutOfMemoryError: Forcing exception for 1
at lrn.chapter14.ErrorHandling.lambda$null$0(ErrorHandling.java:19)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:354)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.request(FluxPeekFuseable.java:437)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2194)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onSubscribe(FluxPeekFuseable.java:471)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:263)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.Mono.subscribe(Mono.java:4361)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
at reactor.core.publisher.Flux.subscribe(Flux.java:8468)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8641)
at reactor.core.publisher.Flux.subscribe(Flux.java:8438)
at reactor.core.publisher.Flux.subscribe(Flux.java:8362)
at reactor.core.publisher.Flux.subscribe(Flux.java:8280)
at lrn.chapter14.ErrorHandling.ex5(ErrorHandling.java:26)
at lrn.chapter14.ErrorHandling.main(ErrorHandling.java:12)
Process finished with exit code 1
最佳答案
您无法查询通量本身,但您可以告诉它在停止时做某事。
在包含您的 Kafka 监听器的服务中,我建议添加一个默认为 false 的 terminated
(或类似的) bool 标志。然后,您可以确保通量中的最后一个运算符是:
.doOnTerminate(() -> terminated = true)
...然后获取健康检查端点以监控该值,如果该标志为 true
,则将容器标记为不健康。
doOnTerminate()
比 doOnError()
更可靠,因为它会执行发布者是否因错误或完成信号而终止.不过根据评论,这并不完全可靠 - 如果您的发布者由于 JVM 错误或类似错误而终止,则 doOnTerminate()
运算符将不会运行。
根据我的经验,如果发生这种情况通常是由于OutOfMemoryError
,在这种情况下-XX:+ExitOnOutOfMemoryError
是一个很好的选择使用的 VM 选项(立即退出然后可以触发立即重启策略,而无需等待调用健康检查端点并在一段时间后触发重启。)
请记住,还有其他致命的 JVM 错误不会被上述过程捕获,因此仍然不是 100% 可靠。
关于spring-webflux - Spring webflux 应用程序中的 Reactor Kafka 健康检查,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69528045/
我需要根据需要动态设置文本区域,但它不想正常工作。 JQuery 会自行检查,但无法检查是否已检查。但是当您在第二个单选框内单击时,始终需要文本区域。我尝试了很多次让它工作,但它仍然有问题。我添加了“
我需要在 Django 中进行 API 调用(某种形式),作为我们所需的自定义身份验证系统的一部分。用户名和密码通过 SSL 发送到特定 URL(对这些参数使用 GET),响应应该是 HTTP 200
我将在我的可移植 C 代码中使用 #warning 来生成编译时警告。但并非所有平台都支持 #warning。有什么方法可以找到该平台是否支持 #warning。 #ifdef warning
我编写了一个函数来检查某个数字是否存在于某个区间内。停止搜索的最佳方法是什么?这个: for (i = a; i <= b; i++) { fi = f(i); if (fi == c) {
我想知道在 c 中是否有一种方法可以检查,例如在 for 函数中,如果变量等于或不等于某些字符,而不必每次都重复进行相等性检查。如果我没记错的话,以这种方式检查相等性是不正确的: if (a == (
我有如下日志功能 void log_error(char * file_name, int line_num, int err_code) { printf("%s:%d:%s\n", fil
使用 ssh-keygen 生成的 key 对在 macOS 上可以有不同的格式。 macOS 可读的标准 PEM ASN.1 对象 SecKey API 带有文本标题的 PEM OpenSSH ke
我正在尝试编写一个 excel if 语句。我不熟悉使用 Excel 具有的所有额外功能。我正在使用一个名为 importXML() 的函数.我正在尝试检查我正在使用的函数是否生成“#VALUE!”错
有没有办法检查是否没有 AIO 写入给定文件?我在我的 Unix 类(class)上制作了一个项目,该项目将是一个上下文无关(基于 UDP)的国际象棋服务器,并且所有数据都必须存储在文件中。应用程序将
我有一个如下所示的函数: public Status execute() { Status status = doSomething(); if (status != Stat
我正在使用 Composer,我不希望 PhpStorm 在 vendor 文件夹上运行任何错误检查或检查,因为它对 vendor/中的某些代码显示误报composer/autoload_static
Chapel 的一个很好的特性是它区分了数组的域和它的分布。检查两个数组是否具有相同的域和分布(通常想要的)的最佳方法是什么? 我能看到的最好的方法是检查 D1==D2和 D1.dist==D2.di
在我的 JavaScript 函数中,我为所有输入、文本区域和选择字段提供实际值作为 initial_value: $('input, textarea, select').each(function
我正在编写一个分解为几个简单函数的 PHP 类。在构造函数中,它调用另一个名为 processFile 的函数。该函数调用 5 个私有(private)函数并进行检查。如果检查失败,它会将消息分配给
这个问题已经有答案了: How to detect if user it trying to open a link in a new tab? (2 个回答) 已关闭 7 年前。 我认为 JavaS
我正在浏览我们的代码库并看到很多这样的测试: declare @row_id int = ... declare @row_attribute string select @row_attribu
我正在声明一个用作比较的函数。我的问题是: 为什么条件充当语句? 为什么第 4 行可以工作,而第 5 行却不行? 我知道这段代码不切实际且未使用,但为什么编译器允许这种语法? 谷歌没有找到答案。但话又
到目前为止,我有一个带有空文本字段的 PHP Kontaktform,并使用以下命令检查了所需的字段: $name = check_input($_POST['name'], "请输入姓名。"); 现
目前,我能想到的合理检查的唯一方法没有臃肿的逻辑: if ( $value > 0 ) { // Okay } else { // Not Okay } 有没有更好的办法? 最佳答案
我正在尝试运行一个脚本,如果 i 存在(意味着存在 i 值,任何值)或其他部分,我希望运行其中的一部分如果i没有值就运行,有人可以启发我吗? 我说的是 for 循环,比如 for (var i=0;
我是一名优秀的程序员,十分优秀!