
Memory leak in Apache ActiveMQ

Reposted. Author: bug小助手. Updated: 2023-10-24 19:38:32



I'm new to Apache ActiveMQ and am having trouble sending and storing a large number of large messages (the biggest are around 100 MB). The messages are persistent, so as far as I know they are stored on disk, not in memory. Strangely, though, ActiveMQ crashes when the database (the KahaDB folder) reaches 2.8 GB, which is 70% of the 4 GB JVM heap. It looks as if ActiveMQ is still keeping all messages in memory. I need to store all the messages first and only then consume them.




My activemq.xml configuration:




<policyEntry queue=">" producerFlowControl="false" memoryLimit="100MB">
    <pendingQueuePolicy>
        <fileQueueCursor />
    </pendingQueuePolicy>
</policyEntry>
...
<systemUsage>
    <systemUsage>
        <memoryUsage>
            <memoryUsage percentOfJvmHeap="70" />
        </memoryUsage>
        <storeUsage>
            <storeUsage limit="100 gb"/>
        </storeUsage>
        <tempUsage>
            <tempUsage limit="50 gb"/>
        </tempUsage>
    </systemUsage>
</systemUsage>
...
<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=204857600"/>
</transportConnectors>


And this is how I send messages in Java. The try-with-resources statement closes everything.




try (Connection conn = queueConnFactory.createConnection();
     Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
     MessageProducer producer = session.createProducer(getTargetQueue())) {

    ObjectMessage message = session.createObjectMessage(transferMessage);
    producer.send(message);
}


And this is the ActiveMQ log from the last crash:




ERROR | Forwarding of acks failed | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
java.lang.OutOfMemoryError: Java heap space
2019-08-02 20:32:12,421 | INFO | Ignoring no space left exception, java.io.IOException: Java heap space | org.apache.activemq.util.DefaultIOExceptionHandler | ActiveMQ Journal Checkpoint Worker
java.io.IOException: Java heap space
at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:40)[activemq-client-5.15.9.jar:5.15.9]
at org.apache.activemq.store.kahadb.MessageDatabase$AckCompactionRunner.run(MessageDatabase.java:2075)[activemq-kahadb-store-5.15.9.jar:5.15.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)[:1.8.0_211]
at java.util.concurrent.FutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)[:1.8.0_211]
at java.lang.Thread.run(Unknown Source)[:1.8.0_211]
2019-08-02 20:37:12,514 | WARN | Async error occurred: java.lang.OutOfMemoryError: Java heap space | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///10.11.34.224:58668@61616
2019-08-02 20:37:44,899 | ERROR | Forwarding of acks failed | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
java.lang.IllegalArgumentException: Self-suppression not permitted
at java.lang.Throwable.addSuppressed(Unknown Source)[:1.8.0_211]
at org.apache.activemq.store.kahadb.MessageDatabase.forwardAllAcks(MessageDatabase.java:2132)[activemq-kahadb-store-5.15.9.jar:5.15.9]
at org.apache.activemq.store.kahadb.MessageDatabase.access$700(MessageDatabase.java:121)[activemq-kahadb-store-5.15.9.jar:5.15.9]
at org.apache.activemq.store.kahadb.MessageDatabase$AckCompactionRunner.run(MessageDatabase.java:2068)[activemq-kahadb-store-5.15.9.jar:5.15.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)[:1.8.0_211]
at java.util.concurrent.FutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)[:1.8.0_211]
at java.lang.Thread.run(Unknown Source)[:1.8.0_211]
2019-08-02 20:37:44,899 | INFO | Stopping BrokerService[localhost] due to exception, java.io.IOException: Self-suppression not permitted | org.apache.activemq.util.DefaultIOExceptionHandler | ActiveMQ Journal Checkpoint Worker
java.io.IOException: Self-suppression not permitted
at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:40)[activemq-client-5.15.9.jar:5.15.9]
at org.apache.activemq.store.kahadb.MessageDatabase$AckCompactionRunner.run(MessageDatabase.java:2075)[activemq-kahadb-store-5.15.9.jar:5.15.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)[:1.8.0_211]
at java.util.concurrent.FutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)[:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)[:1.8.0_211]
at java.lang.Thread.run(Unknown Source)[:1.8.0_211]
2019-08-02 20:37:44,946 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:PVAH-WF-58622-1564761931309-0:1) is shutting down | org.apache.activemq.broker.BrokerService | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:45,133 | INFO | Connector openwire stopped | org.apache.activemq.broker.TransportConnector | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:45,180 | INFO | Connector amqp stopped | org.apache.activemq.broker.TransportConnector | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:46,974 | INFO | Connector stomp stopped | org.apache.activemq.broker.TransportConnector | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:46,974 | INFO | Connector mqtt stopped | org.apache.activemq.broker.TransportConnector | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:48,316 | INFO | Connector ws stopped | org.apache.activemq.broker.TransportConnector | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:48,955 | INFO | PListStore:[C:\ActiveMQ\bin\win64\..\..\data\localhost\tmp_storage] stopped | org.apache.activemq.store.kahadb.plist.PListStoreImpl | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:49,002 | INFO | Stopping async queue tasks | org.apache.activemq.store.kahadb.KahaDBStore | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:49,002 | INFO | Stopping async topic tasks | org.apache.activemq.store.kahadb.KahaDBStore | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:49,002 | INFO | Stopped KahaDB | org.apache.activemq.store.kahadb.KahaDBStore | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:51,404 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:PVAH-WF-58622-1564761931309-0:1) uptime 1 hour 32 minutes | org.apache.activemq.broker.BrokerService | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:58,502 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:PVAH-WF-58622-1564761931309-0:1) is shutdown | org.apache.activemq.broker.BrokerService | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:37:58,534 | INFO | Closing org.apache.activemq.xbean.XBeanBrokerFactory$1@14d57a4: startup date [Fri Aug 02 19:05:26 MSK 2019]; root of context hierarchy | org.apache.activemq.xbean.XBeanBrokerFactory$1 | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:38:00,296 | INFO | Destroying Spring FrameworkServlet 'dispatcher' | /admin | IOExceptionHandler: stopping BrokerService[localhost]
2019-08-02 20:38:00,764 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@71d9564d: startup date [Fri Aug 02 20:38:00 MSK 2019]; root of context hierarchy | org.apache.activemq.xbean.XBeanBrokerFactory$1 | WrapperSimpleAppMain
2019-08-02 20:38:03,619 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\ActiveMQ\bin\win64\..\..\data\kahadb] | org.apache.activemq.broker.BrokerService | WrapperSimpleAppMain
2019-08-02 20:38:04,196 | INFO | ignoring zero length, partially initialised journal data file: db-404.log number = 404 , length = 0 | org.apache.activemq.store.kahadb.disk.journal.Journal | WrapperSimpleAppMain
2019-08-02 20:38:05,819 | INFO | KahaDB is version 6 | org.apache.activemq.store.kahadb.MessageDatabase | WrapperSimpleAppMain
2019-08-02 20:38:06,177 | INFO | PListStore:[C:\ActiveMQ\bin\win64\..\..\data\localhost\tmp_storage] started | org.apache.activemq.store.kahadb.plist.PListStoreImpl | WrapperSimpleAppMain
2019-08-02 20:38:06,177 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:PVAH-WF-58622-1564761931309-0:2) is starting | org.apache.activemq.broker.BrokerService | WrapperSimpleAppMain
2019-08-02 20:38:10,374 | INFO | PListStore:[C:\ActiveMQ\bin\win64\..\..\data\localhost\tmp_storage] initialized | org.apache.activemq.store.kahadb.plist.PListStoreImpl | WrapperSimpleAppMain
2019-08-02 20:38:26,925 | WARN | Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.lang.OutOfMemoryError: Java heap space | org.apache.activemq.xbean.XBeanBrokerFactory$1 | WrapperSimpleAppMain
2019-08-02 20:38:26,925 | ERROR | Failed to load: class path resource [activemq.xml], reason: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.lang.OutOfMemoryError: Java heap space | org.apache.activemq.xbean.XBeanBrokerFactory | WrapperSimpleAppMain
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.lang.OutOfMemoryError: Java heap space
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1634)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:312)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761)[spring-beans-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543)[spring-context-4.3.18.RELEASE.jar:4.3.18.RELEASE]
at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)[xbean-spring-4.2.jar:4.2]
at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)[xbean-spring-4.2.jar:4.2]
at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104)[activemq-spring-5.15.9.jar:5.15.9]
at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104)[activemq-spring-5.15.9.jar:5.15.9]
at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67)[activemq-spring-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87)[activemq-console-5.15.9.jar:5.15.9]
at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)[activemq-console-5.15.9.jar:5.15.9]
at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154)[activemq-console-5.15.9.jar:5.15.9]
at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)[activemq-console-5.15.9.jar:5.15.9]
at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)[activemq-console-5.15.9.jar:5.15.9]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_211]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)[:1.8.0_211]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)[:1.8.0_211]
at java.lang.reflect.Method.invoke(Unknown Source)[:1.8.0_211]
at org.apache.activemq.console.Main.runTaskClass(Main.java:262)[activemq.jar:5.15.9]
at org.apache.activemq.console.Main.main(Main.java:115)[activemq.jar:5.15.9]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_211]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)[:1.8.0_211]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)[:1.8.0_211]
at java.lang.reflect.Method.invoke(Unknown Source)[:1.8.0_211]
at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240)[wrapper.jar:3.2.3]
at java.lang.Thread.run(Unknown Source)[:1.8.0_211]

Comments

ActiveMQ does not garbage-collect inactive destinations by default. If you have many of them, that can cause an OutOfMemoryError. Try setting the policy parameter GcInactiveDestinations = true, as mentioned here: svn.apache.org/repos/infra/websites/production/activemq/content/…


No, I only have 5 queue destinations, each holding about 100-200 messages at the point where ActiveMQ crashes. GcInactiveDestinations = true seems to just delete empty destinations entirely.


Recommended answer

At this point I don't see any evidence of a memory leak. Just because the broker ran out of memory doesn't mean there is a leak. You could simply not be giving the JVM enough heap for your use-case.



Also, just because the broker uses a database of sorts (i.e. KahaDB) to store messages on disk doesn't mean the broker behaves like a database. Contrary to your assertion, the broker will still keep as much message data in memory as it can, even though the messages are persisted to disk. It would be extremely inefficient to purge message data from memory once it's written to disk and then immediately re-load it from disk as soon as a consumer needs it. There is also the matter of non-persistent messages, which the broker must track without writing them to disk.
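
To put rough numbers on this, here is a back-of-the-envelope sketch in Java using the figures from the question (a 4 GB heap, percentOfJvmHeap="70", messages of up to about 100 MB). The class and method names are made up for illustration, and the calculation ignores per-message overhead, index caches, and everything else the broker allocates, so treat the result as an upper bound rather than a prediction.

```java
public class HeapBudget {
    public static final long MB = 1024L * 1024L;
    public static final long GB = 1024L * MB;

    /** Bytes available for message data under a percentOfJvmHeap setting. */
    public static long memoryUsageLimit(long maxHeapBytes, int percentOfJvmHeap) {
        return maxHeapBytes * percentOfJvmHeap / 100;
    }

    /** Number of messages of the given size that fit entirely within the limit. */
    public static long messagesThatFit(long limitBytes, long messageBytes) {
        return limitBytes / messageBytes;
    }

    public static void main(String[] args) {
        long heap = 4 * GB;                        // heap size from the question
        long limit = memoryUsageLimit(heap, 70);   // percentOfJvmHeap="70"
        System.out.println("memoryUsage limit (MB): " + limit / MB);
        System.out.println("100 MB messages that fit: " + messagesThatFit(limit, 100 * MB));
        System.out.println("heap left for everything else (MB): " + (heap - limit) / MB);
    }
}
```

With these inputs the limit comes out at roughly 2.8 GB, which is fewer than thirty 100 MB messages, and it leaves only about 1.2 GB of heap for everything else the broker does.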



If you're just going to be storing all the data and then retrieving it later, it might be best to simply use a database, as a database would be better suited to such a use-case. A message broker is usually better suited to use cases where consumers and producers are active concurrently, so messages don't build up on the broker but are dispatched to consumers soon after they arrive.



Your code snippet is potentially an anti-pattern. JMS connections are thread-safe and are meant to be re-used. Opening and closing a connection for every message sent is an anti-pattern if, in fact, the application does any other JMS work aside from sending a single, solitary message.



Lastly, you may consider moving to ActiveMQ Artemis (i.e. the next generation of ActiveMQ) as it has direct support for "large" messages without requiring the use of an out-of-band transfer mechanism or implementation objects as required with the "blob message" support in ActiveMQ "Classic."
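
Whichever mechanism you choose, it may help to measure how large a payload really is once it has been serialized. An ObjectMessage body is produced by Java serialization, so the sketch below serializes an object with the standard ObjectOutputStream and compares the result with the wireFormat.maxFrameSize value from the question's transport connector (204857600 bytes). The class and method names are hypothetical, and a real frame also carries headers and properties that this does not count.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class PayloadSize {
    // wireFormat.maxFrameSize taken from the transport connector in the question
    public static final long MAX_FRAME_SIZE = 204_857_600L;

    /** Size in bytes of the payload after standard Java serialization. */
    public static long serializedSize(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    /** True if the serialized body alone is smaller than the frame limit. */
    public static boolean fitsInFrame(Serializable payload) {
        return serializedSize(payload) < MAX_FRAME_SIZE;
    }

    public static void main(String[] args) {
        byte[] sample = new byte[10 * 1024 * 1024]; // 10 MB stand-in payload
        System.out.println("serialized bytes: " + serializedSize(sample));
        System.out.println("fits in one frame: " + fitsInFrame(sample));
    }
}
```

Note that this measurement buffers the whole serialized form in memory, so it is only a diagnostic for sample payloads, not something to run on every send.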



Comments

The last statement is incorrect. ActiveMQ 5.x supports large messages: activemq.apache.org/blob-messages


I updated my answer to clarify the differences. Hope that helps!

