
kotlin - Spring Cloud Stream Reactive - Handling message exceptions

Reposted. Author: 行者123. Updated: 2023-12-04 02:27:41

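I am currently working on a Kotlin project that uses RabbitMQ with Reactor to receive messages of a certain DTO type and forward them once they meet specific criteria. While testing my code, I tried simulating bad message input (the messages come from an external service) to see how the subscriber behaves. As soon as a bad input message arrives, the subscriber stops listening for any new input and throws the following exception: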

 org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage
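I then ran the official example from Spring and changed the supplier to send bad data on the first dispatch, followed by valid data, to observe the behavior.
On the supplier side, I added a counter so that the bad message is sent only on the first run.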
// The following source and sink are used for testing only.
// The test source sends data to the same destination the processor receives data from.
// The test sink consumes data from the same destination the processor produces data to.
// ------ New Code -------
static int x = 0;
// ------ END New Code -------

static class TestSource {

    private AtomicBoolean semaphore = new AtomicBoolean(true);
    private Random random = new Random();
    private int[] ids = new int[]{100100, 100200, 100300};

    @Bean
    public Supplier<?> sendTestData() {

        return () -> {
            // ------ New Code -------
            // Send a malformed (non-JSON) payload on the first invocation only.
            if (x == 0) {
                x++; // increment before returning so that later calls send valid data
                return "hey";
            }
            // ------ END New Code -------
            int id = ids[random.nextInt(3)];
            int temperature = random.nextInt((102 - 65) + 1) + 65;
            Sensor sensor = new Sensor();
            sensor.setId(id);
            sensor.setTemperature(temperature);
            return sensor;
        };
    }
}
Subscriber side:
@Bean
public Function<Flux<Sensor>, Flux<Average>> calculateAverage() {
    return data -> data.window(Duration.ofSeconds(3)).flatMap(
            window -> window.groupBy(Sensor::getId).flatMap(this::calculateAverage));
}

private Mono<Average> calculateAverage(GroupedFlux<Integer, Sensor> group) {
    return group
            .reduce(new Accumulator(0, 0),
                    (a, d) -> new Accumulator(a.getCount() + 1, a.getTotalValue() + d.getTemperature()))
            .map(accumulator -> new Average(group.key(), (accumulator.getTotalValue()) / accumulator.getCount()));
}
As I suspected, the result shows that the subscriber cannot go on to process the next valid message after a bad input:
2021-02-21 17:46:57.905  INFO 30702 --- [lOCpiq_lPYTgA-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.calculateAverage-in-0' has 0 subscriber(s).
2021-02-21 17:46:57.907 ERROR 30702 --- [lOCpiq_lPYTgA-1] onfiguration$FunctionToDestinationBinder : Failure was detected during execution of the reactive function 'calculateAverage'
2021-02-21 17:46:57.910 ERROR 30702 --- [lOCpiq_lPYTgA-1] reactor.core.publisher.Operators : Operator called default onErrorDropped

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'hey': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"hey"; line: 1, column: 4]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hey': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"hey"; line: 1, column: 4]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:110) ~[spring-cloud-stream-3.0.11.RELEASE.jar:3.0.11.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:197) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:70) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.fromMessage(SimpleFunctionRegistry.java:932) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputValueIfNecessary(SimpleFunctionRegistry.java:833) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$convertInputPublisherIfNecessary$9(SimpleFunctionRegistry.java:772) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:432) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:274) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-02-21 17:47:00.917 ERROR 30702 --- [lOCpiq_lPYTgA-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=4, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:57 IST 2021, amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:57 IST 2021, messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=4, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922417903}], failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=4, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:57 IST 2021, amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:57 IST 2021, messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, 
redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=4, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922417903}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=4, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:57 IST 2021, amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:57 IST 2021, messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=4, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922417903}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 23 more

2021-02-21 17:47:03.507 INFO 30702 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2021-02-21 17:47:03.936 ERROR 30702 --- [lOCpiq_lPYTgA-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=5, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:58 IST 2021, amqp_messageId=b67652f5-1842-4c13-596d-295b36002217, id=4f2a90f9-5aae-0fae-507d-787694f605dc, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:58 IST 2021, messageId=b67652f5-1842-4c13-596d-295b36002217, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=5, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922420930}], failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=5, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:58 IST 2021, amqp_messageId=b67652f5-1842-4c13-596d-295b36002217, id=4f2a90f9-5aae-0fae-507d-787694f605dc, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:58 IST 2021, messageId=b67652f5-1842-4c13-596d-295b36002217, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, 
redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=5, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922420930}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=5, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:58 IST 2021, amqp_messageId=b67652f5-1842-4c13-596d-295b36002217, id=4f2a90f9-5aae-0fae-507d-787694f605dc, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:58 IST 2021, messageId=b67652f5-1842-4c13-596d-295b36002217, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=5, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922420930}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 23 more
My question is: how do I handle exceptions that happen "behind the scenes", such as during input parsing?

Best answer

Given the nature of reactive programming, this is a tricky one, so we may need to turn this discussion into an issue (feel free to raise one), but here is my take.
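The fundamental difference between reactive and imperative functions is the concept of the unit of work. For an imperative function the unit of work is a single message, so the framework retains constant control of the stream and hands it to the function one message at a time. You would therefore expect, and rightfully so, that we have something in place for error handling regardless of where the error occurs, and we do.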
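With reactive functions the world changes completely, because the unit of work is the entire stream, and the function merely acts as a connector between the stream provided by the framework and the stream operations defined by the user. At that point s-c-stream (Spring Cloud Stream) has no control over what the user does, so our general recommendation, especially given the richness of the reactive API when it comes to error handling, is to let users handle errors themselves. Understand, though, that this is not because we don't want to but because we can't: at that point we have no view of the stream.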
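The contrast between the two units of work can be illustrated with a plain-Java analogy. This is only a sketch and not Spring Cloud Stream code: `UnitOfWorkSketch`, `dispatchPerMessage`, and `dispatchWholeStream` are hypothetical names, and `java.util.stream.Stream` merely stands in for `Flux`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnitOfWorkSketch {

    // Imperative style: the "framework" calls the function once per message,
    // so it can catch the failure of one message and carry on with the next.
    static List<Integer> dispatchPerMessage(List<String> messages,
                                            Function<String, Integer> fn) {
        List<Integer> results = new ArrayList<>();
        for (String message : messages) {
            try {
                results.add(fn.apply(message));
            } catch (RuntimeException e) {
                // A real framework might retry or route to an error channel here.
                System.err.println("Skipping bad message: " + message);
            }
        }
        return results;
    }

    // Stream style: the "framework" calls the function once with the whole stream.
    // It only learns about a failure when the pipeline as a whole terminates.
    static List<Integer> dispatchWholeStream(List<String> messages,
                                             Function<Stream<String>, Stream<Integer>> fn) {
        try {
            return fn.apply(messages.stream()).collect(Collectors.toList());
        } catch (RuntimeException e) {
            System.err.println("Pipeline terminated: " + e.getMessage());
            return List.of(); // nothing is delivered once the pipeline has failed
        }
    }

    public static void main(String[] args) {
        List<String> input = List.of("hey", "70", "80");
        // Per-message dispatch survives the bad first message.
        System.out.println(dispatchPerMessage(input, Integer::parseInt)); // prints [70, 80]
        // Whole-stream dispatch loses everything because of the bad first message.
        System.out.println(dispatchWholeStream(input, s -> s.map(Integer::parseInt))); // prints []
    }
}
```

In the second variant, the only place where the bad element can be handled without ending the pipeline is inside the user-supplied function, which mirrors the recommendation to handle errors in the reactive code itself.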
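Your issue is indeed unique in that the exception happens before the steps you defined are executed, specifically in the type conversion that we provide. There are in fact a few things we could do to help with this, but we are still looking for consensus on what they should be; until then, failing fast is the solution. You can get around it by fixing the input, since it clearly is not JSON, and/or by relaxing your function signature to Function<Flux<byte[]>, Flux<Average>> and handling the type conversion yourself.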
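As a rough illustration of "handling the type conversion yourself", the sketch below shows a decoder that turns a raw `byte[]` payload into an `Optional` instead of throwing. Everything in it is an assumption made for illustration: `TolerantSensorDecoder`, its nested `Sensor` stand-in, and the deliberately naive regular expression are hypothetical, and a real application would more likely call a JSON library such as Jackson inside the same try/return-empty shape.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TolerantSensorDecoder {

    // Hypothetical minimal stand-in for the sample's Sensor class.
    static final class Sensor {
        final int id;
        final int temperature;

        Sensor(int id, int temperature) {
            this.id = id;
            this.temperature = temperature;
        }
    }

    // Naive matcher for payloads shaped like {"id":100100,"temperature":70}.
    // It is an assumption for this sketch, not a general JSON parser.
    private static final Pattern SENSOR_JSON = Pattern.compile(
            "\\s*\\{\\s*\"id\"\\s*:\\s*(\\d+)\\s*,\\s*\"temperature\"\\s*:\\s*(-?\\d+)\\s*\\}\\s*");

    // Returns the decoded sensor, or Optional.empty() when the payload is malformed.
    // No exception escapes, so a bad payload cannot terminate the caller's pipeline.
    static Optional<Sensor> decode(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        Matcher matcher = SENSOR_JSON.matcher(text);
        if (!matcher.matches()) {
            System.err.println("Dropping undecodable payload: " + text);
            return Optional.empty();
        }
        try {
            return Optional.of(new Sensor(
                    Integer.parseInt(matcher.group(1)),
                    Integer.parseInt(matcher.group(2))));
        } catch (NumberFormatException e) {
            // Digits that do not fit into an int are treated as malformed as well.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        byte[] bad = "hey".getBytes(StandardCharsets.UTF_8);
        byte[] good = "{\"id\":100100,\"temperature\":70}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(bad).isPresent());  // prints false
        System.out.println(decode(good).isPresent()); // prints true
    }
}
```

Inside a function declared as Function<Flux<byte[]>, Flux<Average>>, such a decoder could plausibly be applied before the windowing step with something like `data.flatMap(bytes -> Mono.justOrEmpty(decode(bytes)))`, so that undecodable payloads are dropped (or logged) rather than propagated as an error signal. Whether dropping, logging, or forwarding to a dead-letter destination is appropriate depends on the application.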
In any event, as you can see, I am open to suggestions, so please feel free to raise an issue and share your input.

Regarding kotlin - Spring Cloud Stream Reactive - Handling message exceptions, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/66304153/
