- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
package com.yolo.springbootrabbitmqproducer.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
public class RabbitMQConfiguration {
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
String rabbitmqHost = "127.0.0.1";
String rabbitmqPort = "5672";
String rabbitmqUsername = "guest";
String rabbitmqPassword = "guest";
String rabbitmqVirtualHost = "/";
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(Integer.parseInt(rabbitmqPort));
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
connectionFactory.setVirtualHost(rabbitmqVirtualHost);
// connectionFactory.setPublisherReturns(true);//开启return模式
// connectionFactory.setPublisherConfirms(true);//开启confirm模式
return connectionFactory;
}
@Bean(name = "rabbitTemplate")
//必须是prototype类型
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(rabbitConnectionFactory());
}
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(1); //设置线程数
factory.setMaxConcurrentConsumers(1); //最大线程数
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认
configurer.configure(factory, connectionFactory);
return factory;
}
}
@RestController
public class ProducerTestOneController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
for (int i = 1; i <= 10; i++) {
String msg = message + " ..." + i;
System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
rabbitTemplate.convertAndSend("helloWorldExchange","helloWorld", msg);
}
}
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "helloWorld"
)
})
public void receive(String message) {
System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
}
}
Ready:表示待消费数量;队列中拥有可以被消费者消费的消息数量。
Unacked:表示待确认数量;队列分配消息给消费者时,给该条消息一个待确认状态,当消费者确认消息之后,队列才会移除该条消息。
Total:表示待消费数和待确认数的总和
访问:http://localhost:8080/send
这里采用的是自动ack机制
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "helloWorld"
)
})
public void receive2(Message message) {
System.out.println(" [ 消费者@2 ] Received ==> '" + new String(message.getBody()) + "'");
}
重新启动,访问:localhost:8080/send
可以看到消息被平均消费了
队列的消息分配方式默认是平均分配,即第一条消息分配给一个消息者,第二条消息就分配给另一个消息者,以此类推…
上面示例有2个消费者监听,由于只是简单的打印语句,所以看不出有什么问题。
我进行修改一下,通过设置线程休眠时间来表示消费者处理消费的任务时间
@RestController
public class ProducerTestOneController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
for (int i = 1; i <= 10; i++) {
String msg = message + " ..." + i;
System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
rabbitTemplate.convertAndSend("helloWorldExchange","helloWorld", msg);
}
}
private int count1=1;
private int count2=1;
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "helloWorld"
)
})
public void receive(Message message) throws InterruptedException {
Thread.sleep(200);
System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
}
@RabbitListener(containerFactory = "customContainerFactory",bindings = {@QueueBinding(
value = @Queue(value = "helloWorldQueue",declare = "true"//指定一下队列名,默认持久队列,不指定则为临时队列
),
exchange = @Exchange(value = "helloWorldExchange",type = ExchangeTypes.DIRECT,durable = "true"),//Exchang的默认类型就是direct,所以type可以不写
key = "helloWorld"
)
})
public void receive2(Message message) throws InterruptedException {
Thread.sleep(1000);
System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
}
}
现在就能很明显的看出,消费者1号很快地处理完消息后就处于空闲状态;而消费者2号却一直很忙碌。当消息数量成千上万的时候,由消费者2号处理的消息会堆积很多,达不到时效性。
针对这种问题,rabbitmq提供了一种解决方案。
设置prefetch参数=1,实现原理是:队列只会分配一条消息给对应的监听消费者,收到消费者的确认回复之后才会重新分配另一条消息。
这里需要每一个接受者指定containerFactory
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(1); //设置线程数
factory.setMaxConcurrentConsumers(1); //最大线程数
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置为手动确认
// 设置prefetch
factory.setPrefetchCount(1);
configurer.configure(factory, connectionFactory);
return factory;
}
1号处理了8条消息,2号2条,工作效率提高了不少
这里接受者不需要指定containerFactory
spring:
rabbitmq:
port: 5672
host: 127.0.0.1
username: guest
password: guest
listener:
simple:
prefetch: 1
如何更改循环中变量的名称?比如 number1 、 number2 、 number3 、 number4 ? var array = [2,4,6,8] func ap ( number1: Int
我想设置 View 的背景颜色并在一定延迟后将其更改为另一种颜色。这是我的尝试方式: print("setting color 1") self.view.backgroundColor = UICo
我在使用 express-session 时遇到问题。 session 数据不会在请求之间持续存在。 正如您在下面的代码中看到的那样,/join 路由设置了一些 session 属性,但是当 /sur
我试图从叶渲染器获得一个非常简单的结果,用于快速 Steam 的 for 循环。 我正在上传叶文件 HTML,因为它不接受此处格式正确的代码 - 下面的pizza.swift代码- import
你们中有人有什么好的链接可以与我分享吗?我正在寻找一个 FAST 程序员编辑器,它可以非常快速地打开包含超过 100, 000 行代码的文件?我目前正在使用记事本自动取款机,打开一个 29000 行长
我现在正在处理眼动追踪数据,因此拥有一个巨大的数据集(想想数百万行),因此希望有一种快速的方法来完成此任务。这是它的简化版本。 数据告诉您眼睛在每个时间点正在查看的位置以及我们正在查看的每个文件。 X
我是新手,想为计时器或其他设备选择提示音。 如何打开此列表,以选择其中一种声音? Alert sound list 最佳答案 您将无法在应用中使用系统声音。 但是,您可以包括自己的声音文件,并将其显示
我编写了以下代码来构建具有顺序字符串的数组。 它的工作方式与我预期的一样,但我希望它能更快地运行。有没有更有效的方法在PowerShell中产生我想要的结果? 我是PowerShell的新手,非常感谢
我有一个包含一些非唯一行的矩阵,例如: x 尝试 y <- rle(apply(x, 1, paste, collapse = " ")) # y$lengths is the vector con
我的函数“keyboardWillShown”有问题。所以我想要的是菜单打开时,菜单正好出现在键盘上方。它可以在Iphone 8 plus,8、7、6上完美运行。但是,当我在模拟器上运行Iphone
我正在尝试通过Swift 5中的HTTP get方法从API提取数据。它在启动时成功加载了数据,但是当我刷新页面时,它说“索引超出范围”,这是因为数据是不再会在我的日志中读取,因此索引中没有任何内容。
我想做什么: 从我的数据库中获取时间戳并将其转换为用户的时区。 我的代码: let tryItNow = "\(model.timestampName)" let format = D
给定字体名称和字体大小,如何查找字符串的宽度(CGFloat)? (目标是将UIView的宽度设置为足以容纳字符串的宽度。) 我有两个字符串:一个重复“1”,重复36次,另一个重复“M”,重复36次。
我正在尝试解析此JSON ["Items": ( { AccountBalance = 0; AlphabetType = 3; Description = "\U0631\U
我在UINavigationBar内放置了一个UILabel。 我想根据navigationBar的高度增加该标签的字体大小。当navigationBar很大时,我希望字体大小更大;当滚动并缩小nav
我想将用户输入限制为仅有效数字并使用以下内容: func textView(_ textView: UITextView, shouldChangeTextIn range: NSRange, rep
目前我有一个包含超过 100.000 张图像的数据库,它们大小不一或类似,但我想为我的公司制作以下内容: 我插入/上传一张图片,系统返回最有可能相同的图片。我不知道使用什么算法,但它需要快速。我可以预
在我的 swift 项目中,我有一个按钮,我想在标签上打印按下该按钮的时间。 如何解决这个问题? 最佳答案 添加到DHEERAJ的答案中,您只需在func press(sender: UIButton
我必须发表评论,尝试在解析中导入数组。然而,有一个问题。 当我尝试从 Parse 加载数组时,我的输出是 ("Blah","Blah","Blah")这是一个元组...而不是一个数组 TT... 如何
我的应用程序有一个名为 MyDevice 的类,我用它来与硬件通信。该硬件是可选的,实例变量也是可选的: var theDevice:MyDevice = nil 然后,在应用程序中,我必须初始化设备
我是一名优秀的程序员,十分优秀!