gpt4 book ai didi

java - Java HornetQ 客户端中的线程处理

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:25:30 24 4
gpt4 key购买 nike

我正在尝试了解如何处理连接到 HornetQ 的 Java 客户端中的线程。我没有收到特定错误,但无法理解我首先应该如何处理线程(关于 HornetQ 客户端,特别是 MessageHandler.onMessage() - 中的线程一般对我来说没问题)。

如果这是相关的:我正在使用 'org.hornetq:hornetq-server:2.4.7.Final' 来运行嵌入到我的应用程序中的服务器。我不打算改变现状。在我的情况下,从操作的角度来看,这比运行独立的服务器进程更方便。

到目前为止我做了什么:

  1. 创建一个嵌入式服务器:new EmbeddedHornetQ(),
    .setConfiguration()

  2. 创建服务器定位器:HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))

  3. 创建 session 工厂:serverLocator.createSessionFactory()

现在对我来说很明显,我可以使用 hornetqClientSessionFactory.createSession() 创建 session ,为该 session 创建生产者和消费者,并使用 在单个线程中处理消息.send().receive()

但我还发现了 consumer.setMessageHandler(),这告诉我我根本不了解客户端中的线程。我尝试使用它,但随后消费者在两个不同于创建 session 的线程中调用了 messageHandler.onMessage()。这似乎符合我看代码的印象——HornetQ 客户端使用线程池来分发消息。

这让我很困惑。 javadocs 说 session 是一个“单线程对象”,代码也同意——那里没有明显的同步。但是随着 onMessage() 在多个线程中被调用,message.acknowledge() 也在多个线程中被调用,而那个只是委托(delegate)给 session 。这应该如何运作? MessageHandler 不从多个线程访问 session 的场景会是什么样子?

更进一步,我如何从 onMessage() 中发送后续消息?我正在使用 HornetQ 作为持久的“待办事项”工作队列,因此发送后续消息是对我来说是一个典型的用例。但同样,在 onMessage() 中,我在错误的线程中访问 session 。

请注意,我可以远离 MessageHandler 并以允许我控制线程的方式使用 send()/receive()。但我确信我根本不了解整个情况,与多线程结合只是自找麻烦。

最佳答案

我可以回答你的部分问题,但我希望你现在已经解决了这个问题。

形成HornetQ documentation on ClientConsumer (强调我的):

A ClientConsumer receives messages from HornetQ queues.
Messages can be consumed synchronously by using the receive() methods which will block until a message is received (or a timeout expires) or asynchronously by setting a MessageHandler.
These 2 types of consumption are exclusive: a ClientConsumer with a MessageHandler set will throw HornetQException if its receive() methods are called.

因此在处理消息接收方面您有两种选择:

  1. 自己同步接收
    • 向HornetQ提供MessageListener
    • 在您自己的 cunsumer 线程中,在空闲时调用 .receive().receive(long itmeout)
    • 检索调用返回的(可选)ClientMessage 对象
      • Pro:使用 Session 您希望在 Consumer 中携带您可以转发您认为合适的消息
      • 缺点:所有这些消息处理都是顺序的
  2. 将线程同步委托(delegate)给 HornetQ
    • 不要在消费者上调用.receive()
    • 提供 onMessage(ClientMessage) 的 MessageListener 实现
      • Pro:所有消息处理都将是并发且快速、无忧的
      • 缺点:我认为不可能从这个对象中检索到 Session,因为它没有被接口(interface)公开。
    • 未经测试的解决方法:在我的应用程序(与您的应用程序一样在虚拟机中)中,我公开了底层的线程安全 QueueConnection作为应用程序范围内可用的静态变量。从您的 MessageListener 中,您可以在其上调用 QueueSession jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 以获取新 session 并从中发送您的消息...这可能是 alright尽我所能see因为 Session 对象并没有真正重新创建。我这样做也是因为 session 有变得陈旧的趋势。

我认为您不应该非常希望控制您的消息执行线程,尤其是仅转发消息的临时线程。正如您猜到的那样,HornetQ 具有内置的线程池,并且可以有效地重用这些对象。

此外,正如您所知,您不需要在单个线程中访问对象(如队列),因此通过多个线程甚至通过多个 session 访问队列都没有关系。您只需确保一个 session 仅由一个线程访问,这是使用 MessageListener 设计的。

关于java - Java HornetQ 客户端中的线程处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37651952/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com