- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试通过 rabbitmq 将消息发送到基于 axon4 spring boot 的系统。收到消息但未触发任何事件。我很确定我遗漏了一个重要的部分,但到目前为止我无法弄清楚。
这里是我的 application.yml 的相关部分
axon:
amqp:
exchange: axon.fanout
transaction-mode: publisher_ack
# adding the following lines changed nothing
eventhandling:
processors:
amqpEvents:
source: in.queue
mode: subscribing
spring:
rabbitmq:
username: rabbit
password: rabbit
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class AxonConfig {
@Bean
SpringAMQPMessageSource inputMessageSource(final AMQPMessageConverter messageConverter) {
return new SpringAMQPMessageSource(messageConverter) {
@RabbitListener(queues = "in.queue")
@Override
public void onMessage(final Message message, final Channel channel) {
log.debug("received external message: {}, channel: {}", message, channel);
super.onMessage(message, channel);
}
};
}
}
AxonConfig : received external message: (Body:'[B@13f7aeef(byte[167])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=in.queue, deliveryTag=2, consumerTag=amq.ctag-xi34jwHHA__xjENSteX5Dw, consumerQueue=in.queue]), channel: Cached Rabbit Channel: AMQChannel(amqp://rabbit@127.0.0.1:5672/,1), conn: Proxy@11703cc8 Shared Rabbit Connection: SimpleConnection@581cb879 [delegate=amqp://rabbit@127.0.0.1:5672/, localPort= 58614]
import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import pm.mbo.easyway.api.app.order.commands.ConfirmOrderCommand;
import pm.mbo.easyway.api.app.order.commands.PlaceOrderCommand;
import pm.mbo.easyway.api.app.order.commands.ShipOrderCommand;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
@ProcessingGroup("amqpEvents")
@Slf4j
@Aggregate
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private boolean orderConfirmed;
@CommandHandler
public OrderAggregate(final PlaceOrderCommand command) {
log.debug("command: {}", command);
apply(new OrderPlacedEvent(command.getOrderId(), command.getProduct()));
}
@CommandHandler
public void handle(final ConfirmOrderCommand command) {
log.debug("command: {}", command);
apply(new OrderConfirmedEvent(orderId));
}
@CommandHandler
public void handle(final ShipOrderCommand command) {
log.debug("command: {}", command);
if (!orderConfirmed) {
throw new IllegalStateException("Cannot ship an order which has not been confirmed yet.");
}
apply(new OrderShippedEvent(orderId));
}
@EventSourcingHandler
public void on(final OrderPlacedEvent event) {
log.debug("event: {}", event);
this.orderId = event.getOrderId();
orderConfirmed = false;
}
@EventSourcingHandler
public void on(final OrderConfirmedEvent event) {
log.debug("event: {}", event);
orderConfirmed = true;
}
@EventSourcingHandler
public void on(final OrderShippedEvent event) {
log.debug("event: {}", event);
orderConfirmed = true;
}
protected OrderAggregate() {
}
}
/**
* MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors.
* <p>
* The SpringAMQPMessageSource must be registered with a Spring MessageListenerContainer and forwards each message
* to all subscribed processors.
* <p>
* Note that the Processors must be subscribed before the MessageListenerContainer is started. Otherwise, messages will
* be consumed from the AMQP Queue without any processor processing them.
*
* @author Allard Buijze
* @since 3.0
*/
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
@Slf4j
@Component
public class RabbitMQSpringAMQPMessageSource implements ChannelAwareMessageListener, SubscribableMessageSource<EventMessage<?>> {
private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();
private final AMQPMessageConverter messageConverter;
@Autowired
public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
@Override
public Registration subscribe(final Consumer<List<? extends EventMessage<?>>> messageProcessor) {
eventProcessors.add(messageProcessor);
log.debug("subscribe to: {}", messageProcessor);
return () -> eventProcessors.remove(messageProcessor);
}
@RabbitListener(queues = "${application.queues.in}")
@Override
public void onMessage(final Message message, final Channel channel) {
log.debug("received external message: {}, channel: {}", message, channel);
log.debug("eventProcessors: {}", eventProcessors);
if (!eventProcessors.isEmpty()) {
messageConverter.readAMQPMessage(message.getBody(), message.getMessageProperties().getHeaders())
.ifPresent(event -> eventProcessors.forEach(
ep -> ep.accept(Collections.singletonList(event))
));
}
}
}
eventProcessors: []
@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {
@Autowired
public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
super(messageConverter);
}
@RabbitListener(queues = "${application.queues.in}")
@Override
public void onMessage(final Message message, final Channel channel) {
try {
final var eventProcessorsField = this.getClass().getSuperclass().getDeclaredField("eventProcessors");
eventProcessorsField.setAccessible(true);
final var eventProcessors = (List<Consumer<List<? extends EventMessage<?>>>>) eventProcessorsField.get(this);
log.debug("eventProcessors: {}", eventProcessors);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
log.debug("received message: message={}, channel={}", message, channel);
super.onMessage(message, channel);
}
}
axon:
eventhandling:
processors:
amqpEvents:
source: rabbitMQSpringAMQPMessageSource
mode: SUBSCRIBING
@Autowired
void configure(EventProcessingModule epm,
RabbitMQSpringAMQPMessageSource rabbitMessageSource) {
epm.registerSubscribingEventProcessor("rabbitMQSpringAMQPMessageSource", c -> rabbitMessageSource);
epm.assignProcessingGroup("amqpEvents", "rabbitMQSpringAMQPMessageSource");// this line also made no difference
}
axon:
eventhandling:
processors:
amqpEvents:
source: rabbitMQSpringAMQPMessageSource
mode: SUBSCRIBING
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {
@Autowired
public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
super(messageConverter);
}
@RabbitListener(queues = "${application.queues.in}")
@Override
public void onMessage(final Message message, final Channel channel) {
log.debug("received message: message={}, channel={}", message, channel);
super.onMessage(message, channel);
}
}
import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Service;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@ProcessingGroup("amqpEvents")
@Service
public class OrderedProductsEventHandler {
private final Map<String, OrderedProduct> orderedProducts = new HashMap<>();
@EventHandler
public void on(OrderPlacedEvent event) {
log.debug("event: {}", event);
String orderId = event.getOrderId();
orderedProducts.put(orderId, new OrderedProduct(orderId, event.getProduct()));
}
@EventHandler
public void on(OrderConfirmedEvent event) {
log.debug("event: {}", event);
orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
orderedProduct.setOrderConfirmed();
return orderedProduct;
});
}
@EventHandler
public void on(OrderShippedEvent event) {
log.debug("event: {}", event);
orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
orderedProduct.setOrderShipped();
return orderedProduct;
});
}
@QueryHandler
public List<OrderedProduct> handle(FindAllOrderedProductsQuery query) {
log.debug("query: {}", query);
return new ArrayList<>(orderedProducts.values());
}
}
RabbitMQSpringAMQPMessageSource : received message: ...
OrderedProductsEventHandler : event: OrderShippedEvent...
最佳答案
在 Axon 中,聚合不会从“外部”接收事件。聚合内部的事件处理程序(更具体地说,它们是 EventSourcingHandlers)只处理由同一个聚合实例发布的事件,以便它可以重建其先前的状态。
只有外部事件处理程序,例如更新投影的事件处理程序,才会从外部源接收事件。
为此,您的 application.yml 应提及 bean 名称作为处理器的来源,而不是队列名称。所以在你的第一个例子中:
eventhandling:
processors:
amqpEvents:
source: in.queue
mode: subscribing
eventhandling:
processors:
amqpEvents:
source: inputMessageSource
mode: subscribing
关于java - 使用 Axon 4 从 AMQP 接收事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55769877/
我很想知道在使用 Axon 框架来验证电子邮件字段对于联系人聚合的一组电子邮件是唯一的时,最佳实践方法是什么。 示例设置 ContactCreateCommand { identifier =
我刚开始使用 Axon 框架,但遇到了一些障碍。 虽然我可以使用它们的 ID 加载单个聚合,但我不知道如何获取所有聚合的列表或所有聚合 ID 的列表。 EventSourcingRepository
我是 Axon 的新手,并编写了一个示例代码,其中我们有一个库存服务,用于添加新库存、更新库存,并且我们希望将事件发送到产品服务以进行任何更新。产品服务应充当库存服务的只读副本。 库存服务代码: 所以
是否有必要向 Axon 聚合和消息添加(覆盖)hashCode 和 equals 方法? SonarQube 中的分析表明,在对这些实体执行标准 Axon 操作时(在端到端测试中),它们没有被使用。此
新的一天,新的 Axon 问题。今天我想问一下Axon Saga的流程。在微服务之间使用 Axon Saga 的最佳实践是什么? 异步传奇:“主”服务同时向每个“从”服务发送命令。如果出现问题,也会通
是否可以在没有 Axon Server 的情况下扩展 Axon Framework 企业 ?我有兴趣使用 Axon 创建原型(prototype) CQRS 应用程序,但最终的可部署系统必须免收许可费
问题: 研究:在 https://gitlab.com/ZonZonZon/simple-axon.git我编了一个简单的 Axon-app 表明 JAR-artifact 建于 Gradle插件 c
我们正在使用 Axon 4 的 CQRS 和事件溯源。 我们有以下场景。 域书 操作 - 使用 Axon CRQS 和事件源流(命令 - 聚合 - 事件)在数据库中创建新书籍 操作 - 使用 Axon
在 AxonFramework 中,可以使用消息代理(例如 RabbitMQ)分发事件和命令。 我想知道是否可以对查询执行相同的操作,我在文档上找不到任何内容... 最佳答案 您在文档中找不到它是完全
我们目前正在对CQRS和事件源进行一些研究,并发现了两个主要框架来解决这两个问题:Axon Framework和Eventuate。两者都在继续开发,而Eventuate现在在RBMH reposit
我通常每个聚合有 5-6 个事件,并且不希望将预测存储在数据库中。在查询时进行 View 投影的最简单方法是什么? 最佳答案 对此的简短回答是,没有简单/快速的方法可以做到这一点。 但是,实现“在请求
是否可以使用 usingSubscribingEventProcessors 并且在投影事件时,始终从头开始重新投影所有事件。意思是——我从不将投影保存到数据库,而是在聚合发出新事件时重新投影所有事件
我正在尝试定义我的“事件处理程序拦截器”,我按照官方文档 here 上的说明进行操作,但我收到以下错误: org.springframework.beans.factory.BeanCreationE
我有以下设置: HTTP 请求到达 REST 端点,我在应用程序服务中收到它。 应用程序服务将请求映射到命令 C1 并使用 commandGateway.sendAndWait(new C1(rest
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题吗? 更新问题,以便 editing this post 提供事实和引用来回答它. 关闭 3 年前。 Improve
我正在尝试测试聚合并想断言夹具之外的事件,甚至可能使用 Hamcrest 进行评估? 使用时间戳的例子 fixture.given() .when(new Use
我有一个用例,我想发布一个非状态更改事件作为触发器。 In the vast majority of cases, the Aggregates will publish events by ap
我正在尝试测试聚合并想断言夹具之外的事件,甚至可能使用 Hamcrest 进行评估? 使用时间戳的例子 fixture.given() .when(new Use
我目前正在为 Spring Boot/Axon 应用程序进行集成测试。 在其中一个测试中,它(一个节点)通过 SpringApplicationBuilder 创建另一个节点(在不同的配置文件下),连
到目前为止,我一直在 CommandHandlers 中处理授权。 一个例子是我有一个聚合“团队”,其中包含一个经理列表(来自用户的聚合标识符)。团队聚合中的所有命令处理程序然后验证执行命令的用户是团
我是一名优秀的程序员,十分优秀!