- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
ActiveMQ 我用来在主题 (topic
) 和队列 (queue
) 中发送消息。
我有两个问题:
删除
(取消操作)发送到队列或队列的消息话题。 删除
和清除
所有队列/主题。使用通过 STOMP 协议(protocol)组织的 AMQ stompy库,但是没有合适的functions
告诉我应该使用哪些库或解决方案本身。
非常感谢。
最佳答案
如何删除我知道的消息,但只是理论上的(通过 WireShark 分析包流量的结果,AMQ 管理页面 ActiveMQ 中的 localhost:8161\admin 页面中的工作浏览器),并且我无法以编程方式删除消息(Python)。
理论上我可以在AMQ中使用参数调用(我在删除时发送到\admin AMQ的packege中看到这个)deleteMessage()
。[id,secret]
,哪里
id
- 队列\主题中消息的唯一名称secret
- 唯一的数字(可能是一些“ token ”),每次更新(例如 F5)\admin\browse 页面时都会发生变化。我不能说它是什么......请参阅此处的图片:https://ru.stackoverflow.com/q/618697/228254在我的回答下面的帖子中。
示例测试队列:
此时,我的想法是:获取队列中的所有消息并确认我需要丢弃\从队列中删除的消息。
这是我执行此操作的简单客户端代码:
from stompy import stomp
import json
s = stomp.Stomp(amq_ip, amq_port)
try:
s.connect(username=amq_user, password=amq_pass)
s.subscribe({'destination': '%s' % amq_queue, 'ack': 'client'})
except Exception as e:
print "ActiveMQ error\n %s" % e
while True:
try:
frame = s.receive_frame()
body = json.loads(frame.body)
# это сообщение для меня?
if body["interested_atr_in_msg"] == "interested_value_of_attr_in_msg":
print "Its for me. I receive it"
# Это сообщение для меня. Я его приму и обработаю
s.ack(frame)
else:
# Это сообщение предназначено для кого-то другого и мне не подходит
print "Its not for me"
except Exception as e:
print e
# -*- coding: utf-8 -*-
import activemq_api
import urllib3
import json
# Connection to ActiveMQ
BROKER_NAME = "localhost"
AMQ_API_PORT = 8161
AMQ_API_USER = "admin"
AMQ_API_PASS = "admin"
AMQ_API_POSTFIX = "/api/jolokia"
AMQ_TASK_QUEUE_NAME = "test"
BASIC_AUTH ='%s:%s' % (AMQ_API_USER, AMQ_API_PASS)
AMQ_STATUS_QUEUE = "/queue/test"
LOGIN_EXEMPT_URLS = [
r'admin/'
]
LOGIN_URL = 'url_login'
LOGOUT_REDIRECT_URL = 'url_login'
if __name__ == '__main__':
user_agent = "curl/7.49.1"
headers = urllib3.util.make_headers(basic_auth=BASIC_AUTH, user_agent=user_agent)
addition = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "*/*"
}
try:
headers.update(addition)
connect = activemq_api.Connection(AMQ_IP, AMQ_API_PORT, BROKER_NAME, headers, AMQ_API_POSTFIX)
manager = activemq_api.AMQManager(connect)
except Exception as e:
print(u'%s: Превышено число максимальных попыток соединения к ActiveMQ' % e.__class__.__name__)
else:
print(u'Соединение успешно установлено')
try:
id="ID:№№№№№№№№№№№№№№№№№№-54825-1482598606528-3:586:-1:1:1"
secret="wertrtd-3fdf-4dfd-gr56-dfghdvhshtdfgdw"
print(manager.removeMsgQueue("test", id))
except Exception as inst:
print inst
#!/usr/bin/python2
# -*- coding: utf-8 -*-
import urllib3
import json
class Connection:
def __init__(self, amq_ip, amq_port, broker, header, postfix):
self.BROKER_NAME = broker
self.AMQ_IP = amq_ip
self.AMQ_PORT = amq_port
self.HEADERS = header
self.POSTFIX = postfix
class AMQManager():
def __init__(self, conn):
self.QUEUES = {}
self.QUEUES_COUNT = None
self.HEAP_MEMORY_USED = None
self.MEMORY_PERSENT_USED = None
self.CONNECTION = conn
self.update()
def rmQueue(self, queue_names):
REUQEST = {
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"operation": "removeQueue(java.lang.String)",
"arguments": [queue_names]
}
return json.dumps(REUQEST)
def queueList(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"attribute":"Queues"
}
return json.dumps(REUQEST)
def browseQueueSubscribers(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"attribute": "QueueSubscribers"
}
return json.dumps(REUQEST)
def memoryPersentUsed(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"attribute": "MemoryPercentUsage"
}
return json.dumps(REUQEST)
def heapMemoryUsed(self):
REUQEST = {
"type": "read",
"mbean": "java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"path":"used"
}
return json.dumps(REUQEST)
def request(self, name, param):
http = urllib3.PoolManager()
body = ''
if name == "removeQueue":
body = self.rmQueue(param["QUEUE_NAME"])
elif name == "queueList":
body = self.queueList()
elif name == "browseQueueSubscribers":
body = self.browseQueueSubscribers()
elif name == "memoryPersentUsed":
body = self.memoryPersentUsed()
elif name == "heapMemoryUsed":
body = self.heapMemoryUsed()
url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
return r.data
def updateQueues(self):
res = json.loads(self.request("queueList", {}))
# print res
data = []
for queue in res["value"]:
object = {}
queue["objectName"] = queue["objectName"].split(":")[1]
for key in queue["objectName"].split(","):
object.update({key.split("=")[0]: key.split("=")[1]})
data.append(object)
self.QUEUES_COUNT = 0
self.QUEUES = {}
# print data
for queue in data:
self.QUEUES.update({queue["destinationName"]: Queue(queue["destinationName"], self.CONNECTION)})
self.QUEUES_COUNT += 1
def updateHeapMem(self):
self.HEAP_MEMORY_USED = json.loads(self.request("heapMemoryUsed", {}))["value"]
def updatePersMem(self):
self.MEMORY_PERSENT_USED = json.loads(self.request("memoryPersentUsed", {}))["value"]
Ars, [26.01.17 14:06]
## EXPORTABLE
def update(self):
self.updateQueues()
self.updateHeapMem()
self.updatePersMem()
## EXPORTABLE
def getQueues(self):
self.updateQueues()
data = []
for queue in self.QUEUES:
data.append(self.QUEUES[queue].getInfo())
return {
"queues_count": self.QUEUES_COUNT,
"queues": data
}
## EXPORTABLE
def getQueueInfo(self, name):
return self.QUEUES[name].getInfo()
## EXPORTABLE
def browseQueue(self, name):
return self.QUEUES[name].browse()
## EXPORTABLE
def getMessage(self, name, msg_id):
return self.QUEUES[name].message(msg_id)
def getAllQueueMessages(self, name):
return self.QUEUES[name].messages()
## EXPORTABLE
def removeQueue(self, name):
param = {
"QUEUE_NAME": name
}
return json.loads(self.request("removeQueue", param))
## EXPORTABLE
def clearQueue(self, name):
return self.QUEUES[name].clear()
# ARS
def removeMsgQueue(self,nameQueue, id):
return self.QUEUES[nameQueue].delete_msg(id)
class Queue():
def __init__(self, q_name, conn):
# научите обращаться к атрибутам суперкласса!
self.MESSAGES = []
self.QUEUE_NAME = q_name
self.ENQUEUE_COUNT = None
self.DEQUEUE_COUNT = None
self.CONSUMER_COUNT = None
self.QUEUE_SIZE = None
self.CONNECTION = conn
self.updateEnCount()
self.updateDeCount()
self.updateCoCount()
self.updateQuSize()
def queueEnqueueCount(self):
# MSG_NAMES = ['JMSMessageID="ID:localhost-39797-1466874134889-3:1:-1:1:1"']
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "EnqueueCount"
}
return json.dumps(REUQEST)
def queueDequeueCount(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "DequeueCount"
}
return json.dumps(REUQEST)
def queueConsumerCount(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "ConsumerCount"
}
return json.dumps(REUQEST)
def queueSize(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "QueueSize"
}
return json.dumps(REUQEST)
def browseMessages(self):
REUQEST = {
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"operation": "browse()",
# "arguments": [""]
}
return json.dumps(REUQEST)
Ars, [26.01.17 14:06]
def purge(self):
REUQEST = {
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"operation": "purge()"
}
return json.dumps(REUQEST)
#ARS
def deleteMsg(self, ID):
REUQEST = {
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"operation": "deleteMessage()",
"arguments": [ID, "11111111-1111-1111-1111-111111111111"]
}
return json.dumps(REUQEST)
def request(self, name, param):
http = urllib3.PoolManager()
if name == "queueEnqueueCount":
body = self.queueEnqueueCount()
elif name == "queueDequeueCount":
body = self.queueDequeueCount()
elif name == "queueConsumerCount":
body = self.queueConsumerCount()
elif name == "queueSize":
body = self.queueSize()
elif name == "browseMessages":
body = self.browseMessages()
elif name == "purge":
body = self.purge()
elif name == "delete_msg":
body = self.deleteMsg(param)
url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
return r.data
def updateEnCount(self):
try:
self.ENQUEUE_COUNT = json.loads(self.request("queueEnqueueCount", {}))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1
def updateDeCount(self):
try:
self.DEQUEUE_COUNT = json.loads(self.request("queueDequeueCount", {}))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1
def updateCoCount(self):
try:
self.CONSUMER_COUNT = json.loads(self.request("queueConsumerCount", {}))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1
def updateQuSize(self):
try:
self.QUEUE_SIZE = json.loads(self.request("queueSize", {}))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1
def updateMessages(self):
self.MESSAGES = []
res = json.loads(self.request("browseMessages", {}))["value"]
for msg in res:
data = {
"id": msg["JMSMessageID"],
"data": msg["Text"],
"timestamp": msg["JMSTimestamp"],
"priority": msg["JMSPriority"]
}
self.MESSAGES.append(data)
def update(self):
self.updateEnCount()
self.updateDeCount()
self.updateCoCount()
self.updateQuSize()
self.updateMessages()
def getInfo(self):
self.updateEnCount()
self.updateDeCount()
self.updateCoCount()
self.updateQuSize()
return {
"queue_name": self.QUEUE_NAME,
"enqueue_count": self.ENQUEUE_COUNT,
"dequeue_count": self.DEQUEUE_COUNT,
"consumer_count": self.CONSUMER_COUNT,
"queue_size": self.QUEUE_SIZE
}
def browse(self):
self.updateMessages()
data = []
for msg in self.MESSAGES:
chunk = {
"id": msg["id"],
"timestamp": msg["timestamp"],
"priority": msg["priority"]
}
data.append(chunk)
return data
Ars, [26.01.17 14:06]
def message(self, msg_id):
self.updateMessages()
for msg in self.MESSAGES:
if msg["id"] == msg_id:
return msg["data"]
# ARS
def messages(self):
self.updateMessages()
return self.MESSAGES
# ARS
def delete_msg(self, id):
return json.loads(self.request("delete_msg",id))
def clear(self):
return json.loads(self.request("purge", {}))
关于Python。如何从队列/主题 ActiveMQ 中删除任何消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41827469/
我一直在读到,如果一个集合“被释放”,它也会释放它的所有对象。另一方面,我还读到,一旦集合被释放,集合就会释放它的对象。 但最后一件事可能并不总是发生,正如苹果所说。系统决定是否取消分配。在大多数情况
我有一个客户端-服务器应用程序,它使用 WCF 进行通信,并使用 NetDataContractSerializer 序列化对象图。 由于服务器和客户端之间传输了大量数据,因此我尝试通过微调数据成员的
我需要有关 JMS 队列和消息处理的帮助。 我有一个场景,需要针对特定属性组同步处理消息,但可以在不同属性组之间同时处理消息。 我了解了特定于每个属性的消息组和队列的一些知识。我的想法是,我想针对
我最近开始使用 C++,并且有一种强烈的冲动 #define print(msg) std::cout void print(T const& msg) { std::cout void
我已经为使用 JGroups 编写了简单的测试。有两个像这样的简单应用程序 import org.jgroups.*; import org.jgroups.conf.ConfiguratorFact
这个问题在这里已经有了答案: Firebase messaging is not supported in your browser how to solve this? (3 个回答) 7 个月前关
在我的 C# 控制台应用程序中,我正在尝试更新 CRM 2016 中的帐户。IsFaulted 不断返回 true。当我向下钻取时它返回的错误消息如下: EntityState must be set
我正在尝试通过 tcp 将以下 json 写入 graylog 服务器: {"facility":"GELF","file":"","full_message":"Test Message Tcp",
我正在使用 Django 的消息框架来指示成功的操作和失败的操作。 如何排除帐户登录和注销消息?目前,登录后登陆页面显示 已成功登录为“用户名”。我不希望显示此消息,但应显示所有其他成功消息。我的尝试
我通过编写禁用qDebug()消息 CONFIG(release, debug|release):DEFINES += QT_NO_DEBUG_OUTPUT 在.pro文件中。这很好。我想知道是否可以
我正在使用 ThrottleRequest 来限制登录尝试。 在 Kendler.php 我有 'throttle' => \Illuminate\Routing\Middleware\Throttl
我有一个脚本,它通过die引发异常。捕获异常时,我想输出不附加位置信息的消息。 该脚本: #! /usr/bin/perl -w use strict; eval { die "My erro
允许的消息类型有哪些(字符串、字节、整数等)? 消息的最大大小是多少? 队列和交换器的最大数量是多少? 最佳答案 理论上任何东西都可以作为消息存储/发送。实际上您不想在队列上存储任何内容。如果队列大部
基本上,我正在尝试创建一个简单的 GUI 来与 Robocopy 一起使用。我正在使用进程打开 Robocopy 并将输出重定向到文本框,如下所示: With MyProcess.StartI
我想将进入 MQ 队列的消息记录到数据库/文件或其他日志队列,并且我无法修改现有代码。是否有任何方法可以实现某种类似于 HTTP 嗅探器的消息记录实用程序?或者也许 MQ 有一些内置的功能来记录消息?
我得到了一个带有 single_selection 数据表和一个命令按钮的页面。命令按钮调用一个 bean 方法来验证是否进行了选择。如果不是,它应该显示一条消息警告用户。如果进行了选择,它将导航到另
我知道 MSVC 可以通过 pragma 消息做到这一点 -> http://support.microsoft.com/kb/155196 gcc 是否有办法打印用户创建的警告或消息? (我找不到谷
当存在大量节点或二进制数据时, native Erlang 消息能否提供合理的性能? 情况 1:有一个大约 50-200 台机器的动态池(erlang 节点)。它在不断变化,每 10 分钟大约添加或删
我想知道如何在用户登录后显示“欢迎用户,您已登录”的问候消息,并且该消息应在 5 秒内消失。 该消息将在用户成功登录后显示一次,但在同一 session 期间连续访问主页时不会再次显示。因为我在 ho
如果我仅使用Welcome消息,我的代码可以正常工作,但是当打印p->client_name指针时,消息不居中。 所以我的问题是如何将消息和客户端名称居中,就像它是一条消息一样。为什么它目前仅将消
我是一名优秀的程序员,十分优秀!