gpt4 book ai didi

java - 从外部服务发送命令时,聚合中的 axon 3.4 CommandHandler 不会被触发

转载 作者:行者123 更新时间:2023-11-30 01:50:42 26 4
gpt4 key购买 nike

所以我试图了解 axon 3.4 中的分布式命令总线。我有一个用例,当发送某个命令时,聚合会发送一个启动传奇的事件,该传奇会发送 2 个命令,以保持发送到 2 个不同服务的数据处于一致状态。

现在棘手的部分来了,CommandHandlers 是在外部服务中定义的,这些服务执行某些操作,然后发送回命令以及其中的操作结果。但是,当命令发送时,我总是会遇到超时异常,因此 CommandBus 知道哪个聚合必须处理它,但无法将正确的聚合分配给命令。

目前 commandService.createCurrency 仅记录一条消息,这就是事件处理程序中存在 Thread.sleep 的原因,以模拟更长的运行过程。

您将在下面找到我的代码:

@Configuration
public class AxonConfig {

@Autowired
private Registration registration;

private RestTemplate restTemplate = new RestTemplate();

@Bean
public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
Serializer serializer) {
return new SpringHttpCommandBusConnector(localSegment, restTemplate, serializer);
}

@Bean
public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
return new SpringCloudCommandRouter(discoveryClient, registration, new AnnotationRoutingStrategy());
}

@Primary // to make sure this CommandBus implementation is used for autowiring
@Bean
public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter,
CommandBusConnector commandBusConnector) {
return new DistributedCommandBus(commandRouter, commandBusConnector);
}

}

服务1

聚合:

@Aggregate
@Data
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CreateCurrencyAggregate {

@AggregateIdentifier
private String id;

@CommandHandler
public CreateCurrencyAggregate(CreateCurrencyCommand command) {
log.info("starting create currency");
Assert.notNull(command.getId(), "CreateCurrencyCommand must have an id");
Assert.hasLength(command.getId(), "CreateCurrencyCommand id cannot be an empty String");
this.id = command.getId();
apply(CreateCurrencyEvent.builder()
.id(command.getId())
.payload(command.getPayload())
.build());
}

@CommandHandler
public void on(DalCreatedCommand command) {
log.info("Currency created on dal layer");
apply(DalCurrencyCreatedEvent.builder()
.dalId(command.getId())
.build());

}
}

传奇:

@Slf4j
@Saga
public class CreateCurrencySaga {

@Autowired
private transient CommandGateway commandGateway;

@StartSaga
@SagaEventHandler(associationProperty = "id")
public void handle(CreateCurrencyEvent event) {
log.info("starting saga...");
dalCreated = false;
as400Created = true;
SagaLifecycle.associateWith("id", event.getId());
SagaLifecycle.associateWith("dalId", event.getId());
commandGateway.send(CreateDalCurrencyCommand.builder()
.id(event.getId())
.payload(event.getPayload())
.build());
}

@SagaEventHandler(associationProperty = "dalId")
public void handle(DalCurrencyCreatedEvent event) {
log.info("receiving createdEvent");
SagaLifecycle.end();
}


}

服务2

外部命令处理程序

@Slf4j
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Component
public class CurrencyCommandHandler {

@Autowired
private EventBus eventBus;

@CommandHandler
public void on(CreateDalCurrencyCommand command) {
eventBus.publish(asEventMessage(CreateDalCurrencyEvent.builder()
.id(command.getId())
.payload(command.getPayload())
.build()));
}
}

事件处理程序

@Slf4j
@RequiredArgsConstructor
@Component
public class CurrencyEventHandlers {

private final CurrencyCommandService commandService;

private final CommandGateway commandGateway;

@EventHandler
public void handle(CreateDalCurrencyEvent event){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
commandService.createCurrency(event.getId(), event.getPayload());
var result = commandGateway.send(DalCreatedCommand.builder()
.id(event.getId())
.build());
}
}

最佳答案

我想我可以为您提供一些该领域的补充背景知识。

遗憾的是,用作发现服务的 Spring Cloud 的实现带来了天壤之别。在内部,SpringCloudCommandRouter 使用ServiceInstance 的元数据来共享MessageRoutingInformation。连接到您的设置的每个应用程序都将由 ServiceInstance 表示,因此共享您作为服务可以通过此方法处理的消息(因此也是命令)将很简单。

但是,当构建 SpringCloudCommandRouter 时,这是通过利用 Eureka 作为 Spring Cloud 实现进行测试的。 Eureka 允许调整 ServiceInstance 的元数据,因此我可以相当有信心地说,如果您使用 Spring Cloud Eureka,我希望事情能像它那样工作。

但是,如果您使用 Consul,那就是另一回事了。Spring Cloud Consul 不允许调整 ServiceInstance 的元数据。我创建了一个issue过去将 API 调整为实际上能够更新元数据。

无论如何,Axon Framework 已通过提供 SpringCloudHttpBackupCommandRouter 解决了为 Spring Cloud Consul 和其他不允许调整元数据的实现提供支持的问题。

因此,我建议将您的配置调整为使用 SpringCloudHttpBackupCommandRouter 而不是“SpringCloudCommandRouter”

关于java - 从外部服务发送命令时,聚合中的 axon 3.4 CommandHandler 不会被触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56165811/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com