gpt4 book ai didi

java - 不同帐户上的 Spring Boot、JmsListener 和 SQS 队列

转载 作者:太空宇宙 更新时间:2023-11-04 10:39:54 24 4
gpt4 key购买 nike

我正在尝试开发一个 Spring Boot(1.5) 应用程序,该应用程序需要监听来自两个不同 AWS 账户的 SQS 队列。是否可以使用 JmsListener 注解创建监听器?我已检查权限是否正确,我可以使用 getQueueUrl() 获取队列 url,并使用 setQueueOwnerAWSAccountId() 设置正确的帐户 ID。

下面是我用于主帐户下队列的监听器代码。尝试将其用于其他帐户上的队列时,会出现以下错误:

HTTPStatusCode: 400 AmazonErrorCode: AWS.SimpleQueueService.NonExistentQueue 
com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version.

队列读取器类

/**
 * JMS consumer for the queue named {@code queue-name}.
 */
@Service
public class QueueReader {

    /**
     * Invoked for each message delivered from the {@code queue-name} destination.
     *
     * @param message the message payload as text
     */
    @JmsListener(destination = "queue-name")
    public void messageReceived(@Payload final String message) {
        // message received
    }
}

队列配置类

/**
 * JMS wiring for SQS queues reachable through the default credentials chain
 * in the EU_WEST_1 region.
 */
@Configuration
@EnableJms
public class QueueReaderConfig {

    /** SQS-backed JMS connection factory shared by the listener factory and the template. */
    SQSConnectionFactory connectionFactory =
            SQSConnectionFactory.builder()
                    .withRegion(Region.getRegion(Regions.EU_WEST_1))
                    .withAWSCredentialsProvider(new DefaultAWSCredentialsProviderChain())
                    .build();

    /**
     * Builds the container factory used by {@code @JmsListener} endpoints.
     *
     * @return a listener container factory with dynamic destination resolution,
     *         concurrency {@code "3-10"} and client acknowledge mode
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        final DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        containerFactory.setConnectionFactory(this.connectionFactory);
        containerFactory.setDestinationResolver(new DynamicDestinationResolver());
        containerFactory.setConcurrency("3-10");
        // The session is configured with CLIENT_ACKNOWLEDGE rather than the default mode.
        containerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return containerFactory;
    }

    /**
     * Builds a {@link JmsTemplate} on top of the shared SQS connection factory.
     *
     * @return the template bean
     */
    @Bean
    public JmsTemplate defaultJmsTemplate() {
        final JmsTemplate template = new JmsTemplate(this.connectionFactory);
        return template;
    }
}

最佳答案

我也遇到了同样的问题。我通过创建自定义 DestinationResolver 并将其设置在“DefaultJmsListenerContainerFactory”和“JmsTemplate”中找到了解决方法。

此外,在“CustomDynamicDestinationResolver”中通过ownerAccountId 查找队列。

queue = ((SQSSession) session).createQueue(queueName, ownerAccountId);

在 @JmsListener 中通过 containerFactory 属性指定上面自定义的监听器容器工厂来监听队列。

