gpt4 book ai didi

java - ActiveMQ 消息出队但未使用

转载 作者:行者123 更新时间:2023-11-30 03:03:48 26 4
gpt4 key购买 nike

我有一个 JBoss Web 应用程序,当前正在使用嵌入式 HornetQ for JMS。我们想要切换到 ActiveMQ HA 集群,但我遇到了一些奇怪的问题。我的队列之一(periodicDerivationQueue)的行为与 HornetQ 不同。 AMQ 控制台显示消息已入队和出队,但它们没有发送给我的消费者。起初,我假设消息由于某种原因而出队进入 DLQ,但事实似乎并非如此。据我了解,除非有必要,AMQ 不会创建 DLQ。当我查看经纪人时,没有 DLQ。我怎样才能知道我的消息要去哪里?

由于反射,我在堆栈的应用程序端调试时也遇到问题。我想在 AMQ 端设置一个断点来查看我的消息发生了什么,但我不确定将其放在哪里。这里有什么想法吗?

这可能是序列化问题吗?我听说有时 JMS 代理之间的序列化差异可能会导致奇怪的行为。

我真的被困在这里,任何帮助将不胜感激。请参阅下面的配置信息。

野蝇8.2

AMQ 5.13

消费者(此处未显示消息)

public class PeriodicDerivationExecutionHandlerImpl implements PeriodicDerivationExecutionHandler {

protected DerivationService derivationService;
protected DerivationModelService derivationModelService;

protected Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void executeDerivation(PeriodicDerivation params) throws Exception{

JbpmHibernateUtil.openSession();
Derivation derivation = null;
try{

if (params.isGroup()){
derivation = new GroupDerivation();

GroupQueryParameters qp = new GroupQueryParameters();
qp.setGroupName(params.getItemName());
derivation.setDerivedItem(derivationModelService.getGroup(qp));

}else{
derivation = new DeterminantDerivation();

DeterminantQueryParameters qp = new DeterminantQueryParameters();
qp.setDeterminantName(params.getItemName());
derivation.setDerivedItem(derivationModelService.getDeterminant(qp));

}

logger.info("Executing periodic derivation [" + derivation + "]");

derivation.setModelEffectiveDate(new DateTime());
derivation.setPeriod(params.getPeriod());
derivation.getProcessParameters().add(new DerivationProcessParameter(PeriodicDerivation.PERIODIC_PROCESS_VAR, true));
derivation.setExecutionMode(DerivationExecutionMode.SYNCHRONOUS_LOCAL);
derivationService.executeDerivation(derivation);

JbpmHibernateUtil.closeSession(true);
}catch(Exception e){
logger.error("Periodic derivation execution failed for [" + derivation + "]",e);
JbpmHibernateUtil.closeSession(false);
throw new Exception("Periodic derivation execution failed for [" + derivation + "]",e);
}
}

public DerivationService getDerivationService() {
return derivationService;
}

public void setDerivationService(DerivationService derivationService) {
this.derivationService = derivationService;
}

public DerivationModelService getDerivationModelService() {
return derivationModelService;
}

public void setDerivationModelService(DerivationModelService derivationModelService) {
this.derivationModelService = derivationModelService;
}

}

消费者 XML 配置

<int:gateway id="periodicDerivationExecutionGateway"
service-interface="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandler">
<int:method name="executeDerivation" request-channel="periodicDerivationChannel" />
</int:gateway>

<bean id="periodicDerivationExecutor"
class="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandlerImpl">
<property name="derivationService" ref="derivationService" />
<property name="derivationModelService" ref="derivationModelService" />
</bean>

<int:service-activator input-channel="periodicDerivationChannel"
ref="periodicDerivationExecutor" method="executeDerivation" />

<int-jms:channel id="periodicDerivationChannel"
queue-name="${jms.destination.name.periodicderivation}" concurrency="${integration.listener.threads.maximum}"
task-executor="periodicDerivationTaskExecutor" />

ActiveMQStandalone.xml (Jboss)

<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
<resource-adapters>
<resource-adapter id="activemq-rar.rar">
<archive>
activemq-rar.rar
</archive>
<transaction-support>XATransaction</transaction-support>
<config-property name="ServerUrl">
tcp://127.0.0.1:61616?jms.rmIdFromConnectionId=true
</config-property>
<config-property name="UserName">
admin
</config-property>
<config-property name="Password">
admin
</config-property>
<connection-definitions>

