- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
对于服务器自动化,我们正在尝试开发一种工具,它可以在不同的服务器上处理和执行大量任务。我们将任务和服务器主机名发送到队列中。然后请求者使用队列,将信息提供给 ansible api。为了实现这一点,我们可以一次执行多个任务,我们使用了线程。
现在我们陷入了对消息的确认...
到目前为止我们做了什么:requester.py
使用队列并启动一个线程,ansible 任务正在其中运行。然后将结果发送到另一个队列。因此,每条新消息都会创建一个新线程。任务是否完成,线程死亡。
但现在来了困难的部分。我们必须使消息持久化,以防我们的服务器死机。因此,每条消息都应该在 来自 ansible 的结果发回后得到确认。
我们现在的问题是,当我们尝试在线程本身中确认消息时,不再有“同时”完成的工作,因为 pika 的 consume
等待确认。那么我们如何才能实现 consume
消费消息而不是等待确认呢?或者我们如何解决或改进我们的小程序?
请求者.py
#!/bin/python
from worker import *
import ansible.inventory
import ansible.runner
import threading
class Requester(Worker):
def __init__(self):
Worker.__init__(self)
self.connection(self.selfhost, self.from_db)
self.receive(self.from_db)
def send(self, result, ch, method):
self.channel.basic_publish(exchange='',
routing_key=self.to_db,
body=result,
properties=pika.BasicProperties(
delivery_mode=2,
))
print "[x] Sent \n" + result
ch.basic_ack(delivery_tag = method.delivery_tag)
def callAnsible(self, cmd, ch, method):
#call ansible api pre 2.0
result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': '))
self.send(result, ch, method)
def callback(self, ch, method, properties, body):
print(" [x] Received by requester %r" % body)
t = threading.Thread(target=self.callAnsible, args=(body,ch,method,))
t.start()
worker.py
import pika
import ConfigParser
import json
import os
class Worker(object):
def __init__(self):
#read some config files
def callback(self, ch, method, properties, body):
raise Exception("Call method in subclass")
def receive(self, queue):
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.callback,queue=queue)
self.channel.start_consuming()
def connection(self,server,queue):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=server,
credentials=self.credentials))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue, durable=True)
我们正在使用 Python 2.7 和 pika 0.10.0。
是的,我们在鼠兔常见问题解答中注意到:http://pika.readthedocs.io/en/0.10.0/faq.html
鼠兔不是线程安全的。
最佳答案
禁用自动确认并将预取计数设置为大于 1 的值,具体取决于您希望消费者接收多少消息。
这里是设置prefetch的方法channel.basic_qos(prefetch_count=1)
,发现here .
关于python - Pika:消费下一条消息,即使上一条消息未被确认,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37854973/
我想使用我编写的类模块的事件。类模块如下所示 ''CError64Row Public Event ErrorClicked(ByVal row As Integer, ByVal column As
我正在寻找实现智能架构的良好实践,以及处理针对具有许多不同 wdsl web 服务的系统的集成的方法。 我已经有 2 年的爱好使用 C# 进行开发了~,因此我并不总是使用正确的术语,但我会尝试描述我正
目前,我正在为我的程序使用 Azure Consumer API。但它非常慢,几乎需要8秒才能给出响应。我现在应该怎么做?这是我正在使用的 azure API.. https://management
我的流程是: AcitveMQ 控制台在主题部分下显示了一个使用者,但是一旦
我一直在阅读类似 Why does a function that accepts a Box complain of a value being moved when a function that
AMQP 函数 consume() 是一个带有回调的阻塞函数,是否可以为 consume() 函数设置超时,以便在特定时间后不再阻塞并且代码执行完成? 最佳答案 是的,方法如下: $amqp = ne
我有一个客户端/服务器应用程序,其中客户端以 JSON 形式将对象发送到运行 PHP 脚本的服务器,然后将此数据放入数据库。 问题是解码是用 json_decode 函数完成的,它似乎适用于字符串而不
所以我已经模拟了我的生产者消费者问题并且我有下面的代码。我的问题是:如果消费者一直处于 while(true) 状态,他如何停止。 在下面的代码中,我添加了 i
我无法使用在delphi 中开发的dll 的功能。我在类型转换方面遇到了一些困难。 这是我要调用 DLL 的函数: function rData(ID: Cardinal; queue: WideSt
我想使用 Unity3D 可视化 Kafka 流。在 Unity 中访问数据流的最佳方式是什么? 我已经用 Node 和 C# 编写了基本使用者,但我不确定如何将它们合并到 Unity 中。任何帮助表
如果标题太笼统,我很抱歉,但我已经浏览了一个小时的互联网,但找不到任何架构解释。我对 RSS 和 Atom 协议(protocol)都是全新的,据我到目前为止所了解的是: 服务器发布文档 客户端订阅此
我很喜欢我刚刚发现的 Guzzle 框架。我正在使用它使用不同的响应结构跨多个 API 聚合数据。它可以使用 JSON 和 XML 找到,但我需要使用的服务之一使用 SOAP。是否有使用 Guzzle
有没有一种方法可以像访问 Microsoft.Azure.Management.Fluent 一样访问 Azure.Management.Conclusion.Models? 当我执行以下代码时,我看
我有这个部分场景图树: CustomPane (with onMouseClicked Handler) → ChildNode (with onMousePressed Handler) 当我在
我的问题是这个 json。 http://dev-rexolution.pantheonsite.io/api/noticias 我只需要使用 vuejs 2 使用数组的第一个元素才能显示它,使用我工
我是 ML 新手,一直在研究 CNTK 教程。我已经成功训练了几个模型。 我完成了迁移学习教程 ( https://github.com/Microsoft/CNTK/blob/v2.1/Tutori
我是 RabbitMq 和 AMQP 的新手,但我对 ActiveMQ 和 JMS 有一些经验。我尝试在主题(JMS 中的主题之类的主题)中发布一条消息,并从多个监听器中使用此消息。比如我发布一条消息
我正在尝试让我的服务器解析以下 JSON: {"hardwareId":1,"registerTime":"2017-02-14T03:42:11.482Z","sensorId":1,"temper
我正在开发一个从外部 url 使用 json 的网站,我试过了但是我得到了一个错误 XMLHttpRequest 无法加载 http://reuniyo.com/tst/json.php。 Acces
我正在尝试使用Kafka Streams(即不是简单的Kafka Consumer)从重试主题中读取之前无法处理的事件。我希望从重试主题中进行消费,如果处理仍然失败(例如,如果外部系统已关闭),我希望
我是一名优秀的程序员,十分优秀!