- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 AxonFramework 实现 JGroups,我指的是 this关联。我对代码做了一些更改,并在没有 Docker 的情况下运行该项目。以下是我的代码 -
主类 -
public class ClusterRunner {
public static void main(String[] args) {
Thread t1 = new Thread(new PrimaryNode());
Thread t2 = new Thread(new SecondaryNode());
t1.start();
t2.start();
}
}
主节点 -
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;
public class PrimaryNode implements Runnable {
private JGroupsConnector connector;
private CommandGateway commandGateway;
private EventStore eventStore;
private CommandBus commandBus;
public PrimaryNode() {
eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());
try {
commandBus = configureDistributedCommandBus();
} catch (Exception e) {
e.printStackTrace();
}
Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);
new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);
commandGateway = new DefaultCommandGateway(commandBus);
}
public void run() {
for (int a = 0; a < 5; a++) {
System.out.println("Primary Node Created item " + a + " id: " + System.currentTimeMillis());
commandGateway.sendAndWait(new CreateItem(Long.toString(a), Long.toString(System.currentTimeMillis())));
}
}
private CommandBus configureDistributedCommandBus() throws Exception {
CommandBus commandBus = new SimpleCommandBus();
JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml"));
connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(),
new AnnotationRoutingStrategy());
connector.updateMembership(100, AcceptAll.INSTANCE);
connector.connect();
connector.awaitJoined();
return new DistributedCommandBus(connector, connector);
}
}
第二个节点 -
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;
public class SecondaryNode implements Runnable {
private JGroupsConnector connector;
private EventStore eventStore;
public SecondaryNode() {
eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());
CommandBus commandBus = null;
try {
commandBus = configureDistributedCommandBus();
} catch (Exception e) {
e.printStackTrace();
}
Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);
new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);
@SuppressWarnings("unused")
CommandGateway commandGateway = new DefaultCommandGateway(commandBus);
}
public void run() {
new SubscribingEventProcessor("processor", new SimpleEventHandlerInvoker((EventListener) event -> {
System.out.println("Secondary Node -- " + event.getPayload());
}), eventStore).start();
}
private CommandBus configureDistributedCommandBus() throws Exception {
CommandBus commandBus = new SimpleCommandBus();
JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp_test.xml"));
connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(),
new AnnotationRoutingStrategy());
connector.updateMembership(100, AcceptAll.INSTANCE);
connector.connect();
connector.awaitJoined();
return new DistributedCommandBus(connector, connector);
}
}
项目 -
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.eventhandling.EventHandler;
import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
class CreateItem {
@TargetAggregateIdentifier
private final String itemId;
private final String name;
public CreateItem(String itemId, String naam) {
this.itemId = itemId;
this.name = naam;
}
public String getItemId() {
return itemId;
}
public String getName() {
return name;
}
}
class ItemCreated {
private final String itemId;
private final String name;
public ItemCreated(String itemId, String naam) {
this.itemId = itemId;
this.name = naam;
}
public String getItemId() {
return itemId;
}
public String getName() {
return name;
}
@Override
public String toString() {
return itemId + " " + name;
}
}
class Item {
@AggregateIdentifier
private String itemId;
private String name;
public Item() {
}
@CommandHandler
public Item(CreateItem createItem) {
apply(new ItemCreated(createItem.getItemId(), createItem.getName()));
}
@EventHandler
public void itemCreated(ItemCreated itemCreated) {
itemId = itemCreated.getItemId();
name = itemCreated.getName();
}
}
现在我的问题是,当我运行主类时,主节点产生 5 个事件,但辅助节点没有获取所有事件。它可能会获得 2、3 或 4 个事件,但不是全部。我希望所有事件都传递到辅助节点。我对 AxonFramework 和 JGroups 非常陌生。请帮助我理解这里的问题是什么。
最佳答案
所以在尝试了一切之后,我决定尝试一下路由策略。我决定使用 AbstractRoutingStrategy,它基本上有助于对没有决定性目的地的命令消息进行决策。以下是 JGroup 主节点(发送者)中的工作代码。将 PrimaryNode 类中的 configureDistributedCommandBus() 方法修改为 -
private CommandBus configureDistributedCommandBus() throws Exception {
CommandBus commandBus = new SimpleCommandBus();
channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml"));
RoutingStrategy rs = new AbstractRoutingStrategy(UnresolvedRoutingKeyPolicy.STATIC_KEY) {
@Override
protected String doResolveRoutingKey(CommandMessage<?> cmdMsg) {
View view = channel.getView();
if (view.getMembers().size() == 2) {
return "secondary";
} else if (view.getMembers().size() == 1) {
}
return cmdMsg.getIdentifier();
}
};
connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), rs);
connector.updateMembership(100, AcceptAll.INSTANCE);
connector.connect();
connector.awaitJoined();
return new DistributedCommandBus(connector, connector);
}
由于我使用的是 JGroups,因此我可以获得集群的 View ,即有多少个节点。在此基础上我将决定命令消息路由。
关于java - 将所有事件从 CommandGateway 路由到单个事件处理程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43609665/
我的 Angular 应用程序中有以下代码。 app.config(function($routeProvider, $locationProvider) { $locationProvider
这就是我在 Backbone 中进行路由的方式,在决定调用哪个外部模板之前,首先获取路由及其参数。我觉得这很灵活。 var Router = Backbone.Router.extend({
我是 MEAN 堆栈领域的新手,我对 Angular 路线有一些疑问。为什么我应该在客户端重新创建后端已经用express.js创建的路由,有什么好处?这是 Angular.js 工作的唯一方式吗?我
我可以设置一条从根级 URL 进行映射的路由吗? http://localhost:49658/ 我使用的是 VS2010 内置 Web 服务器。 尝试使用空白或单斜杠 URL 字符串设置路由不起作用
我有一个现有的应用程序 Rails 3.2.17和 Angular js。我想在现有应用程序中包含 Activeadmin。 我遵循了 active-admin post from ryan bate
我正在关注 this Angular 中的路由教程,它就是行不通。当我使用“comp”选择器放置它的 HTML 代码时,它可以工作,但是当我尝试使用路由器 socket 对其进行路由时,它只显示来自
多个路由通过路由器进行管理。 前端路由的概念和原理 (编程中的) 路由 (router)就是一组 key-value 对应关系,分为:后端路由和前端路由 后端路由
服务器需要根据不同的URL或请求来执行不一样的操作,我们可以通过路由来实现这个步骤。 第一步我们需要先解析出请求URL的路径,我们引入url模块。 我们来给onRequest()函数加上一些逻辑
我正在为 Angular 6 应用程序设置路由,我想要一条可以匹配可变数量的段的路由。目前我有一个看起来像这样的路由配置: const routes: Routes = [ { path: '',
用户将点击电子邮件中的链接,如下所示: do-something/doSomething?thing=XXXXXXXXXXX 如何在路由器中定义路由并订阅获取参数? 目前在我的路由器中: {
我有一个具有以下结构的 Angular (4) 应用程序: app.module bi.module auth.module 路由应该是: / -> redirect to /home /
我正在使用 WCF 4 路由服务,并且需要以编程方式配置服务(而不是通过配置)。我见过的这样做的例子很少见,创建一个 MessageFilterTable 如下: var fi
我需要创建一个“路由”服务。我正在尝试使用 .Net 的 System.ServiceModel.Routing.IRequestReplyRouter我可以让它只在 HTTP 模式下工作,而不是在
例如,链接: /shop/phones/brend/apple/display/retina/color/red 在哪里: phones - category alias brend -
非常基本的问题,我很惊讶我找不到答案。我刚刚开始研究 django 并进行了开箱即用的安装。创建了一个项目并创建了一个应用程序。 urls.py 的默认内容很简单: urlpatterns = [
我已经实现了 WCF 路由服务;我还希望该服务(或类似的 WCF 服务)以规定的和统一的(与内容无关的)方式转换有效负载。例如,有效负载将始终采用 Foo 的形式。我想把它作为Bar在所有情况下。我很
我想使用 $locationProvider.html5Mode(true); 在 angularJs 中删除 # 哈希;但这导致所有 URL 都通过 angularJs 进行路由。我如何设置它以便只
我要听导航开始事件并判断其是否url属性是 /logout . 如果是这样,路由器应该停止触发连续事件,例如 路线已识别 , GuardsCheckStart , ChildActivationSta
有人可以解释我如何使用参数路由到 URL 吗? 例如id 喜欢点击产品并通过Id打开产品的更多信息。 我的路由到目前为止... angular.module('shop', ["cus
我目前正在 Angular: 7.2.14 上构建,想看看是否有人可以解释如何使用路由保护、共享服务或其他方式等重定向查询参数。 我试图解决的问题要求查询参数从根 Uri 路径传入,然后将路由重定向到
我是一名优秀的程序员,十分优秀!