gpt4 book ai didi

rabbitmq - 如何在 python 中使用 RabbitMQ 进行简单的单元测试?

转载 作者:行者123 更新时间:2023-12-05 05:03:56 25 4
gpt4 key购买 nike

在我的单元测试中,我想简单地开始消费、发布消息并接收返回的响应并断言响应是否符合我的预期。但是,我已经尝试这样做了几个小时,但没有找到解决方案。

问题是我无法在类中定义停止消费的方法。我试过定义这样的方法:

def stop(self):
self.channel.basic_cancel()
def stop(self):
self.channel.stop_consuming()
def stop(self):
self.connection.close()

但似乎没有任何效果。我读到这是因为一旦你执行start_consuming(),停止消费的唯一方法是在发送消息后取消它。但如果我这样做,那么我将修改原始的 on_request,这对我的应用程序没有用,因为连接将在第一条消息后关闭。我找到了 pytest-rabbitmq但是文档对我来说并不清楚,因此不知道我是否可以使用这个插件来实现我想要的。

顺便问一下,basic_cancelstop_consumingclose有什么区别?

最佳答案

我不清楚您的情况!据我了解,您可以使用相同的方法创建连接和 channel ,以便您可以在需要时发布、使用、断言和停止使用

希望这对您有所帮助!

def test_rabbitmq():
from pika import BlockingConnection, ConnectionParameters, PlainCredentials

conn = BlockingConnection(ConnectionParameters(host='host', virtual_host='vhost', credentials=PlainCredentials('username', 'password')))
channel = conn.channel()

# define your consumer
def on_message(channel, method_frame, header_frame, body):
message = body.decode()
# assert your message here
# asset message == 'value'
channel.basic_cancel('test-consumer') # stops the consumer

# define your publisher
def publish_message(message):
channel.basic_publish(exchange='', routing_key='', body=message')

publish('your message')
tag = channel.basic_consume(queue='queue', on_message_callback=on_message, consumer_tag='test-consumer')

stop_consuming - 取消所有消费者,发出 start_consuming 循环退出的信号。

basic_cancel - 此方法取消消费者。消费者标签将作为输入。

close - 关闭连接/ channel

Reference

关于rabbitmq - 如何在 python 中使用 RabbitMQ 进行简单的单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61420518/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com