@JmsListener(destination = "MyQueue", containerFactory = "customJmsListenerContainerFactory")
public void process(String message) throws IOException {

有点晚了,但是,我希望这可以帮助像我这样寻找解决方案的人。

谢谢

阿克谢

import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.util.Assert;

import javax.jms.*;

/**
 * JMS configuration for SQS queues that are looked up with an explicit owner
 * account id, so that queues owned by another AWS account can be resolved.
 *
 * <p>Both the listener container factory and the {@link JmsTemplate} defined here
 * use {@link CustomDynamicDestinationResolver} instead of the default resolver.
 */
@Configuration
public class CustomJmsConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomJmsConfig.class);

    /** Listener concurrency, read from {@code copies.processor.concurrency} (default {@code 5}). */
    @Value("${copies.processor.concurrency:5}")
    private String concurrency;

    /** Account id of the queue owner, read from {@code owner.account.id} (default {@code 1234}). */
    @Value("${owner.account.id:1234}")
    private String ownerAccountId;

    /** SQS-backed JMS connection factory (EU_CENTRAL_1, default credentials chain). */
    SQSConnectionFactory customConnectionFactory =
            new SQSConnectionFactory(
                    new ProviderConfiguration(),
                    AmazonSQSClientBuilder.standard()
                            .withRegion(Regions.EU_CENTRAL_1)
                            .withCredentials(new DefaultAWSCredentialsProviderChain())
            );

    /**
     * Builds the container factory referenced by
     * {@code @JmsListener(containerFactory = "customJmsListenerContainerFactory")}.
     *
     * @return a listener container factory that resolves queues with the configured owner account id
     */
    @Bean
    public DefaultJmsListenerContainerFactory customJmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(this.customConnectionFactory);
        factory.setDestinationResolver(new CustomDynamicDestinationResolver(ownerAccountId));
        factory.setConcurrency(concurrency);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }

    /**
     * Builds a {@link JmsTemplate} that resolves destinations the same way as the listener factory.
     *
     * @return the template bean
     */
    @Bean
    public JmsTemplate customJmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(this.customConnectionFactory);
        jmsTemplate.setDestinationResolver(new CustomDynamicDestinationResolver(ownerAccountId));
        return jmsTemplate;
    }

    /**
     * Destination resolver that passes an owner account id when creating queues on an
     * {@link SQSSession}; for any other session type, or when no account id is set,
     * it falls back to the plain JMS {@code createQueue}/{@code createTopic} calls.
     */
    public static class CustomDynamicDestinationResolver implements DestinationResolver {

        /** Owner account id handed to {@code SQSSession.createQueue}; may be {@code null}. */
        private final String ownerAccountId;

        /**
         * @param ownerAccountId account id of the queue owner, or {@code null} to use the
         *                       session's default queue lookup
         */
        public CustomDynamicDestinationResolver(String ownerAccountId) {
            this.ownerAccountId = ownerAccountId;
        }

        /**
         * Resolves a destination name to a topic or queue.
         *
         * @param session         the current JMS session; must not be {@code null}
         * @param destinationName the name to resolve; must not be {@code null}
         * @param pubSubDomain    {@code true} to resolve a topic, {@code false} to resolve a queue
         * @return the resolved destination
         * @throws JMSException if the session fails to create the destination
         */
        @Override
        public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)
                throws JMSException {
            Assert.notNull(session, "Session must not be null");
            Assert.notNull(destinationName, "Destination name must not be null");
            if (pubSubDomain) {
                return resolveTopic(session, destinationName);
            }
            return resolveQueue(session, destinationName);
        }

        /**
         * Creates the topic through the session.
         *
         * @param session   the current JMS session
         * @param topicName the topic name
         * @return the topic
         * @throws JMSException if the session fails to create the topic
         */
        protected Topic resolveTopic(Session session, String topicName) throws JMSException {
            return session.createTopic(topicName);
        }

        /**
         * Creates the queue, passing the owner account id when the session is an {@link SQSSession}.
         *
         * @param session   the current JMS session
         * @param queueName the queue name
         * @return the queue
         * @throws JMSException if the session fails to create the queue
         */
        protected Queue resolveQueue(Session session, String queueName) throws JMSException {
            // Fix: the original referenced an undeclared 'libraryOwnerAccountId'; the field is 'ownerAccountId'.
            LOGGER.info("Getting destination for libraryOwnerAccountId: {}, queueName: {}", ownerAccountId, queueName);
            if (ownerAccountId != null && session instanceof SQSSession) {
                return ((SQSSession) session).createQueue(queueName, ownerAccountId);
            }
            return session.createQueue(queueName);
        }
    }
}

关于java - 不同帐户上的 Spring Boot、JmsListener 和 SQS 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49104384/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com