gpt4 book ai didi

Python。如何从队列/主题 ActiveMQ 中删除任何消息

转载 作者:太空宇宙 更新时间:2023-11-03 15:45:03 26 4
gpt4 key购买 nike

ActiveMQ 我用来在主题 (topic) 和队列 (queue) 中发送消息。

我有两个问题:

  1. 如何删除(取消操作)发送到队列或队列的消息话题。
  2. 如何完全删除清除所有队列/主题。

使用通过 STOMP 协议(protocol)组织的 AMQ stompy库,但是没有合适的functions

告诉我应该使用哪些库或解决方案本身。

非常感谢。

最佳答案

如何删除我知道的消息,但只是理论上的(通过 WireShark 分析包流量的结果,AMQ 管理页面 ActiveMQ 中的 localhost:8161\admin 页面中的工作浏览器),并且我无法以编程方式删除消息(Python)。

理论上我可以在AMQ中使用参数调用(我在删除时发送到\admin AMQ的packege中看到这个)deleteMessage()[id,secret],哪里

  • id - 队列\主题中消息的唯一名称
  • secret - 唯一的数字(可能是一些“ token ”),每次更新(例如 F5)\admin\browse 页面时都会发生变化。我不能说它是什么......

请参阅此处的图片:https://ru.stackoverflow.com/q/618697/228254在我的回答下面的帖子中。

示例测试队列:

  • id: ID:######NAME_SERVER######-44458-1485427798954-6:1:1:1:1
  • secret :1dbd2916-337a-48cc-bce7-63b00d38ba3

此时,我的想法是:获取队列中的所有消息并确认我需要丢弃\从队列中删除的消息。

这是我执行此操作的简单客户端代码:

from stompy import stomp
import json


s = stomp.Stomp(amq_ip, amq_port)

try:
s.connect(username=amq_user, password=amq_pass)
s.subscribe({'destination': '%s' % amq_queue, 'ack': 'client'})
except Exception as e:
print "ActiveMQ error\n %s" % e

while True:
try:
frame = s.receive_frame()
body = json.loads(frame.body)

# это сообщение для меня?
if body["interested_atr_in_msg"] == "interested_value_of_attr_in_msg":
print "Its for me. I receive it"
# Это сообщение для меня. Я его приму и обработаю
s.ack(frame)
else:
# Это сообщение предназначено для кого-то другого и мне не подходит
print "Its not for me"
except Exception as e:
print e

还添加从队列中删除消息的实验测试代码(不起作用,不删除)

# -*- coding: utf-8 -*-
import activemq_api
import urllib3
import json


# Connection to ActiveMQ
BROKER_NAME = "localhost"
AMQ_API_PORT = 8161
AMQ_API_USER = "admin"
AMQ_API_PASS = "admin"
AMQ_API_POSTFIX = "/api/jolokia"
AMQ_TASK_QUEUE_NAME = "test"
BASIC_AUTH ='%s:%s' % (AMQ_API_USER, AMQ_API_PASS)
AMQ_STATUS_QUEUE = "/queue/test"

LOGIN_EXEMPT_URLS = [
r'admin/'
]

LOGIN_URL = 'url_login'

LOGOUT_REDIRECT_URL = 'url_login'

if __name__ == '__main__':
user_agent = "curl/7.49.1"
headers = urllib3.util.make_headers(basic_auth=BASIC_AUTH, user_agent=user_agent)
addition = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "*/*"
}
try:
headers.update(addition)
connect = activemq_api.Connection(AMQ_IP, AMQ_API_PORT, BROKER_NAME, headers, AMQ_API_POSTFIX)
manager = activemq_api.AMQManager(connect)
except Exception as e:
print(u'%s: Превышено число максимальных попыток соединения к ActiveMQ' % e.__class__.__name__)
else:
print(u'Соединение успешно установлено')

try:
id="ID:№№№№№№№№№№№№№№№№№№-54825-1482598606528-3:586:-1:1:1"
secret="wertrtd-3fdf-4dfd-gr56-dfghdvhshtdfgdw"
print(manager.removeMsgQueue("test", id))
except Exception as inst:
print inst


