gpt4 book ai didi

rabbitmq - 您如何使用多条消息回复 RabbitMQ RPC 客户端?

转载 作者:行者123 更新时间:2023-12-05 06:20:28 27 4
gpt4 key购买 nike

我正在尝试在 RPC 环境中使用 RabbitMQ。在这种环境中,每个远程调用都要花费很长时间,并且会持续不断地产生结果。我希望结果一生成就能交付给客户端。

我从标准教程的 RPC 示例开始,然后将其修改为使用“直接回复(Direct Reply-to)”。我把所有中间结果发布回“匿名独占回调队列”,此时不确认(ack)原始请求。处理完成后,我把最终消息发送回客户端,然后再确认原始请求。但是客户端只能看到第一条中间消息。我的客户端恰好是 PHP,服务器是 Python,不过我认为这与问题无关。有没有人知道怎样才能让它正常工作?我可以贴出代码,但它基本上就是教程里非常基础的内容。

最佳答案

回答我自己的问题。以下代码可以正常工作:

PHP 客户端:

#!/usr/bin/php
<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

# RPC client that sends one request and receives any number of replies:
# zero or more 'intermediate' messages followed by exactly one 'final' message.
class RpcClient {
    private $connection;
    private $channel;
    private $callback_queue;   # name of the queue replies are delivered to
    private $response;         # raw JSON body of the 'final' reply, null until it arrives
    private $corr_id;          # correlation id of the request currently in flight

    # Connect to the broker on localhost and start consuming the reply queue.
    public function __construct() {
        $this->connection = new AMQPStreamConnection(
            'localhost', 5672, 'guest', 'guest'
        );
        $this->channel = $this->connection->channel();

        # Empty name => the broker generates one; 4th argument (exclusive) is true.
        list($this->callback_queue, ,) = $this->channel->queue_declare(
            "", false, false, true, false
        );

        # Replies are consumed from the queue declared above, in no-ack mode
        # (4th argument). NOTE(review): this is a regular callback queue, not
        # RabbitMQ's direct reply-to; for that, both this consume and the
        # 'reply_to' property would presumably name 'amq.rabbitmq.reply-to'
        # instead -- confirm against the RabbitMQ documentation.
        $this->channel->basic_consume(
            $this->callback_queue, '', false, true,
            false, false, array($this, 'onResponse')
        );
    }

    # Called once for every message arriving on the reply queue.
    # Echoes the 'line' field of each reply belonging to the current request
    # and remembers the body of the reply whose 'type' is 'final'.
    public function onResponse($rep) {
        # Ignore replies that belong to some other (e.g. earlier) request.
        if ($rep->get('correlation_id') !== $this->corr_id) {
            return;
        }

        $response = json_decode($rep->body, true);
        echo print_r($response['line'], true);
        if ($response['type'] == 'final') {
            $this->response = $rep->body;
        }
    }

    # Send $message_array (JSON-encoded) to 'ansiblePB_rpc_queue' and block
    # until the 'final' reply has been received.
    # Returns the integer 'rc' field of the final reply.
    public function call($message_array) {
        $this->response = null;
        $this->corr_id = uniqid();

        $jsonm = json_encode($message_array);
        $msg = new AMQPMessage(
            $jsonm,
            array(
                'correlation_id' => $this->corr_id,
                'reply_to' => $this->callback_queue
            )
        );
        $this->channel->basic_publish($msg, '', 'ansiblePB_rpc_queue');

        # Compare against null explicitly so the loop depends only on whether
        # a final reply was stored, not on the truthiness of its contents.
        while ($this->response === null) {
            $this->channel->wait();
        }

        # The final reply is a JSON object; intval() on the raw body would
        # always yield 0, so decode it and return the server's return code.
        $final = json_decode($this->response, true);
        return intval($final['rc']);
    }
}

# Example invocation: issue one RPC call and print the value it returns.
$request = array(
    'userID' => 'jb1234',
    'user_display_name' => 'Joe Bloe',
    'limit' => '24000',
);

$ansiblepb_rpc = new RpcClient();
$response = $ansiblepb_rpc->call($request);
echo ' [.] Got ', $response, "\n";
?>

Python 服务器:

#!/usr/bin/env python
""" 1 """
import glob
import json
import platform
import os
import re
import shutil
import subprocess
import time
import yaml

import pika