<connection-definition
class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory"
jndi-name="java:/ConnectionFactory"
enabled="true"
pool-name="ConnectionFactory">
<xa-pool>
<min-pool-size>1</min-pool-size>
<max-pool-size>20</max-pool-size>
<prefill>false</prefill>
<is-same-rm-override>false</is-same-rm-override>
</xa-pool>
<recovery>
<recover-credential>
<user-name>admin</user-name>
<password>admin</password>
</recover-credential>
<recover-plugin class-name="org.jboss.jca.core.recovery.ConfigurableRecoveryPlugin">
<config-property name="EnableIsValid">
false
</config-property>
<config-property name="IsValidOverride">
true
</config-property>
<config-property name="EnableClose">
true
</config-property>
</recover-plugin>
</recovery>
</connection-definition>

</connection-definitions>

队列/主题

<admin-objects>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue"
jndi-name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue"
use-java-context="true"
pool-name="deferredBpmCommandQueue">
<config-property name="PhysicalName">
deferredBpmCommandQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue"
jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue"
use-java-context="true"
pool-name="ActiveMQQueue.asyncActionRequestQueue">
<config-property name="PhysicalName">
asyncActionRequestQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue"
jndi-name="java:jboss/exported/jms/queue/bpm/DLQ"
use-java-context="true"
pool-name="DLQ">
<config-property name="PhysicalName">
DLQ
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue" use-java-context="true" pool-name="ActiveMQQueue.cacheUpdateReplicationQueue">
<config-property name="PhysicalName">
cacheUpdateReplicationQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue" use-java-context="true" pool-name="ActiveMQQueue.periodicDerivationQueue">
<config-property name="PhysicalName">
periodicDerivationQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncServiceSignalQueue">
<config-property name="PhysicalName">
asyncServiceSignalQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/processEventTopic" use-java-context="true" pool-name="ActiveMQTopic.processEventTopic">
<config-property name="PhysicalName">
processEventTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionReplyQueue">
<config-property name="PhysicalName">
asyncActionReplyQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/ExpiryQueue" use-java-context="true" pool-name="ActiveMQQueue.ExpiryQueue">
<config-property name="PhysicalName">
ExpiryQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusRequestTopic">
<config-property name="PhysicalName">
asyncActionServiceStatusRequestTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityRequestTopic">
<config-property name="PhysicalName">
asyncActionAffinityRequestTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue" use-java-context="true" pool-name="ActiveMQQueue.jbpmJobQueue">
<config-property name="PhysicalName">
jbpmJobQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityReplyTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityReplyTopic">
<config-property name="PhysicalName">
asyncActionAffinityReplyTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationEventTopic" use-java-context="true" pool-name="ActiveMQTopic.cacheUpdateReplicationEventTopic">
<config-property name="PhysicalName">
cacheUpdateReplicationEventTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusTopic">
<config-property name="PhysicalName">
asyncActionServiceStatusTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionServiceLogRecordQueue">
<config-property name="PhysicalName">
asyncActionServiceLogRecordQueue
</config-property>
</admin-object>
</admin-objects>

经纪商配置

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>


<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker1" dataDirectory="${activemq.data}" persistent="true">

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >

<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>


<managementContext>
<managementContext createConnector="false"/>
</managementContext>


<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>


<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>

</broker>
<import resource="jetty.xml"/>

HornetQStandalone.xml (Jboss)

<subsystem xmlns="urn:jboss:domain:messaging:2.0">
<hornetq-server>
<persistence-enabled>false</persistence-enabled>
<jmx-management-enabled>true</jmx-management-enabled>
<shared-store>true</shared-store>
<journal-type>ASYNCIO</journal-type>
<journal-file-size>102400</journal-file-size>
<journal-min-files>2</journal-min-files>

<connectors>
<netty-connector name="netty" socket-binding="messaging"/>
<netty-connector name="netty-throughput" socket-binding="messaging-throughput">
<param key="batch-delay" value="50"/>
</netty-connector>
<in-vm-connector name="in-vm" server-id="0"/>
</connectors>

<acceptors>
<netty-acceptor name="netty" socket-binding="messaging"/>
<netty-acceptor name="netty-throughput" socket-binding="messaging-throughput">
<param key="batch-delay" value="50"/>
<param key="direct-deliver" value="false"/>
</netty-acceptor>
<in-vm-acceptor name="in-vm" server-id="0"/>
</acceptors>

<security-settings>
<security-setting match="#">
<permission type="send" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
</security-setting>
</security-settings>

