- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我正在使用 springs SimpleMessageListenerContainer 来消费来自 RabbitMQ 队列的消息。一切正常,但是当向队列发送无效消息(例如无效的 json)时,监听器中止,正在关闭工作人员并且不接受任何进一步的消息。
是否可以将其配置为丢弃损坏的消息并继续收听更多消息?
我正在使用 sprint-rabbit-1.6.1.RELEASE.jar
我的配置如下所示:
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
MessageConverter messageConverter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("my.queue");
container.setMessageListener(listenerAdapter);
container.setMessageConverter(messageConverter);
return container;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
MessageListenerAdapter listenerAdapter(Worker worker) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
messageListenerAdapter.setMessageConverter(new Jackson2JsonMessageConverter());
return messageListenerAdapter;
}
我的监听器方法的声明:
public void processMessage(Map<String, String> message) {
当我发送类似 '"routeId":"7"}'
(损坏的 json)的消息时,我得到异常:
2016-09-02 08:10:35.821 WARN 35841 --- [ container-29] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) [spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted
2016-09-02 08:10:35.828 ERROR 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception during processing
org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException: Invalid listener
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1351) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 1 common frames omitted
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted
2016-09-02 08:10:35.833 ERROR 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
这里抛出了SimpleMessageListenerContainer中的fatal Exception:
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
因此,如果容器配置了不存在的方法,它似乎应该关闭。但是如果消息损坏,它会尝试使用错误的参数类型调用方法,这也会导致 NoSuchMethodException。这意味着任何生产者都可以用一条损坏的消息杀死我的消费者。
感谢您的任何建议!
最佳答案
有趣;我能够重现您的问题;事实证明,如果消息不包含 __TypeID__
header (转换提示),它只会将“错误”json 作为字符串返回。
我能够通过将自定义类映射器注入(inject)转换器来解决它。
您还可以让发送系统设置类型 header 。
然后,消息被拒绝,因为我们得到一个 MessageConversionException
。
package com.example;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class So39264965Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So39264965Application.class, args);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend("my.queue", new Foo());
context.getBean(Worker.class).latch.await(60, TimeUnit.SECONDS);
// bad json
template.setMessageConverter(new SimpleMessageConverter());
template.convertAndSend("", "my.queue", "\"routeId\":\"7\"}", m -> {
m.getMessageProperties().setContentType("application/json");
return m;
});
Thread.sleep(60000);
context.close();
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter, MessageConverter messageConverter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("my.queue");
container.setMessageListener(listenerAdapter);
container.setMessageConverter(messageConverter);
return container;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public Queue queue() {
return new Queue("my.queue");
}
@Bean
public MessageConverter messageConverter() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
jackson2JsonMessageConverter.setClassMapper(new ClassMapper() {
@Override
public Class<?> toClass(MessageProperties properties) {
return Foo.class;
}
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
}
});
return jackson2JsonMessageConverter;
}
@Bean
MessageListenerAdapter listenerAdapter(Worker worker) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
messageListenerAdapter.setMessageConverter(messageConverter());
return messageListenerAdapter;
}
@Bean
public Worker worker() {
return new Worker();
}
public static class Worker {
private final CountDownLatch latch = new CountDownLatch(1);
public void processMessage(Foo foo) {
System.out.println(foo);
this.latch.countDown();
}
}
public static class Foo {
private String bar = "bar";
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
关于java - RabbitMQ 的 Spring SimpleMessageListenerContainer 因无效消息而中止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39264965/
我有一个接受以下参数的函数: int setvalue(void (*)(void *)); 为了满足参数:void (*)(void *),我创建了这样一个函数: static void *
我有以下代码: typedef void VOID; int f(void); int g(VOID); 在 C 中编译得很好(在 Fedora 10 上使用 gcc 4.3.2)。与 C++ 编译的
这个问题已经有答案了: Is f(void) deprecated in modern C and C++? [duplicate] (6 个回答) 已关闭 7 年前。 B.A.T.M.A.N./A.
我在 ASP.NET Core 3.1 项目上有以下 Identity Server 4 配置: services .AddIdentityServer(y => { y.Events.R
我们有一个 O365 租户,一切都是开箱即用的。租户放置在德国云中,而不是全局 (office.de) 中。我们还开发了一个 Office 插件,使用 OAuth 2.0 授权访问共享点。首先,我们向
我有一个如下所示的路由 routes.MapRoute( name: "Default", url: "{controller}/{action}/{i
我正在尝试使用 OAuth2.0 访问 google 文档。我已经从 Google API 控制台获取了客户端 ID 和 key 。但是当我运行这段代码时,我收到了异常。如果我遗漏了什么,有人可以建议
此代码有效: let mut b: Vec = Vec::with_capacity(a.len()); for val in a.iter() { b.push(val); } 此代码不起作
使用 client_credintials 授权类型请求 EWS oauth2 v2.0 的访问 token 时出现错误。 https://login.microsoftonline.com/tena
我通过 Java 应用程序使用 Google 电子表格时遇到了问题。我创建了应用程序,该应用程序运行了 1 年多,没有任何问题,我什至在 Create Spreadsheet using Google
如何创建 匹配所有无效 Base64 字符的正则表达式?我在堆栈上找到了 [^a-zA-Z0-9+/=\n\r].*$ 但是当我尝试时我得到了带有 - 符号的结果字符串.我根本不知道正则表达式,任何人
我从 Gitlab CI/CD Pipelines 获得错误信息:yaml invalid。问题是由 .gitlab-ci.yml 脚本的第五行引起的: - 'ssh deployer@gita
我有 3 个数据源,设置如下: @Configuration @Component public class DataSourceConfig { @Bean("foo") @Conf
你好,我想用bulkCreate ex 插入数据: [ { "typeId": 5, "devEui": "0094E796CBFCFEF9", "application_name": "Pressu
UIApplicationExitsOnSuspend 不会强制我的应用程序退出。我已经清理过目标、删除了应用程序、重建并重新安装了很多次。 我确实需要退出我的应用程序。 最佳答案 您是否链接了 SD
在 iPhone 配置门户上,显示我的 iPhone 团队配置配置文件无效。有一个“由 Xcode 管理”文本。 “续订”按钮被禁用。 我该如何解决这个问题?谢谢 最佳答案 使用 Xcode 3.2.
好的,所以今天我用我们的“实时”数据库中的新信息更新了我的数据库……从那时起,我的一个表格就出现了问题。如果您需要任何代码,请告诉我,我将对其进行编辑并发布所需的代码... 我有一个报告表格,其中有一
我有一个结构体,其中有一个元素表示为 void (*func)(); 我知道 void 指针通常用于函数指针,但我似乎无法定义该函数。我不断收到取消引用指向不完整类型的指针。我用谷歌搜索了一下但没有结
我正在尝试使用 Coldfusion 9 从 ning 网络获取凭证,所以首先这是测试 api 的 curl 语法: curl -k https://external.ningapis.com/xn/
这个问题已经有答案了: Does C have references? (2 个回答) 已关闭 4 年前。 我正在学习 C 语言引用,这是我的代码: #include int main(void)
我是一名优秀的程序员,十分优秀!