- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我使用 ActiveMQ 作为 JMS 代理和消费者,使用 jmsTemplate 发送消息,目前使用 1 个非持久主题。在高峰时间,我有大约 100 条消息/秒。
队列中有多少消息并不重要,但我经常收到重复的消息。我想出的临时解决方案是在表上设置索引——目前所有消息只保存在数据库中。
我的第一个问题 - 如果我指定了非持久主题并且不需要响应,为什么消息会重复?
发件人:
@Component
public class QueueSender
{
private Logger log = Logger.getLogger(getClass());
@Autowired
protected JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@Autowired
public QueueSender( final JmsTemplate jmsTemplate )
{
this.jmsTemplate = jmsTemplate;
this.jmsTemplate.setDeliveryPersistent(false);
System.out.println("isSessionTransacted "+jmsTemplate.isSessionTransacted()+
" getDeliveryMode "+jmsTemplate.getDeliveryMode()+
" getReceiveTimeout "+jmsTemplate.getReceiveTimeout()+
" getSessionAcknowledgeMode "+jmsTemplate.getSessionAcknowledgeMode());
}
public void sendPrice(Integer tickerId, Integer field, Double price, Long timestamp)
{
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
jmsTemplate.setMessageIdEnabled(true);
Map <String, Object>map = new HashMap<String, Object>();
map.put("tickerId", tickerId);
map.put("field", field);
map.put("price", price);
map.put("timestamp", timestamp);
jmsTemplate.convertAndSend("Quotez", map);
}
public void sendVolume(Integer tickerId, Integer field, Integer size, Long timestamp)
{
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Map <String, Object>map = new HashMap<String, Object>();
map.put("tickerId", tickerId);
map.put("field", field);
map.put("size", size);
map.put("timestamp", timestamp);
jmsTemplate.convertAndSend("Quotez", map);
}
}
听众:
public void onMessage(Message message)
{
if (message instanceof MapMessage)
{
try
{
MapMessage mapMessage = (MapMessage) message;
if(null != mapMessage.getString("price"))
{
priceService.insert(mapMessage.getInt("tickerId"),mapMessage.getDouble("price"),
mapMessage.getInt("field"),mapMessage.getLong("timestamp"));
} else{
volumeService.insert(mapMessage.getInt("tickerId"),mapMessage.getInt("size"),
mapMessage.getInt("field"),mapMessage.getLong("timestamp"));
}
}
catch (final JMSException e)
{
exceptionListener.onException(e);
}
}
}
Spring :
<amq:broker useJmx="true" persistent="false">
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0"/>
</amq:transportConnectors> </amq:broker>
<amq:topic id="topicDest" physicalName="Quotez"/>
<amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost?jms.watchTopicAdvisories=false"/>
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="jmsFactory" />
<property name="exceptionListener" ref="jmsExceptionListener" />
<property name="sessionCacheSize" value="100" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"/>
<property name="pubSubDomain" value="true"/>
<property name="defaultDestinationName" value="Quotez"/>
</bean>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="topicDest"/>
<property name="messageListener" ref="jdbcListener" />
</bean>
第二个问题是关于jmsContainer的配置。上面的代码和下面的代码有什么区别?上面的代码给我 Topic 作为订阅者,下面的代码给我 Queue。
<jms:listener-container concurrency="10" connection-factory="connectionFactory">
<jms:listener id="JdbcListener" destination="topicDest" ref="queueListener" />
</jms:listener-container>
我发现,Camel 及其幂等消费者应该可以解决重复问题——当然,如果首先知道它为什么会发生,那就太好了。第三个问题是关于 Camel 的配置。我有这个配置(默认):
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://localhost:0"/>
</bean>
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<idempotentConsumer messageIdRepositoryRef="myRepo">
<header>messageId</header>
<to uri="mock:result"/>
</idempotentConsumer>
</route>
</camelContext>
它适用于所有队列还是我应该明确订阅?我想它会检查每个主题/队列和所有传入消息。目前的问题是,所有消息都有 messageId=null 并且过滤器将其作为参数。
2011-03-01 11:24:09,152 DEBUG (org.springframework.jms.core.JmsTemplate:567) - Sending created message: ActiveMQMapMessage {commandId = 0, responseRequired = false, **messageId = null**, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {field=1, timestamp=1298975049138, price=72.89, tickerId=2} }
我没有找到设置 messageId 的简单方法。我的问题 - 设置 messageId 是否足够并且它将按异常(exception)情况工作或配置有问题,例如我必须指定将使用哪个主题。
谢谢,
齐达斯
最佳答案
当使用 JMS 主题时,您需要将并发/最大并发消费者设置为“1”,否则会出现重复。如果您需要多线程消费和/或负载平衡,请使用 virtual topics相反。
关于java - ActiveMQ 上的重复消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5157136/
场景: private readonly IConnection connection; this.connection = connectionFactory.CreateConnection();
想知道是否可以在 activeMQ 服务器中完成任何配置,将消息从 DLQ 重定向到同一服务器上的另一个队列。 例如。 我有一个队列“MAINQUEUE”,其中有很多消息已激活客户端确认,在处理时如果
我正在 ActiveMQ 中的一个系统上工作,我真的不想丢失消息。我的问题是重试消息导致我的消费者阻塞(而不是处理他们可以处理的消息)。我想给失败的消息几天重试(例如,我的潜在目的地之一是我将通过 S
我们对 AMQ 使用以下配置 定期我有奇怪的问题 - 松散的消息。实际上AMQ说一切正常并且消
我已按照教程安装 ActiveMQ http://servicebus.blogspot.com/2011/02/installing-apache-active-mq-on-ubuntu.html
当我们使用 ActiveMQ 时,我们可以信任 ActiveMQ 服务器的可靠性。例如在开发非实时软件时(不需要立即发送数据。但应该发送)。我们能否信任 activeMQ 作为确认消息传递的可靠来源。
我遇到了 issue使用 ActiveMQ 并希望跟踪/查看所有 ActiveMQ 事件。我能找到的唯一日志文件是与持久数据相关联的(如果打开的话)。我是否查看或生成了任何其他日志文件来告诉我 Act
我们正面临 ActiveMQ 及其使用者的随机问题。我们观察到,即使连接到 ActiveMQ 队列,也很少有消费者没有收到消息。但是在消费者重启后它工作正常。 我们在 ActiveMQ 端有一个名为
有什么方法可以跟踪 ActiveMQ 中的延迟(计划)消息? 我在 AMQ 网络控制台中没有看到任何东西,它们似乎只有在延迟到期时才进入队列……而且我在 JMX 控制台中也找不到它,也许我搜索得不够好
我对Apache ActiveMQ的功能感到困惑。 我从this link下载了ActiveMQ 。所以我这样使用它(环境:Windows 7):我启动 bin/activemq.bat,然后它就可以
我们有一个 ActiveMQ 代理,它使用 JMS、AMQP 和 MQTT 连接到不同的客户端。由于某种原因,我们还没有弄清楚一组特定的 MQTT 客户端经常(并非总是)持久订阅。这是一个测试环境,其
在activemq中有什么方法可以获取消息的数量代理端每秒/每分钟消耗/产生的数量? 我已经尝试使用http://activemq.apache.org/jmeter-performance-test
如何在队列上的 ActiveMQ 中设置 redeliveryPolicy? 1) 在文档中,请参阅:activeMQ Redelivery ,说明您应该在 ConnectionFactory 或 C
我查了一下,它用于在两个系统之间发送消息。 但为什么?为什么不直接使用数据库? 一定有一些 ActiveMQ 具有 数据库 没有的功能吗? 最佳答案 它用于在两个分布式进程之间进行可靠的通信。 是的,
我在生产系统中运行 ActiveMQ。我们的一些队列的流量非常大,而有些队列的流量非常低。我对镜像其中一个低容量队列感兴趣,这样我就可以围绕接收到的消息构建非正式的监控服务。 不幸的是,the onl
我们已经使用此配置为 ActiveMQ Broker 配置了 Broker redelivery 插件。
有什么方法可以检查特定队列是否已存在于 ActiveMQ 中? 最佳答案 http://activemq.apache.org/how-can-i-get-a-list-of-the-topics-a
有人知道如何将 activemq-core.xsd url 与 jar 文件 (activemq-core-5.2.0.jar) 中的 activemq.xsd 关联起来? 我在互联网上找到了一些解决
我是 activemq 的新手。我试图使用 activemq 代理来订阅/发布消息。但至于缺乏经验,我不知道该怎么做,也不知道是否真的可以做到。我在谷歌上搜索了很多,但不幸的是,没有适合此类功能的示例
我一直在努力配置 ActiveMQ 代理,让我感到困惑的一件事是,我读过的所有内容都将 NIO 描述为“如果您需要扩展的不错选择”或“如果您需要更快的速度” ,所以我的问题是他们为什么不说“总是使用
我是一名优秀的程序员,十分优秀!