gpt4 book ai didi

java - Spring amqp——异步请求/响应场景下的消息关联

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:16:18 25 4
gpt4 key购买 nike

目前我开发了一个小型爬虫/抓取应用程序,它分为两部分。第一部分(请求者)基于某种模式构建 URL 并将其发送给消费者(第二部分),后者进行实际抓取并返回结果。然后对结果进行进一步处理,并可能将其发送到另一个队列等。

所以我认为使用 spring-amqp 项目的消息传递和 RabbitMQ 消息传递服务来对所有请求进行排队是个好主意。这样我就可以设置多个消费者,它们都监听请求队列。现在我使用 convertAndSend 从请求者发出异步请求

@Autowired
private RabbitTemplate messagingTemplate;

public void doMessaging(String url){
...
messagingTemplate.convertAndSend(queueName, url);

}

并在 MessageListener 中接收结果,它监听 Response 队列。到目前为止一切正常,但我的问题是请求者的发送部分和接收部分是独立的,我不知道如何确保我得到每个请求消息的响应(就像我在使用 convertSendAndReceive 时可以做到的那样)。

所以我的问题是:这个问题是否有任何好的模式,或者我是否必须缓存所有带有一些 correlationId 的请求并手动检查响应是否到达?

enter image description here

更新:

基本上我正在寻找一般模式,因为 spring-amqp 提供的 convertAndSend() 的异步性很好。

我的场景更详细:我在图表中称为“请求者”的组件从数据库中读取一些可配置的值。基于这些信息,它创建了一个对象,可以说是一个“爬行”,其中包含有关爬行 Activity 的信息。在“阶段 1”开始之后,请求者根据可配置的值构建 URL 并将它们发送到 queue1。 Queue1 消费者废弃网页并将响应发送到 response-queue1。

请求者监听 response-queue1,获取结果并将新消息推送到 request-queue2(“阶段 2”)等...

我的问题是:我怎么知道例如“爬行运行”的所有阶段都已完成/完成以存储有关此运行的一些元数据并关闭它?

一个想法可能是,为每个请求消息创建一个 correlationId,将其存储在映射中,并在具有相同 correlationId 的响应到达时删除该条目,但我认为这可能不是最佳实践。是否有任何建议的解决方案或模式来处理这种异步请求响应场景?

最佳答案

Spring Integration Project拥有您需要的所有组件。

确切的实现将取决于您的要求——您可以使用屏障来暂停调用线程直到收到回复,或者使用聚合器进行完全异步处理,如果没有收到回复,聚合器可以发送错误消息,例如。

如果您可以提供有关您的确切要求的更多详细信息,我们可以帮助您开始。

如果出于某种原因不想使用 Spring Integration,可以使用模板的 convertAndSend 变体,它采用 MessagePostProcessor,这样您就可以设置关联 ID。

您可以使用 @RabbitListener 接收回复,其中 POJO 方法获取转换后的有效负载和相关 ID header 。

关于java - Spring amqp——异步请求/响应场景下的消息关联,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34859353/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com