gpt4 book ai didi

java - 将所有事件从 CommandGateway 路由到单个事件处理程序

转载 作者:行者123 更新时间:2023-12-02 13:19:57 24 4
gpt4 key购买 nike

我正在使用 AxonFramework 实现 JGroups,我指的是 this关联。我对代码做了一些更改,并在没有 Docker 的情况下运行该项目。以下是我的代码 -

主类 -

public class ClusterRunner {

public static void main(String[] args) {

Thread t1 = new Thread(new PrimaryNode());
Thread t2 = new Thread(new SecondaryNode());

t1.start();
t2.start();
}
}

主节点 -

import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;

public class PrimaryNode implements Runnable {

private JGroupsConnector connector;

private CommandGateway commandGateway;

private EventStore eventStore;

private CommandBus commandBus;

public PrimaryNode() {

eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());

try {

commandBus = configureDistributedCommandBus();

} catch (Exception e) {

e.printStackTrace();
}

Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);

new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);

commandGateway = new DefaultCommandGateway(commandBus);
}

public void run() {

for (int a = 0; a < 5; a++) {

System.out.println("Primary Node Created item " + a + " id: " + System.currentTimeMillis());
commandGateway.sendAndWait(new CreateItem(Long.toString(a), Long.toString(System.currentTimeMillis())));
}
}

private CommandBus configureDistributedCommandBus() throws Exception {

CommandBus commandBus = new SimpleCommandBus();

JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml"));

connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(),
new AnnotationRoutingStrategy());
connector.updateMembership(100, AcceptAll.INSTANCE);

connector.connect();
connector.awaitJoined();

return new DistributedCommandBus(connector, connector);
}
}

第二个节点 -

import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;

public class SecondaryNode implements Runnable {

private JGroupsConnector connector;

private EventStore eventStore;

public SecondaryNode() {

eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());

CommandBus commandBus = null;

try {
commandBus = configureDistributedCommandBus();
} catch (Exception e) {
e.printStackTrace();
}

Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);

new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);

@SuppressWarnings("unused")
CommandGateway commandGateway = new DefaultCommandGateway(commandBus);
}

public void run() {

new SubscribingEventProcessor("processor", new SimpleEventHandlerInvoker((EventListener) event -> {

System.out.println("Secondary Node -- " + event.getPayload());
}), eventStore).start();
}

private CommandBus configureDistributedCommandBus() throws Exception {

CommandBus commandBus = new SimpleCommandBus();

JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp_test.xml"));

connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(),
new AnnotationRoutingStrategy());
connector.updateMembership(100, AcceptAll.INSTANCE);

connector.connect();
connector.awaitJoined();

return new DistributedCommandBus(connector, connector);
}
}

项目 -

import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.eventhandling.EventHandler;

import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;

class CreateItem {

@TargetAggregateIdentifier
private final String itemId;
private final String name;

public CreateItem(String itemId, String naam) {
this.itemId = itemId;
this.name = naam;
}

public String getItemId() {
return itemId;
}

public String getName() {
return name;
}
}

class ItemCreated {
private final String itemId;
private final String name;

public ItemCreated(String itemId, String naam) {
this.itemId = itemId;
this.name = naam;
}

public String getItemId() {
return itemId;
}

public String getName() {
return name;
}

@Override
public String toString() {

return itemId + " " + name;
}
}

class Item {
@AggregateIdentifier
private String itemId;
private String name;

public Item() {

}

@CommandHandler
public Item(CreateItem createItem) {
apply(new ItemCreated(createItem.getItemId(), createItem.getName()));
}

@EventHandler
public void itemCreated(ItemCreated itemCreated) {
itemId = itemCreated.getItemId();
name = itemCreated.getName();
}
}

现在我的问题是,当我运行主类时,主节点产生 5 个事件,但辅助节点没有获取所有事件。它可能会获得 2、3 或 4 个事件,但不是全部。我希望所有事件都传递到辅助节点。我对 AxonFramework 和 JGroups 非常陌生。请帮助我理解这里的问题是什么。

最佳答案

所以在尝试了一切之后,我决定尝试一下路由策略。我决定使用 AbstractRoutingStrategy,它基本上有助于对没有决定性目的地的命令消息进行决策。以下是 JGroup 主节点(发送者)中的工作代码。将 PrimaryNode 类中的 configureDistributedCommandBus() 方法修改为 -

private CommandBus configureDistributedCommandBus() throws Exception {

CommandBus commandBus = new SimpleCommandBus();

channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml"));

RoutingStrategy rs = new AbstractRoutingStrategy(UnresolvedRoutingKeyPolicy.STATIC_KEY) {

@Override
protected String doResolveRoutingKey(CommandMessage<?> cmdMsg) {

View view = channel.getView();

if (view.getMembers().size() == 2) {

return "secondary";

} else if (view.getMembers().size() == 1) {

}

return cmdMsg.getIdentifier();
}
};

connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), rs);
connector.updateMembership(100, AcceptAll.INSTANCE);

connector.connect();
connector.awaitJoined();

return new DistributedCommandBus(connector, connector);
}

由于我使用的是 JGroups,因此我可以获得集群的 View ,即有多少个节点。在此基础上我将决定命令消息路由。

关于java - 将所有事件从 CommandGateway 路由到单个事件处理程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43609665/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com