<address-settings>
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>104857600</max-size-bytes>
<page-size-bytes>10485760</page-size-bytes>
<page-max-cache-size>10</page-max-cache-size>
<address-full-policy>PAGE</address-full-policy>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>

<jms-connection-factories>
<connection-factory name="InVmConnectionFactory">
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
<entry name="java:/ConnectionFactory"/>
</entries>
</connection-factory>
<connection-factory name="RemoteConnectionFactory">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
</entries>
<client-failure-check-period>30000</client-failure-check-period>
<connection-ttl>300000</connection-ttl>
<retry-interval>2000</retry-interval>
<retry-interval-multiplier>1</retry-interval-multiplier>
<max-retry-interval>2000</max-retry-interval>
<reconnect-attempts>100</reconnect-attempts>
</connection-factory>
</jms-connection-factories>

队列/主题

<jms-destinations>
<jms-queue name="asyncActionRequestQueue">
<entry name="queue/bpm/asyncActionRequestQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue"/>
</jms-queue>
<jms-queue name="asyncActionReplyQueue">
<entry name="queue/bpm/asyncActionReplyQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue"/>
</jms-queue>
<jms-queue name="asyncServiceSignalQueue">
<entry name="queue/bpm/asyncServiceSignalQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue"/>
</jms-queue>
<jms-queue name="asyncActionServiceLogRecordQueue">
<entry name="queue/bpm/asyncActionServiceLogRecordQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue"/>
</jms-queue>
<jms-queue name="deferredBpmCommandQueue">
<entry name="queue/bpm/deferredBpmCommandQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue"/>
</jms-queue>
<jms-queue name="jbpmJobQueue">
<entry name="queue/bpm/jbpmJobQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue"/>
</jms-queue>
<jms-queue name="DLQ">
<entry name="queue/DLQ"/>
<entry name="java:jboss/exported/jms/queue/DLQ"/>
</jms-queue>
<jms-queue name="ExpiryQueue">
<entry name="queue/ExpiryQueue"/>
<entry name="java:jboss/exported/jms/queue/ExpiryQueue"/>
</jms-queue>
<jms-queue name="periodicDerivationQueue">
<entry name="queue/bpm/periodicDerivationQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue"/>
</jms-queue>
<jms-queue name="cacheUpdateReplicationQueue">
<entry name="queue/bpm/cacheUpdateReplicationQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue"/>
</jms-queue>
<jms-topic name="asyncActionServiceStatusTopic">
<entry name="topic/bpm/asyncActionServiceStatusTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusTopic"/>
</jms-topic>
<jms-topic name="asyncActionServiceStatusRequestTopic">
<entry name="topic/bpm/asyncActionServiceStatusRequestTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusRequestTopic"/>
</jms-topic>
<jms-topic name="asyncActionAffinityRequestTopic">
<entry name="topic/bpm/asyncActionAffinityRequestTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityRequestTopic"/>
</jms-topic>
<jms-topic name="asyncActionAffinityReplyTopic">
<entry name="topic/bpm/asyncActionAffinityReplyTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityReplyTopic"/>
</jms-topic>
<jms-topic name="processEventTopic">
<entry name="topic/bpm/processEventTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/processEventTopic"/>
</jms-topic>
<jms-topic name="cacheUpdateReplicationEventTopic">
<entry name="topic/bpm/cacheUpdateReplicationEventTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/cacheUpdateReplicationEventTopic"/>
</jms-topic>
</jms-destinations>

最佳答案

ObjectMessage 序列化安全性是问题所在。

ObjectMessage 对象依赖于 marshal/unmarshal 对象负载的 Java 序列化。此过程通常被认为是不安全的,因为恶意负载可以利用主机系统。这就是为什么从版本 5.12.2 和 5.13.0 开始,ActiveMQ 强制用户将可以使用 ObjectMessages 交换的包显式列入白名单。

我几天前看到了这个并添加了白名单,但它没有解决问题。我还尝试在 AMQ 5.11.3 上运行,但没有成功。显然他们也在 5.11.3 中添加了安全功能。无论如何,我将这个(-Dorg.apache.activemq.SERIALIZABLE_PACKAGES =“*”)添加到客户端和AMQ vm参数,现在一切都正常工作。

请记住,我使用的命令行选项是安全的我在我的代理中明确打开的漏洞,这可以允许恶意用户在我的系统上执行代码。正确的方法使用该标志是明确列出您允许的类反序列化,或者最多使用包通配符来避免显式列出受信任的父包中的各个类和子包。

关于java - ActiveMQ 消息出队但未使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35306494/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com