- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我尝试以多种方式分析数据(来自 RabbitMQ/spring cloud stream reactive)。我需要找到一种方法将测量通量分成多个“汇”。例如,我想做十秒的数据窗口,然后找到最大和最小的测量值。或者检查测量值是否在安全范围内,如果不在安全范围内 - 打开警报(或发送电子邮件)。
我试过两种方法:
@StreamListener
public void receive1(@Input(AnalysisChannels.INPUT) Flux<String> measurements) {
measurements
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof WaterLevel)
.subscribe(m -> System.out.println(m));
}
@StreamListener
public void receive2(@Input(AnalysisChannels.INPUT) Flux<String> measurements) {
measurements
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof WaterLevel)
.subscribe(m -> System.out.println(m));
}
在这种情况下,一个事件只执行一个监听器(第一次随机执行一次,第二次执行一次)
@StreamListener
public void receive2(@Input(AnalysisChannels.INPUT) Flux<String> measurements) {
System.out.println("xyz");
ConnectableFlux<String> publish = measurements.publish();
publish
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof AirTemperature)
.subscribe(m -> System.out.println(m));
publish
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof AirTemperature)
.subscribe(m -> System.out.println(m));
}
在第二种情况下我得到一个异常
2019-09-08 16:43:12.720 ERROR 12972 --- [nalysis-group-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.dataAnalysis'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[145], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dataAnalysis, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=dataAnalysis.realtime-analysis-group, amqp_redelivered=false, mqtt_receivedRetained=false, amqp_receivedRoutingKey=dataAnalysis, mqtt_duplicate=false, amqp_timestamp=Sun Sep 08 16:40:49 CEST 2019, amqp_messageId=e229ef37-4672-c524-e3bb-a04e607bb9cb, id=90ff1479-e363-e13e-ead7-aa7a64aaf612, amqp_consumerTag=amq.ctag-xcdhmSOud5ZYJquDKUgsiw, contentType=application/json, mqtt_receivedTopic=/measurement/si:mu:la:00/AirPressure, mqtt_receivedQos=1, timestamp=1567953789712}], failedMessage=GenericMessage [payload=byte[145], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dataAnalysis, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=dataAnalysis.realtime-analysis-group, amqp_redelivered=false, mqtt_receivedRetained=false, amqp_receivedRoutingKey=dataAnalysis, mqtt_duplicate=false, amqp_timestamp=Sun Sep 08 16:40:49 CEST 2019, amqp_messageId=e229ef37-4672-c524-e3bb-a04e607bb9cb, id=90ff1479-e363-e13e-ead7-aa7a64aaf612, amqp_consumerTag=amq.ctag-xcdhmSOud5ZYJquDKUgsiw, contentType=application/json, mqtt_receivedTopic=/measurement/si:mu:la:00/AirPressure, mqtt_receivedQos=1, timestamp=1567953789712}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1200(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:223)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:220)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1542)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1468)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1456)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1451)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1400)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[145], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dataAnalysis, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=dataAnalysis.realtime-analysis-group, amqp_redelivered=false, mqtt_receivedRetained=false, amqp_receivedRoutingKey=dataAnalysis, mqtt_duplicate=false, amqp_timestamp=Sun Sep 08 16:40:49 CEST 2019, amqp_messageId=e229ef37-4672-c524-e3bb-a04e607bb9cb, id=90ff1479-e363-e13e-ead7-aa7a64aaf612, amqp_consumerTag=amq.ctag-xcdhmSOud5ZYJquDKUgsiw, contentType=application/json, mqtt_receivedTopic=/measurement/si:mu:la:00/AirPressure, mqtt_receivedQos=1, timestamp=1567953789712}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 23 more
通量/ react 流拆分应该是什么样的?或者也许我应该以不同的方式处理问题?
最佳答案
我启动了第二种情况下显示的代码。
那里缺少 connect()
。它应该看起来像:
public void receive2(@Input(AnalysisChannels.INPUT) Flux<String> measurements) {
ConnectableFlux<String> publish = measurements.publish();
publish.connect();
publish
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof AirTemperature)
.subscribe(m -> System.out.println(m));
publish
.map(json -> gson.fromJson(json, Measurement.class))
.filter(m -> m instanceof AirTemperature)
.subscribe(m -> System.out.println(m));
}
无论如何,如果有人对我的逻辑有更好的解决方案,请随时写出来!
我将感谢你的帮助。
关于java - 有什么方法可以拆分通量流吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57843080/
我正在创建一个“杀死”人的命令。我希望机器人返回消息“哈!你以为!@Author 死了!”如果他们 ping 机器人。 (我如何让机器人查看它是否被 ping 过?)答案已更新并且现在可以正常工作。
我有一个在heroku 上运行的应用程序,例如my-app.herokuapp.com。但是,如果我输入 ping -c 10 my-app.herokuapp.com 在Mac终端中,它显示请求超时
我在 minikube 集群中有一个 k8s 服务/部署(default 命名空间中的名称 amq: D20181472:argo-k8s gms$ kubectl get svc --all-nam
我有 2 个 EC2 Ubuntu 实例。它们共享相同的 VPC、子网和安全组。实例的防火墙已关闭。但是私网IP还是无法互相ping通。如何让这些实例互相 ping 通? 最佳答案 在安全组中,为“回
我可以连接到我的 wifi(另一台笔记本电脑在此网络上正常),但是浏览器不会加载网页,并且我无法 ping 通 google.com 我注意到的一件奇怪的事情是,如果我查看/etc/resolv.co
我在 Azure 上使用 PUBSUB 时遇到问题。 Azure 防火墙将关闭闲置任意时间的连接。对于时间长度存在很多争议,但人们认为大约是 5 - 15 分钟。 我使用 Redis 作为消息队列。为
我很无聊,因为我的开发服务器已关闭,我正在运行命令提示符以无限期地 ping 服务器,以便我看到它们何时停止超时并知道我可以再次工作。与此同时,我想制作一个 Air 应用程序来为我做这件事,所以当它开
是否可以向 nat 后面的主机发送回显请求 后。所有的 echo-request 都不包含目标主机的端口,因此如果有多个主机使用相同的外部 ip 地址,nat 将如何将 echo-reques
我按照以下链接创建了 azure 实例 http://michaelwasham.com/2013/09/03/connecting-clouds-site-to-site-aws-azure/ 我可
friend 们,我认为这是一件奇怪的事情(至少对我来说)。因为我了解到互联网上的每个域名都有一个对应的IP地址。它存储在 DNS 上的某个位置。 现在,这就是我从命令行 ping google.co
我正在尝试使用分配给 kube-dns 服务的集群 IP 从 dnstools pod ping kube-dns 服务。 ping 请求超时。在同一个 dnstools pod 中,我尝试使用暴露的
我按照以下链接创建了 azure 实例 http://michaelwasham.com/2013/09/03/connecting-clouds-site-to-site-aws-azure/ 我可
我有一个虚拟网络 vmnet2,使用 10.0.2.0/24 网络,我希望我的 Linux 服务器能够 ping 默认网关。 我已将 Linux eth1 值设置为 IPADDR="10.0.2.50
我想将我的本地 mysql 数据库迁移到 Amazon RDS。但首先我想测试它是否正在接收通信。所以我尝试ping它。但是尝试超时。 ping -c 5 myfishdb.blackOut.us-w
我对 AWS 很陌生,已经测试过启动一个实例,如下所示: 下面是安全组,附加了inbound规则 我的问题是我无法 ping 通这台服务器。我可以知道我是否理解错了什么吗? 最佳答案 您需要为其创建新
我对 AWS 很陌生,已经测试过启动一个实例,如下所示: 下面是安全组,附加了inbound规则 我的问题是我无法 ping 通这台服务器。我可以知道我是否理解错了什么吗? 最佳答案 您需要为其创建新
如何确定 IP 地址是否可 ping 通?另外,如何使用 perl 脚本找到可 ping 的 IP 是静态的还是动态的? 最佳答案 看看 Net::Ping模块; #!/usr/bin/env per
我已经研究这个有一段时间了。对于网站 static.etreeblog.com,如果网站离线,我想更改 duv 的类。 我研究过的方法: - 使用带有图像的 onerror 标签来运行函数。-问题:我
我正在使用 OpenvSwitch-2.5.2 在两个虚拟机上设置第 2 层网络,如上图所示。 在阅读了 ovs 官方教程和其他一些文章后,我在每个虚拟机上尝试了以下命令: # on vm1 ip l
我有一个名为 backend 的 Docker 容器,它公开了一个端口 8200,并在其中的 gunicorn 后面运行了一个 django 服务器。这是我的 Dockerfile: FROM deb
我是一名优秀的程序员,十分优秀!