- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我无法让 OpenNMS 向 AMQP 端点发送消息。我从来没有这样做过,这是我第一次使用 OpenNMS 和 AMQP,所以这可能是我经验不足的原因。
我已经配置了 RabbitMQ 3.5.7 并按照此 question 对其进行了测试。使用外部 QPID 0.32 客户端时它可以正常工作,并且使用 python 或 perl 时也可以正常工作。正常工作的定义是建立通信并将消息有效负载传输到交换器中,然后传递到后端队列中。然后可以在 RabbitMQ 管理 GUI 中查看消息。
在 OpenNMS 中,我一直遵循指示 here从 EventForwarder 开始,然后尝试 AlarmNorthbounder 都产生 NullPointerException。
我按如下方式设置 Karaf,使用这些语句设置属性:-
opennms> config:edit org.opennms.features.amqp.alarmnorthbounder
opennms> propset connectionUrl amqp://simon:simon@/test?brokerlist=\'localhost:5672\'
opennms> propset destination "amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }"
opennms> propset processorName default-alarm-northbounder-processor
opennms> config:update
opennms> config:list '(service.pid=org.opennms.features.amqp.alarmnorthbounder)'
----------------------------------------------------------------
Pid: org.opennms.features.amqp.alarmnorthbounder
BundleLocation: mvn:org.opennms.features.amqp/org.opennms.features.amqp.alarm-northbounder/18.0.0
Properties:
connectionUrl = amqp://simon:simon@/test?brokerlist='localhost:5672'
destination = amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }
felix.fileinstall.filename = file:/usr/share/opennms/etc/org.opennms.features.amqp.alarmnorthbounder.cfg
processorName = default-alarm-northbounder-processor
service.pid = org.opennms.features.amqp.alarmnorthbounder
我在日志中收到以下错误消息
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[forwardAlarm ] [forwardAlarm ] [seda://forwardAlarm ] [ 8]
[forwardAlarm ] [convertBodyTo3 ] [convertBodyTo[org.opennms.netmgt.alarmd.api.NorthboundAlarm] ] [ 0]
[forwardAlarm ] [log3 ] [log ] [ 1]
[forwardAlarm ] [bean3 ] [bean[ref:dynamicallyTrackedProcessor] ] [ 0]
[forwardAlarm ] [to3 ] [amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} } ] [ 7]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-ubuntu-1604-35241-1465752556843-2-60
ExchangePattern InOnly
Headers {breadcrumbId=ID-ubuntu-1604-35241-1465752556843-2-58, CamelRedelivered=false, CamelRedeliveryCounter=0}
BodyType String
Body NorthboundAlarm[id=3, uei='uei.opennms.org/generic/traps/EnterpriseDefault', nodeId=1]
]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.NullPointerException
at org.apache.qpid.client.BasicMessageProducer_0_8.declareDestination(BasicMessageProducer_0_8.java:63)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
at org.apache.qpid.client.BasicMessageProducer.<init>(BasicMessageProducer.java:136)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
at org.apache.qpid.client.BasicMessageProducer_0_8.<init>(BasicMessageProducer_0_8.java:55)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:559)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:62)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
我的期望是它应该能够使用与我在外部使用的库相同的库将消息传递到队列。
对 org.apache.qpid 进行 DEBUG,我得到:-
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: FieldTable::writeToBuffer: Writing encoded length of 254...
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: {instance=[LONG_STRING: ubuntu-16041465757105204], product=[LONG_STRING: qpid], version=[LONG_STRING: 0.28], platform=[LONG_STRING: Java(TM) SE Runtime Environment, 1.8.0_45-b14, Oracle Corporation, amd64, Linux, 4.4.0-22-generic, unknown], qpid.client_process=[LONG_STRING: Qpid Java Client], qpid.client_pid=[INT: 1318]}
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionTuneBodyImpl: channelMax=0, frameMax=131072, heartbeat=60]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.handler.ConnectionTuneMethodHandler: ConnectionTune frame received
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 3 name: CONNECTION_NOT_OPENED from old state AMQState: id = 2 name: CONNECTION_NOT_TUNED
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionOpenOkBodyImpl: knownHosts=null]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 4 name: CONNECTION_OPEN from old state AMQState: id = 3 name: CONNECTION_NOT_OPENED
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,726 INFO org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connection 44 now connected from /127.0.0.1:45388 to localhost/127.0.0.1:5672
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Are we connected:true
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connected with ProtocolHandler Version:0-91
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnectionDelegate_8_0: Write channel open frame for channel id 1
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Created session:org.apache.qpid.client.AMQSession_0_8@179483f0
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ChannelOpenOkBodyImpl: channelId=null]
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [BasicQosOkBodyImpl: ]
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQDestination: Based on onms3/Simon;{'create':'always','node':{'type':'topic'} } the selected destination syntax is ADDR
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Closing session: org.apache.qpid.client.AMQSession_0_8@179483f0
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.protocol.AMQProtocolSession: closeSession called on protocol session for session 1
它会在写入任何信息之前关闭 session 。
当我从外部 qpid 客户端执行基本相同的操作时 - 执行如下:-
#!/bin/bash
rm log.out
java -Dqpid.amqp.version=0-91 -Dlog4j.debug -Dlog4j.configuration=file:./log4j.properties -cp "client/example/target/classes/:client/example/target/dependency/*:slf4j-1.7.21/slf4j-log4j12-1.7.
21.jar:apache-log4j-1.2.17/log4j-1.2.17.jar" \
org.apache.qpid.example.ListSender
我明白了:-
142 [main] DEBUG org.apache.qpid.client.AMQConnection - Are we connected:true
142 [main] DEBUG org.apache.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-91
146 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - Write channel open frame for channel id 1
162 [main] DEBUG org.apache.qpid.client.AMQSession - Created session:org.apache.qpid.client.AMQSession_0_8@6bdf28bb
164 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ChannelOpenOkBody]
165 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [BasicQosOkBodyImpl: ]
173 [main] DEBUG org.apache.qpid.client.AMQDestination - Based on onms3/Simon;{create: always, node:{type: topic } } the selected destination syntax is ADDR
177 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 0...
178 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
179 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
179 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
180 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - MessageProducer org.apache.qpid.client.BasicMessageProducer_0_8@1936f0f5 using publish mode : ASYNC_PUBLISH_ALL
190 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content body frames to 'onms3'/'Simon'; {
'create': 'always',
'node': {
'type': 'topic'
}
}
190 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content header frame to 'onms3'/'Simon'; {
'create': 'always',
'node': {
'type': 'topic'
}
}
190 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 90...
191 [main] DEBUG org.apache.qpid.framing.FieldTable - {Id=[INT: 987654321], name=[LONG_STRING: WidgetSimon], price=[DOUBLE: 0.99], qpid.subject=[LONG_STRING: Simon], JMS_QPID_DESTTYPE=[INT: 2]}
192 [main] DEBUG org.apache.qpid.client.AMQSession - Closing session: org.apache.qpid.client.AMQSession_0_8@6bdf28bb
192 [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolSession - closeSession called on protocol session for session 1
194 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ChannelCloseOkBody]
194 [IoReceiver - localhost/127.0.0.1:5672] INFO org.apache.qpid.client.handler.ChannelCloseOkMethodHandler - Received channel-close-ok for channel-id 1
195 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ConnectionCloseOkBody]
196 [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - Session closed called by client
ListSender.java如下:-
package org.apache.qpid.example;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.ListMessage;
public class ListSender {
public static void main(String[] args) throws Exception
{
Connection connection =
new AMQConnection("amqp://simon:simon@localhost/test?brokerlist='tcp://localhost:5672'");
AMQShortString a1 = new AMQShortString("");
AMQShortString a2 = new AMQShortString("");
AMQShortString[] bindvars = new AMQShortString[]{a1,a2};
boolean is_durable = true;
/*
Destination queue = new AMQAnyDestination( new AMQShortString("onms2"),
new AMQShortString("direct"),
new AMQShortString("Simon"),
true,
true,
new AMQShortString(""),
false,
bindvars);
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//Destination queue = new AMQAnyDestination("onms3/Simon");
Destination queue = new AMQAnyDestination("onms3/Simon;{create: always, node:{type: topic } }");
//Destination queue = new AMQAnyDestination("onms3/Simon;%7Bcreate%3A%20always%2C%20node%3A%7Btype%3A%20topic%20%7D%20%7D");
//Destination queue = new AMQAnyDestination("amqp:OpenNMSExchange/Taylor; {create: always}");
//Destination queue = new AMQAnyDestination("OpenNMSExchange; {create: always}");
MessageProducer producer = session.createProducer(queue);
ListMessage m = ((org.apache.qpid.jms.Session)session).createListMessage();
m.setIntProperty("Id", 987654321);
m.setStringProperty("name", "WidgetSimon");
m.setDoubleProperty("price", 0.99);
List<String> colors = new ArrayList<String>();
colors.add("red");
colors.add("green");
colors.add("white");
m.add(colors);
Map<String,Double> dimensions = new HashMap<String,Double>();
dimensions.put("length",10.2);
dimensions.put("width",5.1);
dimensions.put("depth",2.0);
m.add(dimensions);
List<List<Integer>> parts = new ArrayList<List<Integer>>();
parts.add(Arrays.asList(new Integer[] {1,2,5}));
parts.add(Arrays.asList(new Integer[] {8,2,5}));
m.add(parts);
Map<String,Object> specs = new HashMap<String,Object>();
specs.put("colours", colors);
specs.put("dimensions", dimensions);
specs.put("parts", parts);
m.add(specs);
producer.send((Message)m);
System.out.println("Sent: " + m);
connection.close();
}
}
我的假设是这是 AMQP 服务器的某种连接问题。在解决此问题时遇到了许多问题,如果我遇到来自外部 jar 的问题,那么从日志中可以清楚地看出问题是什么。这是 OpenNMS 的问题吗?有人成功完成此操作吗?有什么想法吗?
干杯
西蒙
最佳答案
根据 OpenNMS 列表的指导,我决定安装 QPID Broker v6.0.3 来代替 RabbitMQ,并且消息传输现在没有问题。
关于java - OpenNMS v18 AMQP 消息发送问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37778136/
在 Spring AMQP 项目中, 如果 messageProperties 没有 messageId,它们总是创建 messageId。 像这样.. if (this.createMessageI
我不确定我对 errorHandler 和 returnExceptions 的理解是否正确。 但这是我的目标:我从 App_A 发送了一条消息,使用 @RabbitListener 在 App_B
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我们希望将大型机放在总线上。我相信是AS400。为此,我想让 CICS 大型机向代理发送 AMQP 消息。有几十个AMQP客户端,包括JMS客户端。我对大型机上可能发生的事情了解不够,无法判断是否可以
我们希望将大型机放在公共(public)汽车上。我相信它是AS400。为此,我想让 CICS 大型机向代理发送 AMQP 消息。有几十个 AMQP 客户端,包括 JMS 客户端。我不太了解大型机上的可
我即将实现一个基于 PHP 的系统,该系统使用 RabbitMQ。我可以看出那里有 2 个成熟的库:PECL AMQP和 php-amqp . 我将同时为客户端和工作人员使用 PHP。 有人对这两个库
我正在使用 RabbitMQ 和 ruby-amqp与 Rails。当 Controller 收到消息时,我执行以下操作: def create AMQP.start("amqp://localh
我有两个困惑。 1.如果消息监听器抛出RuntimeException,SimpleMessageListenrContainer会停止吗?2.如果SimpleMessageListenerConta
我们在绑定(bind)到 Rabbit MQ 实例的 Java 应用程序的日志中遇到以下异常。 这是必须要注意的事情,表示Spring AMQP的实现中存在问题,还是可以忽略的事情?在后一种情况下,此
场景:微服务从 RabbitMQ 队列中获取消息,将其转换为对象,然后微服务对外部服务进行 REST 调用。 它将处理成千上万条这样的消息,如果我们知道外部 Rest 服务已关闭,有没有办法告诉我的消
我有一个使用带有 webflux 的 Boot 2.0 的应用程序,并且有一个端点返回 ServerSentEvent 的 Flux。这些事件是通过利用 spring-amqp 从 RabbitMQ
我正在尝试使用 streadway/amqp 连接到 RabbitMQ 总线Go 的驱动程序。我正在处理重新连接例程,为此,我有一个 rabbitMQConsume 函数调用 rabbitMQConn
我正在尝试通过 SSL 连接到 RabbitMQ。我遵循了此处链接的 RabbitMQ SSL 文档 https://www.rabbitmq.com/ssl.html根据 RabbitMQ SSL
这些 amqp 客户端库之间有什么区别?哪一个是最推荐的?主要区别是什么? 最佳答案 我会推荐 amqp.node和 bramqp通过 node-amqp。 node-amqp 有很多错误并且维护不善
我得到的错误: 2019-12-09 06:39:33.189 ERROR 107132 --- [http-nio-8082-exec-5] o.a.c.c.C.[.[.[/].[dispatche
所以我已经让 MQTT -> MQTT 和 AMQP -> AMQP 工作;不过,MQTT -> AMQP 的翻译似乎在某处不起作用。这是我的测试,如果我的“监听器”也在使用 paho 的 MQTT
我得到的错误: 2019-12-09 06:39:33.189 ERROR 107132 --- [http-nio-8082-exec-5] o.a.c.c.C.[.[.[/].[dispatche
几天前我问了同样的问题:Unable to "Peek" messages from an Azure Service Bus Queue using AMQP and Node 。我再次问同样的问题
我有一个当前与 RabbitMQ 集成的组件。我想将 RabbitMQ 替换为 Azure Event Hub,因为我们现在位于云中。 AMQP 0.9.1 与 AMQP 1.0 兼容吗?交换会无缝进
我有一个当前与 RabbitMQ 集成的组件。我想将 RabbitMQ 替换为 Azure Event Hub,因为我们现在位于云中。 AMQP 0.9.1 与 AMQP 1.0 兼容吗?交换会无缝进
我是一名优秀的程序员,十分优秀!