#!/usr/bin/python2
# -*- coding: utf-8 -*-
import urllib3
import json

class Connection:
def __init__(self, amq_ip, amq_port, broker, header, postfix):
self.BROKER_NAME = broker
self.AMQ_IP = amq_ip
self.AMQ_PORT = amq_port
self.HEADERS = header
self.POSTFIX = postfix

class AMQManager():
def __init__(self, conn):
self.QUEUES = {}
self.QUEUES_COUNT = None
self.HEAP_MEMORY_USED = None
self.MEMORY_PERSENT_USED = None
self.CONNECTION = conn
self.update()

def rmQueue(self, queue_names):
REUQEST = {
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"operation": "removeQueue(java.lang.String)",
"arguments": [queue_names]
}
return json.dumps(REUQEST)

def queueList(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"attribute":"Queues"
}
return json.dumps(REUQEST)

def browseQueueSubscribers(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"attribute": "QueueSubscribers"
}
return json.dumps(REUQEST)

def memoryPersentUsed(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"attribute": "MemoryPercentUsage"
}
return json.dumps(REUQEST)

def heapMemoryUsed(self):
REUQEST = {
"type": "read",
"mbean": "java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"path":"used"
}
return json.dumps(REUQEST)

def request(self, name, param):
http = urllib3.PoolManager()
body = ''
if name == "removeQueue":
body = self.rmQueue(param["QUEUE_NAME"])
elif name == "queueList":
body = self.queueList()
elif name == "browseQueueSubscribers":
body = self.browseQueueSubscribers()
elif name == "memoryPersentUsed":
body = self.memoryPersentUsed()
elif name == "heapMemoryUsed":
body = self.heapMemoryUsed()

url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
return r.data

def updateQueues(self):
res = json.loads(self.request("queueList", {}))
# print res
data = []
for queue in res["value"]:
object = {}
queue["objectName"] = queue["objectName"].split(":")[1]
for key in queue["objectName"].split(","):
object.update({key.split("=")[0]: key.split("=")[1]})
data.append(object)
self.QUEUES_COUNT = 0
self.QUEUES = {}
# print data
for queue in data:
self.QUEUES.update({queue["destinationName"]: Queue(queue["destinationName"], self.CONNECTION)})
self.QUEUES_COUNT += 1

def updateHeapMem(self):
self.HEAP_MEMORY_USED = json.loads(self.request("heapMemoryUsed", {}))["value"]

def updatePersMem(self):
self.MEMORY_PERSENT_USED = json.loads(self.request("memoryPersentUsed", {}))["value"]

Ars, [26.01.17 14:06]
## EXPORTABLE
def update(self):
self.updateQueues()
self.updateHeapMem()
self.updatePersMem()
## EXPORTABLE
def getQueues(self):
self.updateQueues()
data = []
for queue in self.QUEUES:
data.append(self.QUEUES[queue].getInfo())
return {
"queues_count": self.QUEUES_COUNT,
"queues": data
}
## EXPORTABLE
def getQueueInfo(self, name):
return self.QUEUES[name].getInfo()
## EXPORTABLE
def browseQueue(self, name):
return self.QUEUES[name].browse()
## EXPORTABLE
def getMessage(self, name, msg_id):
return self.QUEUES[name].message(msg_id)
def getAllQueueMessages(self, name):
return self.QUEUES[name].messages()
## EXPORTABLE
def removeQueue(self, name):
param = {
"QUEUE_NAME": name
}
return json.loads(self.request("removeQueue", param))
## EXPORTABLE
def clearQueue(self, name):
return self.QUEUES[name].clear()
# ARS
def removeMsgQueue(self,nameQueue, id):
return self.QUEUES[nameQueue].delete_msg(id)



