gpt4 book ai didi

java - Spring Boot 中具有不同凭证的多个 AWS SQS 队列

转载 作者:行者123 更新时间:2023-11-30 10:00:20 30 4
gpt4 key购买 nike

我有一个 Spring Boot 应用程序,想从多个 AWS SQS 队列接收消息。这些队列都有自己的凭据(遗憾的是我对此无能为力)。这些凭据都不能访问其他队列之一,它们都仅限于一个队列。

只有一个队列和凭证,很简单。我只需提供作为 AWSCredentialsProvider Bean 的凭据,并使用 @SqsListener\@EnableSqs 注释我的方法。
但我不知道如何使用多个凭据来做到这一点。

@SqsListener 注释无法提供凭证、预配置的 AmazonSqs 对象或任何其他有帮助的东西。

我通过扩展 CredentialsProviderAmazonSqs 客户端搜索了一种将队列映射到凭据的方法,但无济于事。
我什至尝试在 AmazonHttpClient 的 header 中注入(inject)凭据,但这也是不可能的。

我尝试创建手动监听 SQS 队列所需的一切。但我坚持为 SimpleMessageListenerContainer 创建 MessageHandler。
所需的 QueueMessageHandler 仅在创建为具有应用程序上下文的 bean 时才有效。否则它不会查找用 @SqsListener 注释的方法。
可悲的是,我能找到的唯一教程或示例要么使用 JMS,我想避免,要么只使用一个队列的 @SqsListener 注释。

有没有其他方法可以为多个队列提供不同的凭据?

我的测试代码:

@Component
@Slf4j
public class TestOneQueueA {

public static final String QUEUE_A = "TestOneQueueA";

public TestOneQueueA(Cloud cloud, ResourceIdResolver resourceIdResolver) {
SqsServiceInfo serviceInfo = (SqsServiceInfo) cloud.getServiceInfo(QUEUE_A);
AWSStaticCredentialsProvider credentialsProvider =
new AWSStaticCredentialsProvider(new BasicAWSCredentials(serviceInfo.getAccessKey(),
serviceInfo.getSecretAccessKey()));

AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(serviceInfo.getRegion()).build();

QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(client);
queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(new MappingJackson2MessageConverter()));

QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
queueMessageHandler.afterPropertiesSet(); // won't do anything because of no ApplicationContext

SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(client);
factory.setResourceIdResolver(resourceIdResolver);
factory.setQueueMessageHandler(queueMessageHandler);

SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
try {
simpleMessageListenerContainer.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
simpleMessageListenerContainer.start();
simpleMessageListenerContainer.start(QUEUE_A); // fails with "Queue with name 'TestOneQueueA' does not exist"
}

@SqsListener(value = QUEUE_A, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
log.info("Received SQS Message: \nSubject: %s \n%s", subject, dto);
}
}

编辑:

在尝试更多之后,我能够将我的 AmazonSQS 客户端注入(inject)到两个单独的 SimpleMessageListenerContainer 中。然后问题就变成了 QueueMessageHandler

如果我在没有 bean 上下文的情况下手动创建它,它根本不会查找任何带有 @SqsListener 注释的方法。并且无法手动设置处理程序。
如果我将它创建为 bean,它将查看每个 bean 的注解。所以它也会找到它不应该寻找的队列的方法。然后它将崩溃,因为凭据不起作用。
我想不出一种方法来为单个 SqsListener 方法创建 QueueMessageHandler
并且 SimpleMessageListenerContainer 除了 QueueMessageHandler 之外不会接受任何东西。

最佳答案

在花了一些时间寻找更好的解决方案后,我坚持使用以下方法:

package test;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.AwsRegionProvider;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import test.TestDto;
import test.CustomQueueMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationSubject;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

@Component
public class TestQueue {

private static final String QUEUE_NAME = "TestQueue";
private static final Logger log = LoggerFactory.getLogger(TestQueue.class);

public TestQueue(AWSCredentialsProvider credentialsProvider, AwsRegionProvider regionProvider) {
AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(regionProvider.getRegion())
.build();

// custom QueueMessageHandler to initialize only this queue
CustomQueueMessageHandler queueMessageHandler = new CustomQueueMessageHandler();
queueMessageHandler.init(this);
queueMessageHandler.afterPropertiesSet();

SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(client);
factory.setQueueMessageHandler(queueMessageHandler);

SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
try {
simpleMessageListenerContainer.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
simpleMessageListenerContainer.start();
}

@SqsListener(value = QUEUE_NAME, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
log.info("Received SQS Message: \nSubject: {} \n{}", subject, dto);
}
}

和自定义QueueMessageHandler:

package test;

import java.util.Collections;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

public class CustomQueueMessageHandler extends QueueMessageHandler {

public void init(Object handler) {
detectHandlerMethods(handler);
}
}

CustomQueueMessageHandler 的唯一目的是将单个对象传递到它应该扫描 SQS 注释的位置。因为我不使用 Spring 上下文启动它,所以它不会在每个 bean 中搜索 @SqsListener 注释。但是所有的初始化都隐藏在 protected 方法后面。这就是为什么我需要覆盖该类以访问这些 init 方法的原因。

我不认为这是一个非常优雅的解决方案,手动创建所有 AWS 客户端内容,并调用 bean init 方法。但这是我能找到的唯一仍然可以访问 AWS SQS 库所有功能的解决方案,例如转换传入消息和通知、删除策略、队列轮询(包括故障处理)等。

关于java - Spring Boot 中具有不同凭证的多个 AWS SQS 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58064573/

30 4 0
文章推荐: Java 溢出在计算器上计算与 IDE 结果有不同的结果?
文章推荐: javascript - 使用 jquery 的 forEach 和 templatePlugin 追加
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com