- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
消息中间件提供了系统之间的异步处理机制,比如在电商网站上支付订单之后,会触发库存计算,物流调度计算,甚至是营销人员绩效计算,报表统计等,诸如此类的操作一般会耗费比订单购买商品本身更多的时间,加之这样的操作没有即时的时效性要求,用户在下单之后完全没有必要等待电商后端做完所有操作才算成功,那么此时消息中间件是一种非常好的解决方案,用户下单支付之后即可向用户返回购买成功的通知,然后提交各种消息到消息中间件,这样注册在消息中间件的其他系统就可以顺利地接受订单通知了,然后执行各自的业务逻辑。消息中间件主要用于解决进程之间消息异步处理的解决方案。
Bus 接口:对外提供几种主要的使用方式,比如 post 方法用来发送 Event,register 方法用来注册 Evnet 接收者(Subscriber)接受相应事件,EventBus 采用同步的方式推送 Event,AsyncEventBus 采用异步的方式(Thread-Per-Message)推送 Event。
Register 注册表:主要用来记录对应 Subscriber 以及受理消息的回调方法,回调方法用注解 @Subscribe 来标识。
Dispatcher:主要用来将 event 广播给注册表中监听了 topic 的 Subscriber。
package concurrent.eventbus;
/**
* @className: Bus
* @description: 定义了 EventBus 的所有使用方法
* @date: 2022/5/13
* @author: cakin
*/
public interface Bus {
// 将某个对象注册到 Bus 上,从此之后该类就成为了 Subscriber 了
void register(Object subscriber);
// 将某个对象从 Bus 上取消注册,取消注册之后就不会再接受到来自 Bus 的任何消息
void unregister(Object subscriber);
// 提交 Event 到默认的 topic
void post(Object event);
// 提交 Event 到指定的 topic
void post(Object Event, String topic);
// 关闭该 bus
void close();
// 返回 Bus 的名称标识
String getBusName();
}
package concurrent.eventbus;
import java.util.concurrent.Executor;
/**
* @className: EventBus
* @description: 它实现了 Bus 所有的功能,采用的是同步的方式
* @date: 2022/5/13
* @author: cakin
*/
public class EventBus implements Bus {
// 用于维护 Subscriber 的注册表
private final Registry registry = new Registry();
// Event Bus 的名字
private String busName;
// 默认的 Event Bus 的名字
private final static String DEFAULT_BUS_NAME = "default";
// 默认的 Event Bus 的名字
private final static String DEFAULT_TOPIC = "default-topic";
// 用于分发广播消息到各个 Subscriber 的类
private final Dispatcher dispatcher;
public EventBus() {
this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
}
public EventBus(String busName) {
this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
}
public EventBus(String busName, EventExceptionHandler eventExceptionHandler, Executor executor) {
this.busName = busName;
this.dispatcher = Dispatcher.newDispatcher(eventExceptionHandler, executor);
}
public EventBus(EventExceptionHandler eventExceptionHandler) {
this(DEFAULT_BUS_NAME, eventExceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);
}
@Override
public void register(Object subscriber) {
this.registry.bind(subscriber);
}
@Override
public void unregister(Object subscriber) {
this.registry.unbind(subscriber);
}
@Override
public void post(Object event) {
this.post(event, DEFAULT_TOPIC);
}
@Override
public void post(Object event, String topic) {
this.dispatcher.dispatch(this, registry, event, topic);
}
@Override
public void close() {
this.dispatcher.close();
}
@Override
public String getBusName() {
return null;
}
}
package concurrent.eventbus;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @className: AsyncEventBus
* @description: 异步 EventBus
* @date: 2022/5/13
* @author: cakin
*/
public class AsyncEventBus extends EventBus {
AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
super(busName, exceptionHandler, executor);
}
AsyncEventBus(String busName, ThreadPoolExecutor executor) {
this(busName, null, executor);
}
AsyncEventBus(ThreadPoolExecutor executor) {
this("default_async", null, executor);
}
AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
this("default_async", exceptionHandler, executor);
}
}
package concurrent.eventbus;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @className: Registry
* @description: 注册表维护了 topic 和 subscriber 之间的关系,当有 Event 被 post 之后,Dispatcher 需要知道该消息应该发送哪个 Subscriber 的实例和对应的方法
* @date: 2022/5/13
* @author: cakin
*/
public class Registry {
// 存储 Subscriber 集合和 topic 之间关系的 map
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>();
public void bind(Object subscriber) {
// 获取 Subscriber Object 的方法集合,然后进行绑定
List<Method> subscribeMethods = getSubscribeMethods(subscriber);
subscribeMethods.forEach(m -> tierSubscriber(subscriber, m));
}
public void unbind(Object subscriber) {
// unbind 为了提高速度,只对 Subscriber 进行失效操作
subscriberContainer.forEach((key, queue) ->
queue.forEach(s -> {
if (s.getSubscribeObject() == subscriber) {
s.setDisable(true);
}
})
);
}
private void tierSubscriber(Object subscriber, Method method) {
final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);
String topic = subscribe.topic();
// 当某个 topic 没有 Subscriber Queue 的时候创建一个
subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>());
// 创建一个 subscriber 并且加入 subscriber 列表中
subscriberContainer.get(topic).add(new Subscriber(subscriber, method));
}
public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) {
return subscriberContainer.get(topic);
}
private List<Method> getSubscribeMethods(Object subcriber) {
final List<Method> methods = new ArrayList<>();
Class<?> temp = subcriber.getClass();
// 不断获取所有的方法
while (temp != null) {
// 获取所有的方法
Method[] declaredMethods = temp.getDeclaredMethods();
// 只有 public 方法 && 有一个入参 && 被 @Subscribe 标记的方法才符合回调方法
Arrays.stream(declaredMethods)
.filter(m -> m.isAnnotationPresent(Subscribe.class)
&& m.getParameterCount() == 1
&& m.getModifiers() == Modifier.PUBLIC)
.forEach(methods::add);
temp = temp.getSuperclass();
}
return methods;
}
}
package concurrent.eventbus;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
* @className: Dispatcher
* @description: Event 广播 Dispatch
* @date: 2022/5/13
* @author: cakin
*/
public class Dispatcher {
private final Executor executorService;
private final EventExceptionHandler exceptionHandler;
public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;
public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;
public Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) {
this.executorService = executorService;
this.exceptionHandler = exceptionHandler;
}
public void dispatch(Bus bus, Registry registry, Object event, String topic) {
// 根据 topic 获取所有的 Subscriber 列表
ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic);
if (null == subscribers) {
if (exceptionHandler != null) {
exceptionHandler.handler(new IllegalArgumentException("The topic" + topic + " note bind yet"), new BaseEventContext(bus.getBusName(), null, event));
return;
}
}
// 遍历所有的方法,并且通过反射的方式进行方法调用
subscribers.stream()
.filter(subscriber -> !subscriber.isDisable())
.filter(subscriber -> {
Method subcribeMethod = subscriber.getSubscribeMethod();
Class<?> aClass = subcribeMethod.getParameterTypes()[0];
return aClass.isAssignableFrom(event.getClass());
}).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));
}
private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) {
Method subscribeMethod = subscriber.getSubscribeMethod();
Object subscribeObject = subscriber.getSubscribeObject();
executorService.execute(() -> {
try {
subscribeMethod.invoke(subscribeObject, event);
} catch (Exception e) {
if (null != exceptionHandler) {
exceptionHandler.handler(e, new BaseEventContext(bus.getBusName(), subscriber, event));
}
}
});
}
public void close() {
if (executorService instanceof ExecutorService) {
((ExecutorService) executorService).shutdown();
}
}
static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) {
return new Dispatcher(executor, exceptionHandler);
}
static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {
return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);
}
static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) {
return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);
}
// 顺序执行的 ExecutorService
private static class SeqExecutorService implements Executor {
private final static SeqExecutorService INSTANCE = new SeqExecutorService();
@Override
public void execute(Runnable command) {
command.run();
}
}
// 每个线程负责一次消息推送
private static class PreThreadExecutorService implements Executor {
private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}
// 默认 EventContext 实现
private static class BaseEventContext implements EventContext {
private final String eventBusName;
private final Subscriber subscriber;
private final Object event;
private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) {
this.eventBusName = eventBusName;
this.subscriber = subscriber;
this.event = event;
}
@Override
public String getSource() {
return this.eventBusName;
}
@Override
public Object getSubscriber() {
return subscriber != null ? subscriber.getSubscribeObject() : null;
}
@Override
public Method getSubscribe() {
return subscriber != null ? subscriber.getSubscribeMethod() : null;
}
@Override
public Object getEvent() {
return this.event;
}
}
}
package concurrent.eventbus;
import java.lang.reflect.Method;
/**
* @className: Subscriber
* @description: 封装了对象实例和被 @Subscribe 标记的方法
* @date: 2022/5/13
* @author: cakin
*/
public class Subscriber {
private final Object subscribeObject;
private final Method subscribeMethod;
private boolean disable = false;
public Subscriber(Object subscribeObject, Method subscribeMethod) {
this.subscribeObject = subscribeObject;
this.subscribeMethod = subscribeMethod;
}
public Object getSubscribeObject() {
return subscribeObject;
}
public Method getSubscribeMethod() {
return subscribeMethod;
}
public boolean isDisable() {
return disable;
}
public void setDisable(boolean disable) {
this.disable = disable;
}
}
package concurrent.eventbus;
/**
* @className: EventExceptionHandler
* @description: 异常处理
* @date: 2022/5/13
* @author: cakin
*/
public interface EventExceptionHandler {
void handler(Throwable cause, EventContext context);
}
package concurrent.eventbus;
import java.lang.reflect.Method;
/**
* @className: EventContext
* @description: 事件上下文
* @date: 2022/5/13
* @author: cakin
*/
public interface EventContext {
String getSource();
Object getSubscriber();
Method getSubscribe();
Object getEvent();
}
package concurrent.eventbus;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @className: Subscribe
* @description: 注解在类的方法上,注解时可指定 topic,不指定的情况下未默认的 topic(default-topic)
* @date: 2022/5/13
* @author: cakin
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
String topic() default "default-topic";
}
package concurrent.eventbus;
/**
* @className: SimpleSubsciber1
* @description: TODO
* @date: 2022/5/13
* @author: cakin
*/
public class SimpleSubscriber1 {
@Subscribe
public void method1(String message){
System.out.println("==SimpleSubscriber1==method1=="+message);
}
@Subscribe(topic = "test")
public void method2(String message){
System.out.println("==SimpleSubscriber1==method2=="+message);
}
}
package concurrent.eventbus;
/**
* @className: SimpleSubsciber2
* @description: SimpleSubsciber2
* @date: 2022/5/13
* @author: 贝医
*/
public class SimpleSubscriber2 {
@Subscribe
public void method1(String message){
System.out.println("==SimpleSubscriber2==method1=="+message);
}
@Subscribe(topic = "test")
public void method2(String message){
System.out.println("==SimpleSubscriber2==method2=="+message);
}
}
package concurrent.eventbus;
public class SyncTest {
public static void main(String[] args) {
Bus bus = new EventBus("Test");
bus.register(new SimpleSubscriber1());
bus.register(new SimpleSubscriber2());
bus.post("Hello");
System.out.println("---------");
bus.post("Hello", "test");
}
}
package concurrent.eventbus;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class ASyncTest {
public static void main(String[] args) {
Bus bus = new AsyncEventBus("Test", (ThreadPoolExecutor) Executors.newFixedThreadPool(10));
bus.register(new SimpleSubscriber1());
bus.register(new SimpleSubscriber2());
bus.post("Hello");
System.out.println("---------");
bus.post("Hello", "test");
}
}
==SimpleSubscriber1==method1==Hello
==SimpleSubscriber2==method1==Hello
==SimpleSubscriber1==method2==Hello
==SimpleSubscriber2==method2==Hello
==SimpleSubscriber2==method1==Hello
==SimpleSubscriber1==method1==Hello
==SimpleSubscriber1==method2==Hello
==SimpleSubscriber2==method2==Hello
对此感到疯狂,真的缺少一些东西。 我有webpack 4.6.0,webpack-cli ^ 2.1.2,所以是最新的。 在文档(https://webpack.js.org/concepts/mod
object Host "os.google.com" { import "windows" address = "linux.google.com" groups = ["linux"] } obj
每当我安装我的应用程序时,我都可以将数据库从 Assets 文件夹复制到 /data/data/packagename/databases/ .到此为止,应用程序工作得很好。 但 10 或 15 秒后
我在 cc 模式缓冲区中使用 hideshow.el 来折叠我不查看的文件部分。 如果能够在 XML 文档中做到这一点就好了。我使用 emacs 22.2.1 和内置的 sgml-mode 进行 xm
已结束。此问题不符合 Stack Overflow guidelines .它目前不接受答案。 我们不允许提出有关书籍、工具、软件库等方面的建议的问题。您可以编辑问题,以便用事实和引用来回答它。 关闭
根据java: public Scanner useDelimiter(String pattern) Sets this scanner's delimiting pattern to a patt
我读过一些关于 PRG 模式以及它如何防止用户重新提交表单的文章。比如this post有一张不错的图: 我能理解为什么在收到 2xx 后用户刷新页面时不会发生表单提交。但我仍然想知道: (1) 如果
看看下面的图片,您可能会清楚地看到这一点。 那么如何在带有其他一些 View 的简单屏幕中实现没有任何弹出/对话框/模式的微调器日期选择器? 我在整个网络上进行了谷歌搜索,但没有找到与之相关的任何合适
我不知道该怎么做,我一直遇到问题。 以下是代码: rows = int(input()) for i in range(1,rows): for j in range(1,i+1):
我想为重写创建一个正则表达式。 将所有请求重写为 index.php(不需要匹配),它不是以/api 开头,或者不是以('.html',或'.js'或'.css'或'.png'结束) 我的例子还是这样
MVC模式代表 Model-View-Controller(模型-视图-控制器) 模式 MVC模式用于应用程序的分层开发 Model(模型) - 模型代表一个存取数据的对象或 JAVA PO
我想为组织模式创建一个 RDF 模式世界。您可能知道,组织模式文档基于层次结构大纲,其中标题是主要的分组实体。 * March auxiliary :PROPERTIES: :HLEVEL: 1 :E
我正在编写一个可以从文件中读取 JSON 数据的软件。该文件包含“person”——一个值为对象数组的对象。我打算使用 JSON 模式验证库来验证内容,而不是自己编写代码。符合代表以下数据的 JSON
假设我有 4 张 table 人 公司 团体 和 账单 现在bills/persons和bills/companys和bills/groups之间是多对多的关系。 我看到了 4 种可能的 sql 模式
假设您有这样的文档: doc1: id:1 text: ... references: Journal1, 2013, pag 123 references: Journal2, 2014,
我有这个架构。它检查评论,目前工作正常。 var schema = { id: '', type: 'object', additionalProperties: false, pro
这可能很简单,但有人可以解释为什么以下模式匹配不明智吗?它说其他规则,例如1, 0, _ 永远不会匹配。 let matchTest(n : int) = let ran = new Rand
我有以下选择序列作为 XML 模式的一部分。理想情况下,我想要一个序列: 来自 my:namespace 的元素必须严格解析。 来自任何其他命名空间的元素,不包括 ##targetNamespace和
我希望编写一个 json 模式来涵盖这个(简化的)示例 { "errorMessage": "", "nbRunningQueries": 0, "isError": Fals
首先,我是 f# 的新手,所以也许答案很明显,但我没有看到。所以我有一些带有 id 和值的元组。我知道我正在寻找的 id,我想从我传入的三个元组中选择正确的元组。我打算用两个 match 语句来做到这
我是一名优秀的程序员,十分优秀!