class Queue():
def __init__(self, q_name, conn):
# научите обращаться к атрибутам суперкласса!
self.MESSAGES = []
self.QUEUE_NAME = q_name
self.ENQUEUE_COUNT = None
self.DEQUEUE_COUNT = None
self.CONSUMER_COUNT = None
self.QUEUE_SIZE = None
self.CONNECTION = conn
self.updateEnCount()
self.updateDeCount()
self.updateCoCount()
self.updateQuSize()

def queueEnqueueCount(self):
# MSG_NAMES = ['JMSMessageID="ID:localhost-39797-1466874134889-3:1:-1:1:1"']
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "EnqueueCount"
}
return json.dumps(REUQEST)

def queueDequeueCount(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "DequeueCount"
}
return json.dumps(REUQEST)

def queueConsumerCount(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "ConsumerCount"
}
return json.dumps(REUQEST)

def queueSize(self):
REUQEST = {
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "QueueSize"
}
return json.dumps(REUQEST)

def browseMessages(self):
REUQEST = {
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"operation": "browse()",
# "arguments": [""]
}
return json.dumps(REUQEST)

Ars, [26.01.17 14:06]
def purge(self):
REUQEST = {
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"operation": "purge()"
}
return json.dumps(REUQEST)
#ARS
def deleteMsg(self, ID):
REUQEST = {
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"operation": "deleteMessage()",
"arguments": [ID, "11111111-1111-1111-1111-111111111111"]
}
return json.dumps(REUQEST)

def request(self, name, param):
http = urllib3.PoolManager()

if name == "queueEnqueueCount":
body = self.queueEnqueueCount()
elif name == "queueDequeueCount":
body = self.queueDequeueCount()
elif name == "queueConsumerCount":
body = self.queueConsumerCount()
elif name == "queueSize":
body = self.queueSize()
elif name == "browseMessages":
body = self.browseMessages()
elif name == "purge":
body = self.purge()
elif name == "delete_msg":
body = self.deleteMsg(param)

url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
return r.data

def updateEnCount(self):
try:
self.ENQUEUE_COUNT = json.loads(self.request("queueEnqueueCount", {}))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1

def updateDeCount(self):
try:
self.DEQUEUE_COUNT = json.loads(self.request("queueDequeueCount", {}))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1

def updateCoCount(self):
try:
self.CONSUMER_COUNT = json.loads(self.request("queueConsumerCount", {}))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1

def updateQuSize(self):
try:
self.QUEUE_SIZE = json.loads(self.request("queueSize", {}))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1

def updateMessages(self):
self.MESSAGES = []
res = json.loads(self.request("browseMessages", {}))["value"]
for msg in res:
data = {
"id": msg["JMSMessageID"],
"data": msg["Text"],
"timestamp": msg["JMSTimestamp"],
"priority": msg["JMSPriority"]
}
self.MESSAGES.append(data)

def update(self):
self.updateEnCount()
self.updateDeCount()
self.updateCoCount()
self.updateQuSize()
self.updateMessages()

def getInfo(self):
self.updateEnCount()
self.updateDeCount()
self.updateCoCount()
self.updateQuSize()
return {
"queue_name": self.QUEUE_NAME,
"enqueue_count": self.ENQUEUE_COUNT,
"dequeue_count": self.DEQUEUE_COUNT,
"consumer_count": self.CONSUMER_COUNT,
"queue_size": self.QUEUE_SIZE
}

def browse(self):
self.updateMessages()
data = []
for msg in self.MESSAGES:
chunk = {
"id": msg["id"],
"timestamp": msg["timestamp"],
"priority": msg["priority"]
}
data.append(chunk)
return data

Ars, [26.01.17 14:06]
def message(self, msg_id):
self.updateMessages()
for msg in self.MESSAGES:
if msg["id"] == msg_id:
return msg["data"]
# ARS
def messages(self):
self.updateMessages()
return self.MESSAGES

# ARS
def delete_msg(self, id):
return json.loads(self.request("delete_msg",id))

def clear(self):
return json.loads(self.request("purge", {}))

关于Python。如何从队列/主题 ActiveMQ 中删除任何消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41827469/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com