- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
因此,我在 spring jms 50-100 中使用并发,允许最大连接数高达 200。一切都按预期工作,但如果我尝试从队列中检索 100k 条消息,我的意思是我的 sqs 上有 100k 条消息,我正在读取它们通过spring jms正常方式。
@JmsListener
Public void process (String message) {
count++;
Println (count);
//code
}
我在我的控制台中看到了所有日志,但在大约 17k 之后它开始抛出异常
类似于:aws sdk 异常:端口已在使用中。
为什么我会看到这个异常以及如何做。我摆脱它?
我试着在互联网上寻找它。找不到任何东西。
我的设置:
并发 50-100
为每个任务设置消息数:50
客户确认
timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect
更新:我查找了问题,似乎一直在创建新的套接字,直到每个套接字都用完为止。
我的 spring jms 版本是 4.3.10
要重现此问题,只需将最大连接数设置为 200,并将货币设置为 50-100,然后将一些 40k 消息推送到 sqs 队列即可进行上述配置。可以使用 https://github.com/adamw/elasticmq这是一个复制 Amazon sqs 的本地堆栈服务器。完成后到这里。评论 jms 监听器并使用 soap ui 负载测试并调用发送消息以触发许多消息。仅仅因为你注释了@jmslistener 注解,它就不会从队列中消费消息。一旦您看到您已经发送了 40k 条消息,请停止。取消注释@jmslistener 并重新启动服务器。
更新:
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setErrorHandler(Throwable::printStackTrace);
factory.setConcurrency("50-100");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
更新:
SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
更新:
客户端配置详情:
Protocol : HTTP
Max connections : 200
更新:
我似乎使用了缓存连接工厂类。我在堆栈溢出和他们的官方文档中读到不使用缓存连接工厂类和默认的 jms 监听器容器工厂。
https://stackoverflow.com/a/21989895/5871514
它给出了我之前得到的相同错误。
更新
我的目标是获得 500 tps,即我应该能够消耗那么多。所以我尝试了这个方法,似乎我可以达到 100-200,但不会超过那个。加上这个东西是高并发阻塞..如果你使用它..如果你有更好的解决方案来实现它..我洗耳恭听。
**已更新**
我正在使用 amazonsqsclient
最佳答案
消费者的饥饿感
JMS 客户端倾向于实现的一种可能的优化是消息消费缓冲区或“预取”。该缓冲区有时可通过消息数量或缓冲区大小(以字节为单位)进行调整。
这样做的目的是防止消费者每次收到消息都去服务器,而不是批量拉取多条消息。
在您拥有许多“快速消费者”(这是这些库可能采取的固执己见的观点)的环境中,此预取设置为较高的默认值,以最大程度地减少这些往返。
但是,在消息消费者速度较慢的环境中,这种预取可能会成为一个问题。慢速消费者正在阻止来自更快消费者的那些预取消息的消息消费。在高度并发的环境中,这会很快导致饥饿。
在这种情况下,SQSConnectionFactory
有一个 property for this :
SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
sqsConnectionFactory.setNumberOfMessagesToPrefetch(0);
生产者饥饿(即通过 JmsTemplate
)
对于这些 JMS 实现来说,期望通过一些中介与代理接口(interface)是很常见的。这些中介实际上缓存和重用连接或使用池机制重用它们。在 Java EE 世界中,这通常由 JCA 适配器或 Java EE 服务器上的其他方法处理。
由于 Spring JMS 的工作方式,它期望 ConnectionFactory
存在一个中间委托(delegate)来执行此缓存/池化。否则,当 Spring JMS 想要连接到代理时,它会尝试打开一个新的连接和 session (!)每次你想对代理做一些事情。
为了解决这个问题,Spring 提供了一些选项。最简单的是 CachingConnectionFactory
,它缓存单个 Connection
,并允许在该 Connection
上打开多个 Session
。将其添加到上面的 @Configuration
的简单方法如下:
@Bean
public ConnectionFactory connectionFactory(AmazonSQSClient amazonSQSclient) {
SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);
// Doing the following is key!
CachingConnectionFactory connectionfactory = new CachingConnectionFactory();
connectionfactory.setTargetConnectionFactory(sqsConnectionFactory);
// Set the #connectionfactory properties to your liking here...
return connectionFactory;
}
如果你想要一些更奇特的东西作为 JMS 池解决方案(除了多个 Session
之外,它将为你池 Connections
和 MessageProducer
s),你可以使用相当新的 PooledJMS project的 JmsPoolConnectionFactory
等,来自他们的库。
关于java - 使用 AmazonSQSClient 的消息消耗缓慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53727538/
我有一个简单的 HATEOAS 使用 构建的提供程序 Spring 这为我提供了以下资源: { "_links" : { "self" : { "href" : "http:/
这里是 Clojure 初学者,不确定问题中的术语是否正确。 我正在使用 clj-webdriver 出租车 API 构建网络抓取工具。它需要从多个站点抓取数据。以下实际上不是项目中的代码,但我已经对
我使用pthread lib 2.8,操作系统内核是arm上的Linux 2.6.37。在我的程序中,线程 A 使用 pthread 接口(interface)将调度优先级设置为 sched_get_
我有一个大约 400MB 的二进制文件,我想将其转换为 CSV 格式。输出的 CSV 文件将约为 1GB(根据我的计算)。 我读取二进制文件并将其存储在一个结构数组中(其他处理也需要),当用户想要将其
我在编写我的专业应用程序时遇到一个串口线程问题。我有cpu消耗。当我在我的项目中添加 SerialCtrl.h(来自项目 SerialCtrl http://www.codeproject.com/A
总结:似乎 c 代码的 RAM 消耗取决于变量排序。有没有办法自动优化? 更长的版本:在这里,我粘贴了两个版本的代码,它们仅在变量排序方面有所不同。 版本 1: static unsigned lon
我有一个处理图像编辑(裁剪和调整大小)的 Windows 应用程序项目。不幸的是,这些图像处理会消耗大量内存和 CPU 资源(很容易达到 600MB 或 50% cpu),而且它只是裁剪和调整大小 2
我创建了一个实例化类 10 亿次的循环,并且非常惊讶地看到它在 0 毫秒内运行并且根据 Windows 任务管理器没有消耗 CPU 时间。 正如您从下面的代码中看到的那样,我显然没有对默认构造函数执行
我们有以下用户名验证规则: 用户名可以包含字母数字字符 用户名可以有下划线、连字符或句号 现在假设用户名是 ASCII 用户名不能以句点开头或结尾 用户名不能开始、结束或有任何空格 我们有以下相同的正
如何获取 C# 中所有进程的列表,然后获取每个进程的当前内存和 CPU 消耗? 非常感谢示例代码。 最佳答案 Process class有一个 GetProcesses 方法,可以让您枚举正在运行的进
如何从 linux 2.6.32 机器上的源代码中限制 C 程序的物理内存消耗? 我需要确定系统使用的页面替换算法的类型。 问题是,如果不限制一个进程在内存中可以拥有的页面数量,就很难分析页面错误的模
我正在编写一个 Linux 应用程序,它观察其他应用程序并跟踪资源消耗。我计划使用 Java,但编程语言对我来说并不重要。目标很重要,所以我可以切换到另一种技术或使用模块。我的应用程序将任何选定的第三
我有一个图标,旁边有一个复选框,包含在一个面板中。面板有悬停效果,点击面板时想选中框。 我想使用或阻止复选框的所有事件,仅以编程方式选择它。我希望该框在屏幕上显示为“已启用”,而“在幕后”几乎不起作用
我正在使用服务 REST,它当前有 5025 条记录,但当我使用该服务时,只出现 1,000 条记录。我可以做什么来完全消耗所有记录? 这是我的代码示例: $http({ method:
我已经搜索了几个小时但没有成功,是否可以跟踪 Blob 容器上 SAS 凭据的消耗情况? 我将向几个客户提供 SAS 凭证,并且我希望能够跟踪他们的 SAS 使用情况(操作次数、带宽使用情况...)
我创建了具有 1.75 GB RAM 的 B1 应用服务计划。我还创建了一个应用程序服务并向其部署了 docker 镜像。现在我停止了 docker 镜像,它的状态已停止,并且它是该应用程序服务计划中
与我们的合作伙伴一起为我们的客户(一家售后汽车零部件零售商)开发一个项目,他们利用 AR 和 VR 做了很多很酷的事情。我们的想法是使用 Hololens 并尝试帮助我们的客户在其大型仓库中进行物流操
这是一个简短的 Haskell 程序,可以生成 440 Hz 的声音。它使用pulseaudio作为音频后端。 import GHC.Float import Control.Arrow import
关闭。这个问题需要多问focused 。目前不接受答案。 想要改进此问题吗?更新问题,使其仅关注一个问题 editing this post . 已关闭 8 年前。 Improve this ques
tab.setOnCloseRequest(e -> { if (getEditorForTextArea(getSelectedTextArea()) != null
我是一名优秀的程序员,十分优秀!