- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在开发一个应用程序,它接收按 session ID 分组的其余消息( session 1 可以由 2 条消息组成, session 2 可以由 10 条消息组成)并将它们发送到数据库。给定 session 的消息内部具有相同的 session ID。
对于给定的 session ,第一条消息应首先发送到数据库,然后是第二条,依此类推。 session 中的顺序非常重要。
session 的顺序并不重要,我们可以混合它们的消息,例如我们可以按以下顺序将消息发送到数据库:
我创建了 10 个rabbitmq 队列。应用程序根据 session ID 选择队列:来自给定 session 的所有消息都位于同一队列中。
每个队列有 1 个消费者,因此保证同一队列中的顺序。
出于性能原因(以及流量增长),我们必须将队列数量设置得更高(节点创建 100 个队列)或部署应用程序的其他实例(10 个节点,每个队列上有 1 个使用者 - 因此 10 个使用者每个队列)。
设置更高的队列数量并不困难,但我的方法有点难看并且有代码重复(见下文)。我需要建议来让它变得更好(并且当天我们需要 1000 个队列)。
如果我们部署 10 个节点而不是 1 个,每个队列将有 10 个消费者,并且队列中消息的顺序将无法保证(因此 session A 中的消息 2 可能会在 session A 中的消息 1 之前发送到数据库) )。
首选解决方案是 10 个节点,因为我们可以使其动态化,并且可以在需要时启动/停止 docker 中的节点。
这是我使用的依赖项:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.6.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.3.RELEASE</version>
</dependency>
这是兔子配置:
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setPrefetchCount(50);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
@Bean
public ConnectionFactory connectionFactory() {
String addresses = "address1,address2";
com.rabbitmq.client.ConnectionFactory rabbitConnection = new com.rabbitmq.client.ConnectionFactory();
rabbitConnection.setAutomaticRecoveryEnabled(true);
rabbitConnection.setUsername("username");
rabbitConnection.setPassword("password");
rabbitConnection.setVirtualHost("virtualHost");
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnection);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setAddresses(addresses);
connectionFactory.setChannelCacheSize(100);
return connectionFactory;
}
目前,我有 10 个队列,由 10 个类创建。这是一个队列示例:
@Component
@RabbitListener(containerFactory = "myRabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "queue2", durable = "true"), exchange = @Exchange(type = "topic", value = "exchange2", durable = "true"), key = "key2"))
public class QueueGroup2Listener {
@RabbitHandler
public void processOrder(RequestMessage received) throws DataAccessResourceFailureException {
process(received);
}
}
我没有找到比在注释中使用不同值(从 1 到 10)创建 10 次此类更好的方法。
问题是:如何在队列中添加消费者并保证给定 session 中消息的顺序?我的意思是队列中有 10 个消费者。消费者 A 消费来自 session A 的消息 1,因此其他消费者不应消费来自 session A 的其他消息。
额外问题是:如何使队列创建优于每个队列 1 个类?
非常感谢
更新
这个问题的答案对我有很大帮助 RabbitMQ : Create Dynamic queues in Direct Exchange :我可以为每个 session 创建一个队列(在这种情况下,下一个问题是rabbitmq可以同时管理多少个队列?)
在加里回答后更新
感谢您的回复,我尝试了以下方法,但是应用程序启动消费者非常非常长:
@Bean
public QueueMessageListener listener() {
return new QueueMessageListener();
}
@Bean(name="exchange")
public Exchange exchange() {
TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
MessageListenerAdapter adapter = new MessageListenerAdapter(listener(), "processOrder");
container.setMessageListener(adapter);
admin().declareExchange(exchange);
createQueues(exchange, QUEUE, numberOfQueues, BINDING_KEY, container, null, true);
container.start(); // very very very long
return exchange;
}
private void createQueues(Exchange exchange, String queuePrefix, int numberOfQueues, String bindingPrefix,
SimpleMessageListenerContainer container, Map<String, Object> args) {
int length = 1;
if(numberOfQueues > 1) {
length = (int)(Math.log10(numberOfQueues - 1) + 1);
}
for (int i = 0; i < numberOfQueues; i++) {
Queue queue = new Queue(queuePrefix + String.format("%0" + length + "d", i), true, false, false, args);
container.addQueues(queue);
admin().declareQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(bindingPrefix + i).noargs();
admin().declareBinding(binding);
}
}
如果我不调用启动函数,则不会创建消费者。
最佳答案
您可以通过编程方式启动 SimpleMessageListenerContainer
,而不是使用声明性范例。
您还可以使用RabbitAdmin
以编程方式声明队列、绑定(bind)等。
由于 Spring AMQP 缓存 channel ,因此不能保证同一 channel 上会发生两次发送(这会导致顺序丢失的可能性非常小);为了确保顺序,您需要在即将发布的 2.0 版本中使用新的 RabbitTemplate.invoke()
方法。它将在同一 channel 上的调用范围内执行发送,从而保证顺序。
如果您的发送代码是单线程的,这不是问题,因为在这种情况下将始终使用相同的 channel 。
关于java - Spring和Rabbit mq消息顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46340403/
我必须用兔子、松鼠、狼和猎豹填充我的网格。我正在寻找字符串到类型对象的映射 populate("CHEETAH", 2); populate("WOLF", 3); populate("SQUIRRE
我们正在对业务异常进行重试操作,并使用 MessageRecoverer 进行几次尝试后存储消息,因此我们在 XML 中对重试进行了第一个配置,例如最大尝试次数和间隔等。在此链接中重试的属性 http
我目前正在研究 Rabbit-Mq,并试图实现一个“死信”队列,一个用于失败消息的队列。我一直在阅读兔子文档:https://www.rabbitmq.com/dlx.html . 并想出了这个例子:
我正在尝试使用 devstack 在 Ubuntu 12.04 上设置 OpenStack。现在,我得到的错误是: Setting up rabbitmq-server (2.7.1-0ubuntu4
我们有一个 RabbitMQ 交换器,它在我们系统的几个组件之间交换消息。 每个组件都是交易所的发布者和订阅者。 我们需要找到一种方法来确保每个应用程序都不会收到它发送到交换中的消息。 例如。 应用
我有一个不寻常的情况,如果我的应用程序在消息处理过程中已正常关闭(例如自动缩放),我不希望将带有重新传递标志的消息发送回队列。我希望仅在应用程序崩溃时才设置该标志。我的代码中有一个功能,可以以不同的方
我有 JRuby 代码: class Receiver def initialize(channel_id) @channel_id = channel_id factory =
Spring AMQP Reference说: Starting with version 1.3, the CachingConnectionFactory can be configured to
我以这种方式使用rabbitTemplate: localhost 发送至交易所: rabbitTemplate.setExch
从 this question 开始,我们有一个 Rabbit 凭证失效的场景,我们需要在我们的 CachingConnectionFactory 上调用 resetConnection() 来获取一
我有多个模块,它们通过消息队列 (Spring Rabbit) 相互通信。一些模块产生消息,而另一些模块使用它们。但是,单个模块可以监听不同的队列,我在列表中有一个队列名称列表,因此我为每个队列名称创
spring-rabbit 可以支持单个主题上的多个并发消费者吗? 详细信息如下 我的系统使用手动确认模式,通过 spring-rabbit (Spring 4.0.6) 进行主题交换。模式如下: 消
我想并行处理来自 rabbitMq 队列的消息。队列配置为 autoAck =false。我正在使用 camel-rabbitMQ 支持 camel endpoints ,它支持 threadPool
我正在开发一个支持 rabbitmq 的应用程序。所以,我有一个消费者和一个生产者。我需要在两种方式之间做出决定,如何在它们之间建立通信。 第一种方式 public void send(){ /
我有以下监听器方法: @Override public void onMessage(Message message, Channel channel) { try { // do som
如何在给其他消费者拒绝消息或一段时间后不回复后重复发送消息?不包括当前消费者? 最佳答案 对于 RabbitMQ,您可以使用 Acknowledgements .成功处理消息后,您的消费者将确认(确认
当我在交易所发布时收到 Nack 时,我在配置 ReturnCallback 时遇到问题。这是我所做的: CachingConnectionFactory connectionFactory = ne
我们使用 RabbitMQ 服务器在应用程序之间进行消息传递。我们需要为所有进入 Rabbit 服务器的 amqp 消息创建一个中央日志。我们的目的不是临时调试,而是可审计性。理想情况下,我可以先登录
RabbitMQ 似乎占用了太多磁盘空间并且无法启动。如何在我的 Mac 上删除它?我似乎找不到它。我已经尝试删除所有图像和容器,然后从头开始重建,希望它能解决问题。 $docker logs rab
我正在尝试进行rabbitmq http api调用,以了解队列的存在方式和其他信息... 我需要3个变量才能传递给api 1)网址:(http:// localhost:55672 / api)2)
我是一名优秀的程序员,十分优秀!