- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我目前正在 Kotlin 开发一个项目,该项目使用带有反应器的 rabbit 来接收某种 DTO 类型的消息,并在它们达到特定标准时发送它们。在测试我的代码的过程中,我尝试模拟错误的消息输入(因为消息来自外部服务)并查看订阅者的行为。一旦我收到错误的输入消息,订阅者就会停止收听任何新输入并抛出以下异常:
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage
然后我尝试运行
official example从 Spring 开始并更改供应商以在第一次调度时发送错误数据,然后发送有效数据并查看行为。
//Following source and sinks are used for testing only.
//Test source will send data to the same destination where the processor receives data
//Test sink will consume data from the same destination where the processor produces data
// ------ New Code -------
static int x = 0;
// ------ END New Code -------
static class TestSource {
private AtomicBoolean semaphore = new AtomicBoolean(true);
private Random random = new Random();
private int[] ids = new int[]{100100, 100200, 100300};
@Bean
public Supplier<?> sendTestData() {
return () -> {
// ------ New Code -------
if(x==0) {
return "hey";
}
x++;
// ------ END New Code -------
int id = ids[random.nextInt(3)];
int temperature = random.nextInt((102 - 65) + 1) + 65;
Sensor sensor = new Sensor();
sensor.setId(id);
sensor.setTemperature(temperature);
return sensor;
};
}
}
订阅方:
@Bean
public Function<Flux<Sensor>, Flux<Average>> calculateAverage() {
return data -> data.window(Duration.ofSeconds(3)).flatMap(
window -> window.groupBy(Sensor::getId).flatMap(this::calculateAverage));
}
private Mono<Average> calculateAverage(GroupedFlux<Integer, Sensor> group) {
return group
.reduce(new Accumulator(0, 0),
(a, d) -> new Accumulator(a.getCount() + 1, a.getTotalValue() + d.getTemperature()))
.map(accumulator -> new Average(group.key(), (accumulator.getTotalValue()) / accumulator.getCount()));
}
正如我所怀疑的那样,结果表明订户在输入错误后无法继续处理下一条有效消息:
2021-02-21 17:46:57.905 INFO 30702 --- [lOCpiq_lPYTgA-1] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.calculateAverage-in-0' has 0 subscriber(s).
2021-02-21 17:46:57.907 ERROR 30702 --- [lOCpiq_lPYTgA-1] onfiguration$FunctionToDestinationBinder : Failure was detected during execution of the reactive function 'calculateAverage'
2021-02-21 17:46:57.910 ERROR 30702 --- [lOCpiq_lPYTgA-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'hey': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"hey"; line: 1, column: 4]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hey': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"hey"; line: 1, column: 4]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:110) ~[spring-cloud-stream-3.0.11.RELEASE.jar:3.0.11.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:197) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:70) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.fromMessage(SimpleFunctionRegistry.java:932) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputValueIfNecessary(SimpleFunctionRegistry.java:833) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$convertInputPublisherIfNecessary$9(SimpleFunctionRegistry.java:772) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:432) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:274) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-02-21 17:47:00.917 ERROR 30702 --- [lOCpiq_lPYTgA-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=4, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:57 IST 2021, amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:57 IST 2021, messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=4, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922417903}], failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=4, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:57 IST 2021, amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:57 IST 2021, messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=4, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922417903}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=4, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:57 IST 2021, amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:57 IST 2021, messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=4, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922417903}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 23 more
2021-02-21 17:47:03.507 INFO 30702 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2021-02-21 17:47:03.936 ERROR 30702 --- [lOCpiq_lPYTgA-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=5, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:58 IST 2021, amqp_messageId=b67652f5-1842-4c13-596d-295b36002217, id=4f2a90f9-5aae-0fae-507d-787694f605dc, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:58 IST 2021, messageId=b67652f5-1842-4c13-596d-295b36002217, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=5, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922420930}], failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=5, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:58 IST 2021, amqp_messageId=b67652f5-1842-4c13-596d-295b36002217, id=4f2a90f9-5aae-0fae-507d-787694f605dc, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:58 IST 2021, messageId=b67652f5-1842-4c13-596d-295b36002217, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=5, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922420930}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=5, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:58 IST 2021, amqp_messageId=b67652f5-1842-4c13-596d-295b36002217, id=4f2a90f9-5aae-0fae-507d-787694f605dc, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:58 IST 2021, messageId=b67652f5-1842-4c13-596d-295b36002217, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=5, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922420930}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 23 more
我的问题是如何处理“幕后”发生的异常,例如输入解析?
最佳答案
鉴于响应式(Reactive)编程的性质,这是一个棘手的问题,所以我们可能需要将此讨论变成一个问题,所以请随意 to raise one ,但这是我的看法。
响应式函数和命令式函数之间的根本区别在于工作单元的概念。对于命令式函数,工作单元是单个消息,因此框架保持对流的持续控制,仅通过消息将其传递给函数。因此,您会期望并且理所当然地,无论错误发生在哪里,我们都会有一些用于错误处理的东西——我们确实做到了。
使用响应式函数,世界完全改变了,因为工作单元是整个流,而函数仅充当框架提供的流和用户定义的流操作之间的连接器。在这一点上,s-c-stream 无法控制用户的操作,所以我们的一般建议,特别是考虑到响应式(Reactive) API 在错误处理方面的丰富性,是让用户自己处理。但要理解这不是因为我们不想,而是我们不能,因为那时我们对流没有任何看法。
您的问题确实很独特,因为在执行您定义的步骤之前发生异常,特别是我们提供的类型转换。事实上,我们可以做一些事情来帮助解决这个问题,但我们仍在就这些事情应该是什么寻求共识,直到我们做到快速失败才是解决方案。您可以通过修复输入来克服它,因为它显然不是 JSON 和/或放宽您的函数签名 Function<Flux<byte[]>, Flux<Average>>
并自己处理类型转换。
无论如何,正如您所看到的,我愿意接受建议,因此请随时提出和发布并提供您的意见。
关于kotlin - Spring Cloud Stream Reactive - 处理消息异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66304153/
类型‘AbstractControl’上不存在属性‘Controls’。
主要是我很好奇。 我们有一个名为 Unit 的对象在我们的代码库中 - 代表桥梁或道路的组件。在我们的例子中,看到带有 Unit 的 ReactiveUI 命令可能会模棱两可。作为声明中的泛型之一。
我一直听说六边形架构必须与任何框架无关,并使用接口(interface) (SPI) 来委托(delegate)不属于业务层的每个代码部分。 但是如何在不使用额外框架的情况下通过六边形架构创建一个响应
我读了 Reactive Manifesto . 但我无法理解 event driven architectures 之间的核心差异和 message driven architectures .结果
申请要求: 订阅两个事件流 A 和 B 对于每个 A 事件,一段时间后应该有相应的 B 事件 如果没有相应的 B 到达(及时),应用程序会监视 A 事件并发出警报 B 事件可以以与 A 事件不同的顺序
Closed. This question is opinion-based。它当前不接受答案。 想改善这个问题吗?更新问题,以便editing this post用事实和引用来回答。 4年前关闭。
我有一个 ViewModel,它在其初始化程序中有一个输入 init(sliderEvents: Reactive) { 在测试中我想做类似的事情 slider.send(.touchDownInsi
经典实时搜索示例: var searchResults = from input in textBoxChanged from results in GetDa
我有一个响应式(Reactive)管道来处理传入的请求。对于每个请求,我需要调用一个与业务相关的函数 ( doSomeRelevantProcessing )。 完成后,我需要将发生的事情通知一些外部
是否可以为响应式扩展实现基于硬件计时器的自定义调度程序?我该如何开始,有什么好的例子吗? 我有一个硬件可以每毫秒向我发送一个准确的中断。我想利用它来创建更精确的 RX 调度程序。 更新 感谢 Asti
我正在通过网络浏览 Rx 框架 Material ,我发现了很多。 现在,每当我为此在 google 上搜索时,我还会在 wikipedia 链接中找到“响应式(Reactive)编程”。 由于响应式
关闭。这个问题是opinion-based .它目前不接受答案。 想改进这个问题?更新问题,以便 editing this post 可以用事实和引用来回答它. 6年前关闭。 Improve this
SignalR 与响应式扩展是同一回事吗?你能解释一下为什么或为什么不吗? 最佳答案 不,它们绝对不是同一件事。 Reactive Extensions 是一个用于创建和组合可观察的数据流或事件流的库
我知道有一种简单的方法可以做到这一点 - 但今晚它打败了我...... 我想知道两个事件是否在 300 毫秒内发生,就像双击一样。 在 300 毫秒内单击两次左键鼠标 - 我知道这是构建响应式(Rea
我们的应用程序使用 Reactive Extensions (Rx)。这些通常通过 Microsoft 的可下载包安装。但是,当我们发布应用程序时,我们会提供 dll 的副本(即 System.Cor
我想了解 Reactive 和 ReactiveStreams 之间的区别,特别是在 RxJava 的上下文中? 我能想到的最多的是 Reactive Streams 在规范中有一些背压的概念,但它已
我想探索来自 Quarkus 的响应式 REST 客户端的慢速后端,并在他们建议的样本 (https://github.com/quarkusio/quarkus-quickstarts/tree/m
假设我有一个存储桶,我需要从中获取日期早于现在的文档。 该文档如下所示: { id: "1", date: "Some date", otherObjectKEY: "key1" } 对于每个文档,我
我有一个 RIA 服务数据服务,它有几个函数调用,如下所示: public InvokeOperation SomeFunc( SomeData data, Action> callb
我一直在使用 Rx 在单个应用程序中创建事件总线(想想 CQRS/ES),它似乎工作得很好。然而,在调查了一堆不同的事件溯源框架之后,我还没有看到使用过一次 Rx。与基于反射/容器的调度程序相比,它似
我是一名优秀的程序员,十分优秀!