class RMQmultireply(object):
    """Run a command and stream its output back to a RabbitMQ RPC client.

    One instance serves one incoming request: every line the command writes
    to stdout is published as an 'intermediate' reply to the queue named in
    the request's ``reply_to`` property.
    """

    def __init__(self, channel, method, props):
        """Remember the delivery context of the request being served.

        Args:
            channel: channel the request arrived on; replies are published
                through it.
            method: delivery frame of the request (stored, not used by this
                class).
            props: properties of the request; ``reply_to`` and
                ``correlation_id`` are read when publishing replies.
        """
        self.channel = channel
        self.method = method
        self.props = props

    def run(self, userID, username, limit):
        """Run the command, publishing each stdout line as it is produced.

        The original request is not acknowledged here.

        Args:
            userID: accepted for the RPC interface; currently unused.
            username: accepted for the RPC interface; currently unused.
            limit: accepted for the RPC interface; currently unused.

        Returns:
            The exit status of the command.
        """
        cmd = ['/home/dhutchin/devel/rmq/multilineoutput']

        # stderr is discarded, but via os.devnull rather than a PIPE that is
        # never read: an unread pipe blocks the child once its buffer fills.
        with open(os.devnull, 'w') as devnull:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=devnull)

            # readline() hands over each line as soon as the child has written
            # it; readlines() would wait for EOF and so deliver nothing until
            # the command had finished.
            for raw_line in iter(proc.stdout.readline, b''):
                # Decode so json.dumps receives text on Python 2 and 3 alike.
                line = raw_line.decode('utf-8', 'replace')
                intermediate_json_result = json.dumps(
                    {'type': 'intermediate', 'line': line})

                self.channel.basic_publish(
                    exchange='',
                    routing_key=self.props.reply_to,
                    properties=pika.BasicProperties(
                        correlation_id=self.props.correlation_id),
                    body=intermediate_json_result)

            proc.stdout.close()
            proc.wait()
        return proc.returncode


def on_request(channel, method, props, jsonstring):
    """Serve one RPC request: stream the command output, then reply 'final'.

    Args:
        channel: channel the request was delivered on.
        method: delivery frame; its ``delivery_tag`` is used for the ack.
        props: request properties; ``reply_to`` and ``correlation_id`` are
            copied onto the replies.
        jsonstring: request body, a mapping with the keys ``userID``,
            ``user_display_name`` and ``limit``.
    """
    playbook = RMQmultireply(channel, method, props)

    # The body comes off the network, so parse it with safe_load, which only
    # builds plain Python types. yaml.load without an explicit Loader can
    # construct arbitrary objects and is rejected by current PyYAML releases.
    # (The original author notes that YAML parsing worked better than JSON
    # for this body.)
    request = yaml.safe_load(jsonstring)

    # .run() publishes the intermediate replies and does not return until the
    # command has exited.
    returncode = playbook.run(
        request['userID'], request['user_display_name'], request['limit'])

    final_json_result = json.dumps({'type': "final", 'line': '', 'rc': returncode})

    # Publish via the default exchange to the queue the client named in
    # reply_to.
    channel.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=final_json_result)

    # Acknowledge the original message only now, so that RabbitMQ removes it
    # from the request queue after all replies have been published.
    channel.basic_ack(delivery_tag=method.delivery_tag)


def main():
    """Connect to RabbitMQ on localhost and serve 'ansiblePB_rpc_queue' forever.

    Exits with status 1 if the broker connection cannot be established.
    """
    import sys  # local import: only needed for the failure exit below

    try:
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
    except pika.exceptions.AMQPConnectionError:
        # Single-argument print(...) is valid on both Python 2 and Python 3.
        print("pika.BlockingConnection.... failed... maybe RabbitMQ is not running")
        # Non-zero status so callers can tell startup failed; quit() is meant
        # for the interactive interpreter and exits with status 0.
        sys.exit(1)

    channel = connection.channel()

    channel.queue_declare(queue='ansiblePB_rpc_queue')

    # Hand this consumer at most one unacknowledged request at a time.
    channel.basic_qos(prefetch_count=1)
    # auto_ack is off by default; on_request acknowledges each request itself.
    channel.basic_consume(queue='ansiblePB_rpc_queue', on_message_callback=on_request)

    print(" [x] Awaiting RPC requests")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Ctrl-C: leave the consume loop cleanly instead of dumping a traceback.
        channel.stop_consuming()
    finally:
        connection.close()


# Start the server only when the file is executed as a script.
if __name__ == '__main__':
    main()

关于rabbitmq - 您如何使用多条消息回复 RabbitMQ RPC 客户端?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60442782/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com