- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
消息队列(MQ)是分布式系统中不可或缺的技术之一.
对很多小伙伴来说,刚接触MQ时,可能觉得它只是个“传话工具”,但用着用着,你会发现它简直是系统的“润滑剂”.
无论是解耦、削峰,还是异步任务处理,都离不开MQ的身影.
下面我结合实际场景,从简单到复杂,逐一拆解MQ的10种经典使用方式,希望对你会有所帮助.
小伙伴们是不是经常遇到这样的情况:用户提交一个操作,比如下单,然后要发送短信通知.
如果直接在主流程里调用短信接口,一旦短信服务响应慢,就会拖累整个操作.
用户等得不耐烦,心态直接崩了.
用MQ,把非关键流程抽出来异步处理。下单时,直接把“发短信”这件事丢给MQ,订单服务就能立刻响应用户,而短信的事情让MQ和消费者去搞定.
// 订单服务:生产者
Order order = createOrder(); // 订单生成逻辑
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("订单已生成,发短信任务交给MQ");
// 短信服务:消费者
@RabbitListener(queues = "sms_queue")
public void sendSms(Order order) {
System.out.println("发送短信,订单ID:" + order.getId());
// 调用短信服务接口
}
这种方式的好处是:主流程解耦,不受慢服务的拖累。订单服务只管自己的事,短信服务挂了也没关系,MQ会把消息暂存,等短信服务恢复后继续处理.
每年的“双十一”电商大促,用户秒杀商品时一窝蜂冲进来.
突然涌入的高并发请求,不仅会压垮应用服务,还会直接让数据库“趴窝”.
秒杀请求先写入MQ,后端服务以稳定的速度从MQ中消费消息,处理订单.
这样既能避免系统被瞬时流量压垮,还能提升处理的平稳性.
// 用户提交秒杀请求:生产者
rabbitTemplate.convertAndSend("seckill_exchange", "seckill_key", userRequest);
System.out.println("用户秒杀请求已进入队列");
// 秒杀服务:消费者
@RabbitListener(queues = "seckill_queue")
public void processSeckill(UserRequest request) {
System.out.println("处理秒杀请求,用户ID:" + request.getUserId());
// 执行秒杀逻辑
}
MQ在这里相当于一个缓冲池,把瞬时流量均匀分布到一段时间内处理。系统稳定性提升,用户体验更好.
比如一个订单系统需要通知库存系统扣减库存,还要通知支付系统完成扣款.
如果直接用同步接口调用,服务间的依赖性很强,一个服务挂了,整个链条都会被拖垮.
订单服务只负责把消息丢到MQ里,库存服务和支付服务各自从MQ中消费消息.
这样订单服务不需要直接依赖它们.
// 订单服务:生产者
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("订单生成消息已发送");
// 库存服务:消费者
@RabbitListener(queues = "stock_queue")
public void updateStock(Order order) {
System.out.println("扣减库存,订单ID:" + order.getId());
}
// 支付服务:消费者
@RabbitListener(queues = "payment_queue")
public void processPayment(Order order) {
System.out.println("处理支付,订单ID:" + order.getId());
}
通过MQ,各个服务之间可以实现松耦合.
即使库存服务挂了,也不会影响订单生成的流程,大幅提升系统的容错能力.
订单服务需要同时生成订单和扣减库存,这涉及两个不同的数据库操作.
如果一个成功一个失败,就会导致数据不一致.
通过MQ实现分布式事务.
订单服务生成订单后,将扣减库存的任务交给MQ,最终实现数据的一致性.
// 订单服务:生产者
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("订单创建消息已发送");
// 库存服务:消费者
@RabbitListener(queues = "stock_queue")
public void updateStock(Order order) {
System.out.println("更新库存,订单ID:" + order.getId());
// 执行扣减库存逻辑
}
通过“最终一致性”解决了分布式事务的难题,虽然短时间内可能有数据不一致,但最终状态一定是正确的.
比如商品价格调整,库存、搜索、推荐服务都需要同步更新.
如果每个服务都要单独通知,工作量会很大.
MQ的广播模式(Fanout)可以让多个消费者订阅同一条消息,实现消息的“一发多收”.
// 生产者:广播消息
rabbitTemplate.convertAndSend("price_update_exchange", "", priceUpdate);
System.out.println("商品价格更新消息已广播");
// 消费者1:库存服务
@RabbitListener(queues = "stock_queue")
public void updateStockPrice(PriceUpdate priceUpdate) {
System.out.println("库存价格更新:" + priceUpdate.getProductId());
}
// 消费者2:搜索服务
@RabbitListener(queues = "search_queue")
public void updateSearchPrice(PriceUpdate priceUpdate) {
System.out.println("搜索价格更新:" + priceUpdate.getProductId());
}
这种模式让多个服务都能接收到同一条消息,扩展性非常强.
多个服务产生的日志需要统一存储和分析.
如果直接写数据库,可能导致性能瓶颈.
各服务将日志写入MQ,日志分析系统从MQ中消费消息并统一处理.
// 服务端:生产者
rabbitTemplate.convertAndSend("log_exchange", "log_key", logEntry);
System.out.println("日志已发送");
// 日志分析服务:消费者
@RabbitListener(queues = "log_queue")
public void processLog(LogEntry log) {
System.out.println("日志处理:" + log.getMessage());
// 存储或分析逻辑
}
用户下单后,如果30分钟内未支付,需要自动取消订单.
使用MQ的延迟队列功能,设置消息延迟消费的时间.
// 生产者:发送延迟消息
rabbitTemplate.convertAndSend("delay_exchange", "delay_key", order, message -> {
message.getMessageProperties().setDelay(30 * 60 * 1000); // 延迟30分钟
return message;
});
System.out.println("订单取消任务已设置");
// 消费者:处理延迟消息
@RabbitListener(queues = "delay_queue")
public void cancelOrder(Order order) {
System.out.println("取消订单:" + order.getId());
// 取消订单逻辑
}
在一个分布式系统中,多个服务依赖同一份数据源.
例如,电商平台的订单状态更新后,需要同步到缓存系统和推荐系统.
如果让每个服务直接从数据库拉取数据,会增加数据库压力,还可能出现延迟或不一致的问题.
利用MQ进行数据同步。订单服务更新订单状态后,将更新信息发送到MQ,缓存服务和推荐服务从MQ中消费消息并同步数据.
订单服务:生产者 。
// 更新订单状态后,将消息发送到MQ
Order order = updateOrderStatus(orderId, "PAID"); // 更新订单状态为已支付
rabbitTemplate.convertAndSend("order_exchange", "order_status_key", order);
System.out.println("订单状态更新消息已发送:" + order.getId());
缓存服务:消费者 。
@RabbitListener(queues = "cache_update_queue")
public void updateCache(Order order) {
System.out.println("更新缓存,订单ID:" + order.getId() + " 状态:" + order.getStatus());
// 更新缓存逻辑
cacheService.update(order.getId(), order.getStatus());
}
推荐服务:消费者 。
@RabbitListener(queues = "recommendation_queue")
public void updateRecommendation(Order order) {
System.out.println("更新推荐系统,订单ID:" + order.getId() + " 状态:" + order.getStatus());
// 更新推荐服务逻辑
recommendationService.updateOrderStatus(order);
}
通过MQ实现数据同步的好处是:
有些任务需要定时执行,比如每天凌晨清理过期订单.
这些订单可能分布在多个服务中,如果每个服务独立运行定时任务,可能会出现重复处理或任务遗漏的问题.
使用MQ统一分发调度任务,每个服务根据自身的业务需求,从MQ中消费任务并执行.
任务调度服务:生产者 。
// 定时任务生成器
@Scheduled(cron = "0 0 0 * * ?") // 每天凌晨触发
public void generateTasks() {
List<Task> expiredTasks = taskService.getExpiredTasks();
for (Task task : expiredTasks) {
rabbitTemplate.convertAndSend("task_exchange", "task_routing_key", task);
System.out.println("任务已发送:" + task.getId());
}
}
订单服务:消费者 。
@RabbitListener(queues = "order_task_queue")
public void processOrderTask(Task task) {
System.out.println("处理订单任务:" + task.getId());
// 执行订单清理逻辑
orderService.cleanExpiredOrder(task);
}
库存服务:消费者 。
@RabbitListener(queues = "stock_task_queue")
public void processStockTask(Task task) {
System.out.println("处理库存任务:" + task.getId());
// 执行库存释放逻辑
stockService.releaseStock(task);
}
分布式任务调度可以解决:
用户上传一个大文件后,需要对文件进行处理(如格式转换、压缩等)并存储.
如果同步执行这些任务,前端页面可能会一直加载,导致用户体验差.
用户上传文件后,立即将任务写入MQ,后台异步处理文件,处理完成后通知用户或更新状态.
上传服务:生产者 。
// 上传文件后,将任务写入MQ
FileTask fileTask = new FileTask();
fileTask.setFileId(fileId);
fileTask.setOperation("COMPRESS");
rabbitTemplate.convertAndSend("file_task_exchange", "file_task_key", fileTask);
System.out.println("文件处理任务已发送,文件ID:" + fileId);
文件处理服务:消费者 。
@RabbitListener(queues = "file_task_queue")
public void processFileTask(FileTask fileTask) {
System.out.println("处理文件任务:" + fileTask.getFileId() + " 操作:" + fileTask.getOperation());
// 模拟文件处理逻辑
if ("COMPRESS".equals(fileTask.getOperation())) {
fileService.compressFile(fileTask.getFileId());
} else if ("CONVERT".equals(fileTask.getOperation())) {
fileService.convertFileFormat(fileTask.getFileId());
}
// 更新任务状态
taskService.updateTaskStatus(fileTask.getFileId(), "COMPLETED");
}
前端轮询或回调通知 。
// 前端轮询文件处理状态
setInterval(() => {
fetch(`/file/status?fileId=${fileId}`)
.then(response => response.json())
.then(status => {
if (status === "COMPLETED") {
alert("文件处理完成!");
}
});
}, 5000);
异步文件处理的优势:
消息队列不只是传递消息的工具,更是系统解耦、提升稳定性和扩展性的利器.
在这10种经典场景中,每一种都能解决特定的业务痛点.
希望这篇文章对你理解MQ的应用场景有帮助! 。
如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力.
求一键三连:点赞、转发、在看.
关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的10万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer.
最后此篇关于工作中这样用MQ,很香!的文章就讲到这里了,如果你想了解更多关于工作中这样用MQ,很香!的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我在Windows 10中使用一些简单的Powershell代码遇到了这个奇怪的问题,我认为这可能是我做错了,但我不是Powershell的天才。 我有这个: $ix = [System.Net.Dn
var urlsearch = "http://192.168.10.113:8080/collective-intellegence/StoreClicks?userid=" + userId +
我有一个非常奇怪的问题,过去两天一直让我抓狂。 我有一个我试图控制的串行设备(LS 100 光度计)。使用设置了正确参数的终端(白蚁),我可以发送命令(“MES”),然后是定界符(CR LF),然后我
我目前正试图让无需注册的 COM 使用 Excel 作为客户端,使用 .NET dll 作为服务器。目前,我只是试图让概念验证工作,但遇到了麻烦。 显然,当我使用 Excel 时,我不能简单地使用与可
我开发了简单的 REST API - https://github.com/pavelpetrcz/MandaysFigu - 我的问题是在本地主机上,WildFly 16 服务器的应用程序运行正常。
我遇到了奇怪的情况 - 从 Django shell 创建一些 Mongoengine 对象是成功的,但是从 Django View 创建相同的对象看起来成功,但 MongoDB 中没有出现任何数据。
我是 flask 的新手,只编写了一个相当简单的网络应用程序——没有数据库,只是一个航类搜索 API 的前端。一切正常,但为了提高我的技能,我正在尝试使用应用程序工厂和蓝图重构我的代码。让它与 pus
我的谷歌分析 JavaScript 事件在开发者控制台中运行得很好。 但是当从外部 js 文件包含在页面上时,它们根本不起作用。由于某种原因。 例如; 下面的内容将在包含在控制台中时运行。但当包含在单
这是一本名为“Node.js 8 the Right Way”的书中的任务。你可以在下面看到它: 这是我的解决方案: 'use strict'; const zmq = require('zeromq
我正在阅读文本行,并创建其独特单词的列表(在将它们小写之后)。我可以使它与 flatMap 一起工作,但不能使它与 map 的“子”流一起工作。 flatMap 看起来更简洁和“更好”,但为什么 di
我正在编写一些 PowerShell 脚本来进行一些构建自动化。我发现 here echo $? 根据前面的语句返回真或假。我刚刚发现 echo 是 Write-Output 的别名。 写主机 $?
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 4年前关闭。 Improve thi
我将一个工作 View Controller 类从另一个项目复制到一个新项目中。我无法在新项目中加载 View 。在旧项目中我使用了presentModalViewController。在新版本中,我
我对 javascript 很陌生,所以很难看出我哪里出错了。由于某种原因,我的功能无法正常工作。任何帮助,将不胜感激。我尝试在外部 js 文件、头部/主体中使用它们,但似乎没有任何效果。错误要么出在
我正在尝试学习Flutter中的复选框。 问题是,当我想在Scaffold(body :)中使用复选框时,它正在工作。但我想在不同的地方使用它,例如ListView中的项目。 return Cente
我们当前使用的是 sleuth 2.2.3.RELEASE,我们看不到在 http header 中传递的 userId 字段没有传播。下面是我们的代码。 BaggageField REQUEST_I
我有一个组合框,其中包含一个项目,比如“a”。我想调用该组合框的 Action 监听器,仅在手动选择项目“a”完成时才调用。我也尝试过 ItemStateChanged,但它的工作原理与 Action
你能看一下照片吗?现在,一步前我执行了 this.interrupt()。您可以看到 this.isInterrupted() 为 false。我仔细观察——“这个”没有改变。它具有相同的 ID (1
我们当前使用的是 sleuth 2.2.3.RELEASE,我们看不到在 http header 中传递的 userId 字段没有传播。下面是我们的代码。 BaggageField REQUEST_I
我正在尝试在我的网站上设置一个联系表单,当有人点击发送时,就会运行一个作业,并在该作业中向所有管理员用户发送通知。不过,我在失败的工作表中不断收到此错误: Illuminate\Database\El
我是一名优秀的程序员,十分优秀!