gpt4 book ai didi

jakarta-ee - 在 @MessageDriven bean 中使用 amazon sqs - 池化/并行处理

转载 作者:行者123 更新时间:2023-12-03 10:01:42 26 4
gpt4 key购买 nike

我们需要在 Java EE 应用程序中使用队列,并且由于它是一个基于云的应用程序(部署在 OpenShift Online 上),我们喜欢使用 amazon sqs。

如果我正确理解了 JMS/Java EE 接收部分的理论,则 @MessageDriven bean 由 Java EE 容器管理,因此如果传入消息的数量很高,则会并行创建许多 bean 实例(根据最大池大小)。这对于处理高负载当然是一个很大的好处。

但是,我不知道如何在 Java EE 应用程序中以这种方式集成 aws sqs。我知道来自 http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html 的异步接收器示例:

class MyListener implements MessageListener {

@Override
public void onMessage(Message message) {
try {
// Cast the received message as TextMessage and print the text to screen.
if (message != null) {
System.out.println("Received: " + ((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

进而:
// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);

// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());

// Start receiving incoming messages.
connection.start();

这是官方的异步接收器示例 - 这不是 @MessageDriven bean 角,扁 bean 。很明显,我们需要在某个地方使用凭据进行身份验证(通过创建 SQSConnectionFactory,然后是连接,然后是 session - 这也在示例中得到了很好的描述)。
但我强烈认为这个例子不会并行处理消息——即只有一个 bean 实例正在处理队列,这对于可扩展、高负载的应用程序来说不是一个好的解决方案。

a) 我们如何使用 Amazon SQS 走真正的 Java EE 之路?
我只是找到了一棵 Spring 的例子。但它必须是 Java EE 7。

b) 我们使用 Wildfly(目前是 8.2.1)。是否也可以让 Wildfly 在内部管理与 AWS 和应用程序的连接,我们可以像使用应用程序服务器管理的队列一样使用队列(与用于数据库访问的数据源相同的方法)?

从 stdunbar 得到答案后的结论 :
我喜欢做的事情似乎不可能以“适当的方式”进行。所以我该怎么做?实现 ManagedExecutorService正如 stdunbar 所描述的那样“包装”队列? - 然而,这意味着也有一个本地队列,这对于应用程序来说不是一个好情况,它应该是可扩展的?!
什么是替代品?我们正在 OpenShift Online 上运行该应用程序。用例如实例化自己的装备可能会更好。 ApacheMQ Cartridge...当然有很多缺点,比如成本和我们负责“基础设施”。

老实说,在这种情况下,我对 AWS 真的很失望......

最佳答案

我不认为我的解决方案是合适的 JAVA EE,但在我的情况下它有效。

配置:

@Singleton
public class SqsMessageManager
{
private Integer numberOfReceivers = 3;

public static SQSConnection connection = null;
public static Queue queue = null;

@Inject
SqsMessageReceiver sqsMessageReceiver;

public void init()
{
try
{
SQSConnectionFactory connectionFactory =
SQSConnectionFactory.builder()
.withRegion(Region.getRegion(Regions.EU_WEST_1))
.withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
.build();

connection = connectionFactory.createConnection();

queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue");

for (int i = 0; i < numberOfReceivers; i++)
connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver);

connection.start();
}
catch (JMSException e)
{
e.getStackTrace();
}
}
}

然后发件人:
@Dependent
public class SqsMessageSender
{
MessageProducer producer = null;
Session senderSession = null;

@PostConstruct
public void createProducer(){
try
{
// open new session and message producer
senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = senderSession.createProducer(SqsMessageManager.queue);
}catch(JMSException | NullPointerException e){
;
}
}

@PreDestroy
public void destroy(){
try
{
// close session
producer.close();
senderSession.close();
}catch(JMSException e){

}
}

// sends a message to aws sqs queue
public void sendMessage(String txt)
{
try
{
TextMessage textMessage = senderSession.createTextMessage(txt);
producer.send(textMessage);
}
catch (JMSException e)
{
e.getStackTrace();
}
}
}

和接收器:
@Dependent
public class SqsMessageReceiver implements MessageListener
{
public void onMessage(Message inMessage) {
...
}
}

关于jakarta-ee - 在 @MessageDriven bean 中使用 amazon sqs - 池化/并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41171902/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com