gpt4 book ai didi

jms - 使用 CoD over Camel JMS 组件实现原生 websphere MQ

转载 作者:行者123 更新时间:2023-12-04 08:23:32 28 4
gpt4 key购买 nike

我在使用 Apache CAMEL 实现 Websphere MQ (WMQ) 连接器时遇到了很多困难,该连接器可以毫无异常(exception)地处理 MQ 交付确认 (CoD) 报告,也没有出现不需要的响应数据报形式的副作用。最后,我让它按照我想要的方式工作,如果您习惯于编写 native MQ 客户端,这是一种非常标准和常见的方式。我在同一主题的帖子中记录了该方法,但我发现该解决方案因复杂而臃肿,并且非常感谢任何建议或示例,以使实现更清晰、更优雅。

我明白问题的根源在于 MQ 设计请求-回复消息交换模式 (MEP) 的方式、JMS 规范的方式以及 JMS 组件中请求-回复 MEP 的 CAMEL 实现。三种不同的哲学!

  • WMQ 具有 MessageType header (请参阅 MQMD fieldsconstants ),其中包含值 1 表示请求、2 表示回复和 8 表示数据报(单向 MEP)。此外,值 4 用于标记 CoD(Conf. of Delivery)、PAN(Positive AckNowledge)和 NAN(Negative AckNowledge)形式的 Report 消息,这在消息流方面也进行了额外的 Reply信息。可以使用名为“报告”的另一个标题字段为请求消息、回复或数据报请求 CoD、PAN 和 NAN 确认,其中可以组合所有报告变体的标志。附加头字段“ReplyToQ”和“ReplyToQMgr”指定原始发件人期望报告和回复所在的队列和队列管理器,固定的 24 字节“CorrelId”字段 - 可选 - 可以帮助将报告和回复与原始数据报相关联或请求消息。为了使它更复杂,确实可以用相同的原始消息 ID 和没有 CorrelID 发送回回复和报告,或者在 CorrelId 中提供原始消息 ID,或者在原始请求或数据报中已经指定时返回 CorrelId 值。 IBM 提供了 JMS API over WMQ ,允许通过 WMQ 作为传输隧道传输普通 JMS 交换(在额外消息头名称 MQRFH2 的帮助下),或到 map native MQ messages onto JMS messages反之亦然。
  • 另一方面,JMS 规范提供了一个可选的“JMSReplyTo” header 字段和一个“JMSCorrelationID”,但将确切的 MEP 语义留给了客户端应用程序;即在规范中说明:“响应可能是可选的;由客户决定。”
  • CAMEL 具有 XML 或 Java DSL 中的“路由”和内部交换对象模型,旨在支持 EIP Patterns其中Request-Reply图案。 CAMEL 然后在其 JMS Component 中假设如果设置了 JMSReplyTo 字段,这必然是一个需要回复的请求,导致 Exchange 的 Out 部分(或修改后的 In 部分,如果 Out 为空)返回到 JMSReplyTo 中定义的队列。

  • 我愿意通过远程 Websphere 队列管理器支持带有确认传递 (CoD) 报告的本地 MQ 消息交换,这样,除了事务和持久性(即没有丢失,没有重复)之外,还可以跟踪消息何时被消费并在出现延误时发出警报。

    入站问题:

    默认情况下,当来自队列的消息消耗完成时,Websphere 队列管理器生成 CoD 报告。因此, 没有任何特定设置 ,当 CAMEL 端点使用消息时,远程 MQ 客户端发送带有 CoD 标志的数据报(然后是强制的 ReplyToQ)将从队列管理器返回作为 MQ 报告的第一个回复,然后是第二个(意外的)明确回复消息由 CAMEL 返回并包含在 CAMEL 路由结束时 Exchange 对象中剩余的任何内容,因为 CAMEL 假定存在请求-回复 EIP,因为 JMSReplyTo 字段存在(从 MQ ReplyToQ 和 ReplyToQMgr 映射,两者都需要支持 CoD回流)。

    出境问题:

    在没有特定设置的情况下,CAMEL 也默认假设出站连接上的请求-回复 EIP/MEP。 CAMEL JMS/MQ 端点然后将等待 1 回复。当出站消息是 MQ 上的 JMS(因此带有 MQRFH2 header )时,这可以正常工作。当强制使用普通 MQ 时,即删除如下 MQRFH2 header 时,我无法让端点监听器匹配相关的传入 MQ 报告,尽管跟踪值似乎都是正确的(强制使用 24 个字符相关 ID,以便截断较长的 CorrelId 值或空填充by MQ 无法对相关过滤器进行地理分析)。有没有人能够解决这个问题?

    详情:尽管 IBM JMS API 接受传递特定的 JMS 属性值 WMQ_MESSAGE_BODY={1|0}/WMQ_TARGET_CLIENT={1|0} 来控制 JMS header MQRFH2 在生成的消息中的存在,但这些选项通过 CAMEL 变得无效。必须使用 CamelJmsDestinationName header ( as explained in CAMEL JMS doc ) 为目标提供一个 IBM 队列 URL,其中包含选项“targetClient=1”,以便去除 MQRFH2 header 。但是如果没有这个头,CoD 报告或 MQ 回复上的 CAMEL 关联就会失败。

    上述问题的解决方案确实是关于构建特定的 CAMEL 路由来处理远程方返回的 CoD 报告(以及相关的 MQ 回复)。因此,CAMEL 出站消息必须强制为“InOnly” ExchangePattern因此无需等待任何回复。但这会导致 CAMEL 抑制所有 ReplyTo 字段。然后,如果在出站 MQ 数据报上请求 MQ CoD,则会出现 CAMEL 异常,其原因是 MQException JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2027' ('MQRC_MISSING_REPLY_TO_Q') .

    CAMEL 记录了一个 URI 选项 'disableReplyTo=true' 以禁用交换模式的回复监听,但保留 ReplyTo 字段 - 显然在入站和出站交换中。但是这个选项在出站 JMS 交换上不起作用(正如观察到的,我可能是错的),必须使用不太直观的“preserveMessageQos”选项。

    欢迎对这些问题提出优雅的解决方案。

    最佳答案

    我所能得到的最好的记录如下,以 Spring XML 应用程序上下文为例,它本身承载着 CAMEL 上下文和路由。它与 IBM native MQ JCA 兼容资源适配器 v7.5、CAMEL 2.15、Spring core 4.2 配合使用。我可以将它部署到 Glassfish 和 Weblogic 服务器。

    当然,考虑到众多变量,在实际实现中使用了 java DSL。这个基于 CAMEL XML DSL 的示例是独立的并且易于测试。

    我们从 Spring 和 Camel 声明开始:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">

    CAMEL 上下文遵循 2 条路线:MQ 到 JMS 和 JMS 到 MQ,此处链接以形成桥梁以简化测试。
    <camel:camelContext id="mqBridgeCtxt">

    <camel:route id="mq2jms" autoStartup="true">

    奇怪:当使用 native MQ 资源适配器时,获得(例如)3 个监听器的唯一方法是强制执行 3 个连接(按顺序使用 3 个 Camel:from 语句),每个连接最多 1 个 session ,否则会出现 MQ 错误: MQJCA1018: Only one session per connection is allowed .但是,如果您改用 MQ 客户端 jar,则 CAMEL JMS 中的 concurentConsumers 选项确实可以正常工作。
        <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
    acknowledgementModeName=SESSION_TRANSACTED"/>
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
    acknowledgementModeName=SESSION_TRANSACTED"/>
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
    acknowledgementModeName=SESSION_TRANSACTED"/>

    上面的 disable disableReplyTo 选项可确保 CAMEL 在我们测试 MQ 消息类型为 1=Request(-reply) 或 8=datagram(一种方式!)之前不会产生回复。此处未说明该测试和回复结构。

    然后我们在下一次发布到普通 JMS 时将 EIP 强制为 InOnly,以与 Inbound MQ 模式保持一致。
        <camel:setExchangePattern pattern="InOnly"/>
    <!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
    <camel:to uri="ref:innerQueue" />
    </camel:route>

    接下来是 jms-to-MQ 路由:
      <camel:route id="jms2mq"  autoStartup="true">
    <camel:from uri="ref:innerQueue" />
    <!-- remove inner message headers and properties to test without inbound side effects! -->
    <camel:removeHeaders pattern="*"/>
    <camel:removeProperties pattern="*" />
    <!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->

    现在是远程目标返回的 MQ CoD 报告的请求标志。我们还强制 MQ 消息为数据报类型(值 8)。
        <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>

    ReplyTo 队列可以通过 ReplyTo uri 选项指定,也可以作为标题指定,如下所示。

    接下来,我们确实使用 CamelJmsDestinationName 头来强制抑制 JMS MQ 消息头 MQRFH2(使用 targetClient MQ URL 选项值 1)。换句话说,我们想要发送一个普通的 MQ 二进制消息(即只有 MQMD 消息描述符后跟有效负载)。
        <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
    <camel:setHeader headerName="CamelJmsDestinationName"><camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>

    更多MQMD字段可以通过 reserved JMS properties控制如下图所示。见 restrictions在 IBM 文档中。
        <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR   </camel:constant></camel:setHeader>
    <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>

    URI 中的目标队列被上面的 CamelJmsDestinationName 覆盖,因此 URI 中的队列名称成为占位符。

    URI 选项 preserveMessageQos 是一个 - 正如观察到的 - 允许发送带有正在设置的 ReplyTo 数据的消息(以获取 MQ CoD 报告),但防止 CAMEL 通过强制执行 InOnly MEP 来实例化 Reply 消息监听器。
        <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp;
    exchangePattern=InOnly&amp;preserveMessageQos=true&amp;
    includeSentJMSMessageID=true" />
    </camel:route>
    </camel:camelContext>

    以下内容必须根据您的上下文进行调整。它为本地 JMS 提供程序和 Websphere MQ(通过本地 IBM WMQ JCA 资源适配器)提供队列工厂。我们在这里对管理对象使用 JNDI 查找。
    <camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
    </camel:endpoint>

    <jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
    <jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>

    <bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="jmsraQCFBean" />
    </bean>

    <bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="mqQCFBean" />
    </bean>

    </beans>

    或者,如果您使用 MQ 客户端 jars 而不是资源适配器,您将像这样声明连接工厂 bean(代替上面的 JNDI 查找):
    <bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
    <property name="hostName" value="${mqHost}"/>
    <property name="port" value="${mqPort}"/>
    <property name="queueManager" value="${mqQueueManager}"/>
    <property name="channel" value="${mqChannel}"/>
    <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
    <property name="appName" value="${connectionName}"/>
    </bean>

    <bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="mqCFBean"/>
    <property name="transacted" value="true"/>
    <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
    </bean>

    欢迎评论和改进。

    关于jms - 使用 CoD over Camel JMS 组件实现原生 websphere MQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33066088/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com