gpt4 book ai didi

java - Spring 集成 JMS 在入站和出站中发布订阅

转载 作者:行者123 更新时间:2023-12-01 12:02:28 25 4
gpt4 key购买 nike

我正在尝试使用 Spring-Integration JMS 的示例项目进行 JMS 集成,我已经成功完成了。然而我的要求略有不同。我需要使用发布订阅模式从 1 个 JMS 代理进行监听,并且需要将相同的监听 JMS 消息发送到另一个 Kafak 队列/或其他队列。我正在努力进行配置,到目前为止我只配置了请求和响应队列。这是配置。请帮忙。

Common.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd">

<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost"/>
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
<property name="cacheProducers" value="false"/>
</bean>

<!-- <bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue.demo"/>
</bean> -->

<bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue.reply"/>
</bean>
<bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue.request"/>
</bean>

<integration:poller id="poller" default="true" fixed-delay="100"/>

</beans>

InboudChanelAdapter

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

<jms:message-driven-channel-adapter id="jmsIn"
destination="requestQueue"
channel="jmsInChannel" />

<channel id="jmsInChannel" />



<beans:beans profile="testCase">

<bridge input-channel="jmsInChannel" output-channel="queueChannel"/>

<channel id="queueChannel">
<queue />
</channel>

</beans:beans>

</beans:beans>

OutboundChannelAdapter.xml

    <?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

<stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/>

<channel id="stdinToJmsoutChannel"/>
<channel id="jmsInChannel" />

<jms:outbound-channel-adapter id="jmsout" channel="jmsInChannel" destination="requestQueue"/>

</beans:beans>
**DemoConfig.xml**

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost"/>
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
<property name="cacheProducers" value="false"/>
</bean>


<bean id="connectionFactory2nd" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost"/>
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
<property name="cacheProducers" value="false"/>
</bean>



<bean id="replyQueue" class="org.apache.activemq.command.ActiveMQQueue" >
<constructor-arg value="queue.reply"/>
</bean>
<bean id="requestQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue.request"/>
</bean>

<jms:message-driven-channel-adapter id="jmsIn"
destination="requestQueue"
channel="jmsInChannel"
connection-factory="connectionFactory"/>

<jms:publish-subscribe-channel id= "jmsInChannel"/>
<jms:topic id="Topic"></jms:topic>
</<jms:channel>

<jms:outbound-channel-adapter id="jmsout" channel="jmsInChannel" destination="replyQueue" connection-factory="connectionFactory2nd"/>

<integration:poller id="poller" default="true" fixed-delay="100"/>

</beans>

最佳答案

看起来你的理论知识还不够,所以你应该去看看有关 Spring Integration 的文档和书籍。你感觉不舒服是什么MessageChannel还没有。

<jms:message-driven-channel-adapter id="jmsIn"
destination="requestQueue"
channel="jmsInChannel" />

含义:收听requestQueue目的地并将 Spring 集成消息发送到 jmsInChannel .

如果您只是将该消息发送到另一个 JMS 目的地,您应该执行如下操作:

<jms:outbound-channel-adapter id="jmsout" channel="jmsInChannel" destination="replyQueue"/>

并确保没有更多订阅者 jmsInChannel ,因为它是 DirectChannel .

根据您当前的配置,您有额外的订阅者 <bridge> 。在这种情况下 Round-Robin平衡器的作用是jmsInChannel第一条消息将发送给第一个订阅者,只有第二条消息将发送给第二个订阅者,依此类推。

如果您想接受两个订阅者的消息,您应该更改 jmsInChannel<publish-subscribe-channel> .

您可以从文档中找到更多信息。

关于java - Spring 集成 JMS 在入站和出站中发布订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27897941/

25 4 0
文章推荐: java - MainActivity.DownloadWebpageTask 不是抽象的,不会覆盖 AsyncTask 中的抽象方法 doInBackground(String...)
文章推荐: java - List 与 List 不同时的示例
文章推荐: Java 重命名不起作用