- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
订阅发布模式(Publish-Subscribe Pattern)是一种行之有效的解耦框架与业务逻辑的方式,也是一种常见的观察者设计模式,它被广泛应用于事件驱动架构中.
在这个模式中,发布者(或者说是主题)并不直接发送消息给订阅者,而是通过调度中心(或者叫消息代理)来传递消息。 发布者(或者说是主题)并不知道订阅者的存在,而订阅者也不知道发布者的存在。他们彼此唯一的关系就是在调度中心注册成为订阅者或者发布者.
当一个发布者有新消息时,就将这个消息发布到调度中心。调度中心就会将这个消息通知给所有订阅者。这就实现了发布者和订阅者之间的解耦,发布者和订阅者不再直接依赖于彼此,他们可以独立地扩展自己.
在具体的实现中,可以通过消息队列、事件总线等机制来实现调度中心,不同语言和平台都有实现的库和框架,例如 Java 中的 ActiveMQ、RabbitMQ、Kafka等.
订阅发布模式有以下优点:
interface Subscriber {
void update(String message);
}
class Publisher {
private Map<String, List<Subscriber>> subscribers = new HashMap<>();
public void subscribe(String topic, Subscriber subscriber) {
List<Subscriber> subscriberList = subscribers.get(topic);
if (subscriberList == null) {
subscriberList = new ArrayList<>();
subscribers.put(topic, subscriberList);
}
subscriberList.add(subscriber);
}
public void unsubscribe(String topic, Subscriber subscriber) {
List<Subscriber> subscriberList = subscribers.get(topic);
if (subscriberList != null) {
subscriberList.remove(subscriber);
}
}
public void publish(String topic, String message) {
List<Subscriber> subscriberList = subscribers.get(topic);
if (subscriberList != null) {
for (Subscriber subscriber : subscriberList) {
subscriber.update(message);
}
}
}
}
class EmailSubscriber implements Subscriber {
private String email;
public EmailSubscriber(String email) {
this.email = email;
}
public void update(String message) {
System.out.println("Send email to " + email + ": " + message);
}
}
class SMSSubscriber implements Subscriber {
private String phoneNumber;
public SMSSubscriber(String phoneNumber) {
this.phoneNumber = phoneNumber;
}
public void update(String message) {
System.out.println("Send SMS to " + phoneNumber + ": " + message);
}
}
public class Main {
public static void main(String[] args) {
Publisher publisher = new Publisher();
Subscriber emailSubscriber1 = new EmailSubscriber("foo@example.com");
Subscriber smsSubscriber1 = new SMSSubscriber("1234567890");
publisher.subscribe("news", emailSubscriber1);
publisher.subscribe("news", smsSubscriber1);
publisher.publish("news", "发布新消息1");
publisher.unsubscribe("news", smsSubscriber1);
publisher.publish("news", "发布新消息2");
}
}
打印输出如下:
Send email to foo@example.com: 发布新消息1
Send SMS to 1234567890: 发布新消息1
Send email to foo@example.com: 发布新消息2
Spring的订阅发布模式是通过发布事件、事件监听器和事件发布器3个部分来完成的 。
这里我们通过 newbee-mall-pro 项目中已经实现订阅发布模式的下单流程给大家讲解,项目地址: https://github.com/wayn111/newbee-mall-pro 。
public class OrderEvent extends ApplicationEvent {
void onApplicationEvent(Object event) {
...
}
}
@Component
public class OrderListener implements ApplicationListener<OrderEvent> {
@Override
public void onApplicationEvent(OrderEvent event) {
// 生成订单、删除购物车、扣减库存
...
}
}
@Resource
private ApplicationEventPublisher applicationEventPublisher;
private void saveOrder(MallUserVO mallUserVO, Long couponUserId, List<ShopCatVO> shopcatVOList, String orderNo) {
// 订单检查
...
// 生成订单号
String orderNo = NumberUtil.genOrderNo();
// 发布订单事件,在事件监听中处理下单逻辑
applicationEventPublisher.publishEvent(new OrderEvent(orderNo, mallUserVO, couponUserId, shopcatVOList));
// 所有操作成功后,将订单号返回
return orderNo;
...
}
通过事件监听机制,我们将下单逻辑拆分成如下步骤:
- 订单检查
- 生成订单号
- 发布订单事件,在事件监听中处理订单保存逻辑
- 所有操作成功后,将订单号返回
每个步骤都是各自独立不互相影响
如上的代码已经实现了订阅发布模式,成功解耦了下单逻辑。但是在性能上还没有得到优化,因为 Spring Boot 项目中,默认情况下事件监听器是同步处理的,也就是说这里下单流程会等待事件监听器处理完毕才返回,最终影响接口响应时长.
Spring Boot 项目中事件监听发布类是由 SimpleApplicationEventMulticaster 这个类实现的,源码中通知订阅者代码如下: 可以看到,代码里是有判断 getTaskExecutor() 方法返回不为空的话,就交由 executor 执行,负责同步执行。这个时候大家就要问了,这里不是有线程池在异步通知订阅者吗?
不急,博主带大家继续查看源码。 可以看到 getTaskExecutor() 方法返回一个成员属性,这个成员属性在 SimpleApplicationEventMulticaster 类中是通过 setTaskExecutor(@Nullable Executor taskExecutor) 方法设置的。我们通过 ctrl + f7 查一下 setTaskExecutor(...) 方法在哪里被调用过, Ok,到此水落石出, SimpleApplicationEventMulticaster 类的 taskExecutor 成员属性一直为 null,所以在通过订阅者的时候一直是同步处理,等待订阅者处理完毕.
对于异步处理,我们可以从2个方面入手:
onApplicationEvent
上加上 @Async
注解,表示该方法异步执行。 taskExecutor
属性,通过源码可知,也可以解决。 这里博主给大家介绍下怎么修改事件监听发布类来解决.
/**
* 系统启动时執行
*/
@Component
public class SpringBeanStartupRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
// 设置spring默认的事件监听为异步执行
SimpleApplicationEventMulticaster multicaster = SpringContextUtil.getBean(SimpleApplicationEventMulticaster.class);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
60L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(500),
new CustomizableThreadFactory("newbee—event-task"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
multicaster.setTaskExecutor(threadPoolExecutor);
}
}
在系统启动时反射修改 SimpleApplicationEventMulticaster 类的 taskExecutor 属性,从而让 SimpleApplicationEventMulticaster 类支持异步事件通知.
建议大家在日常开发中多加思考哪些业务流程可以适用,例如微服务项目中订单支付成功后需要通知用户、商品、活动等多个服务时,可以考虑使用订阅发布模式。解耦发布者和订阅者,发布者只管发布消息,不需要知道有哪些订阅者,也不需要知道订阅者的具体实现。订阅者只需要关注自己感兴趣的消息即可。这种松耦合的设计使得系统更容易扩展和维护.
关注公众号【waynblog】每周分享技术干货、开源项目、实战经验、高效开发工具等,您的关注将是我的更新动力! 。
最后此篇关于设计模式之订阅发布模式的文章就讲到这里了,如果你想了解更多关于设计模式之订阅发布模式的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在用 Java 创建一组小部件,用于解码和显示在串行接口(interface)接收到的消息。 消息类型由唯一标识符定义。每个小部件只对特定标识符感兴趣。 如何对应用程序进行编程,以便将消息正确分发
我有以下代码,其中包含多个订阅。我需要实现的是这样的: 订阅activateRoute 以获取用户和产品数据。 返回商品数据后,使用商品数据订阅getSeller服务。 使用返回的卖家数据订阅 get
我已经使用 Fitbit 的 PHP 库 (www.disciplinexgames.com/fitbit) 在我的网站中成功集成了 FitBit api。它工作正常,但我现在想使用订阅 API,以便
在我的 Angular 7 应用程序中,我有下一个功能: getUserData(uid) { return this.fireStore.collection('users').doc(
我正在尝试在 Node 中实现发布/订阅模式,但不使用 Redis。功能应该是相同的;您可以发布到 channel ,订阅 channel 并收听数据(如果您已订阅);以下是 Redis 功能: pu
这是我当前的应用程序结构: /client/client.js /server/server.js collection.js 有 HTML 和 CSS 文件,但这些与我的问题无关。在将我的应用程序拆
我们正在使用OpenTok建立视频聊天室体验,并且在基本工作正常的同时,我发现当 session 室中有很多参与者发布音频时,本底噪声非常高。像Zoom这样的浏览器外解决方案似乎没有这种高水平的“白噪
RabbitMQ 是点对点还是发布-订阅?或者两者都取决于配置选项? 我一直在查看配置,它们似乎都支持点对点模型而不是发布-订阅。即消息一旦被消费就会从队列中删除,并且不可用于第二个消费者。 最佳答案
我是 Angular 6 和 ngrx 商店的新人。我尝试在从商店订阅数据后调度操作,但它会导致无限循环并使浏览器崩溃?我错了什么。我发现它使用 rxjs 的 do/tap 运算符但仍然不起作用的一些
这个问题已经有答案了: Property '...' has no initializer and is not definitely assigned in the constructor (37
这个问题已经有答案了: Property '...' has no initializer and is not definitely assigned in the constructor (37
我正在使用 Visual Studio 2017 v15.6.2 和 Azure Services Authentication Extension 为支持 MSI 的应用程序进行本地 azure 功
我想知道如何确定给定的 WC_Product 对象 $product 是否是订阅产品。 最佳答案 您可以使用他们的辅助函数,这可能是最完整的: if( class_exists( 'WC_Subscr
我正在研究使用服务器发送的事件作为支持 api 来实现“订阅”类型。 我正在苦苦挣扎的是接口(interface),更准确地说,是这种操作的 http 层。 问题: 使用原生 EventSource不
我会根据每个用户的订阅类型向我的用户发送通知。 例如: 用户 A 订阅了所有新闻文章 用户 B 订阅了所有评论 用户 C 订阅了网站上的所有新内容 我有一个每 5 分钟运行一次的脚本(除非该脚本仍在运
我正在使用 Ionic2/Angular2,并且需要使用参数 authData 调用函数,如下所示。 public auth: FirebaseAuth this.auth.subscrib
已结束。此问题正在寻求书籍、工具、软件库等的推荐。它不满足Stack Overflow guidelines 。目前不接受答案。 我们不允许提出寻求书籍、工具、软件库等推荐的问题。您可以编辑问题,以便
我们现有的系统可以持续处理大量文件。粗略地说,每天大约有 300 万个文件,大小从几千字节到超过 50 MB。这些文件从接收到完成使用会经历几个不同的处理阶段,具体取决于它们所采用的路径。由于这些文件
我有一项服务,我使用 Paypal 订阅。 Paypal 有 webhooks。问题是我不知道我需要使用哪个,不知道用户是否为下个月付款。 我使用了 Billing subscription rene
我目前正在为一个网站整理一个处理脚本,遇到了一个我似乎无法找到明确答案的问题。 Paypal 的文档充其量是不确定的,我对 Paypal 的使用还不够多,无法从他们提供的信息中轻松辨别答案。 当通过
我是一名优秀的程序员,十分优秀!