- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章PHP高级编程之消息队列原理与实现方法详解由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
本文实例讲述了PHP高级编程之消息队列原理与实现方法。分享给大家供大家参考,具体如下:
消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式 。
消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读出。通过消息队列,应用程序可独立地执行,它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息.
你首先需要弄清楚,消息队列与远程过程调用的区别,在很多读者咨询我的时候,我发现他们需要的是RPC(远程过程调用),而不是消息队列.
消息队列有同步或异步实现方式,通常我们采用异步方式使用消息队列,远程过程调用多采用同步方式.
MQ与RPC有什么不同? MQ通常传递无规则协议,这个协议由用户定义并且实现存储转发;而RPC通常是专用协议,调用过程返回结果.
同步需求,远程过程调用(PRC)更适合你.
异步需求,消息队列更适合你.
目前很多消息队列软件同时支持RPC功能,很多RPC系统也能异步调用.
消息队列用来实现下列需求 。
① 存储转发 。
② 分布式事务 。
③ 发布订阅 。
④ 基于内容的路由 。
⑤ 点对点连接 。
通常的做法,如果小的项目团队可以有一个人实现,包括消息的推送,接收处理。如果大型团队,通常是定义好消息协议,然后各自开发各自的部分,例如一个团队负责写推送协议部分,另一个团队负责写接收与处理部分.
那么为什么我们不讲消息队列框架化呢?
框架化有几个好处:
① 开发者不用学习消息队列接口 ② 开发者不需要关心消息推送与接收 ③ 开发者通过统一的API推送消息 ④ 开发者的重点是实现业务逻辑功能 。
下面是作者开发的一个SOA框架,该框架提供了三种接口,分别是SOAP,RESTful,AMQP(RabbitMQ),理解了该框架思想,你很容易进一步扩展,例如增加XML-RPC, ZeroMQ等等支持.
https://github.com/netkiller/SOA 。
本文只讲消息队列框架部分.
消息队列框架是本地应用程序(命令行程序),我们为了让他在后台运行,需要实现守护进程.
https://github.com/netkiller/SOA/blob/master/bin/rabbitmq.php 。
每个实例处理一组队列,实例化需要提供三个参数,$queueName = '队列名', $exchangeName = '交换名', $routeKey = '路由' 。
1
|
$daemon
=
new
\framework\RabbitDaemon(
$queueName
=
'email'
,
$exchangeName
=
'email'
,
$routeKey
=
'email'
);
|
守护进程需要使用root用户运行,运行后会切换到普通用户,同时创建进程ID文件,以便进程停止的时候使用.
守护进程核心代码https://github.com/netkiller/SOA/blob/master/system/rabbitdaemon.class.php 。
消息协议是一个数组,将数组序列化或者转为JSON推送到消息队列服务器,这里使用json格式的协议.
1
2
3
4
5
6
7
8
|
$msg
=
array
(
'Namespace'
=>
'namespace'
,
"Class"
=>
"Email"
,
"Method"
=>
"smtp"
,
"Param"
=>
array
(
$mail
,
$subject
,
$message
, null
)
);
|
序列化后的协议 。
1
|
{
"Namespace"
:
"single"
,
"Class"
:
"Email"
,
"Method"
:
"smtp"
,
"Param"
:[
"netkiller@msn.com"
,
"Hello"
,
" TestHelloWorld"
,null]}
|
使用json格式是考虑到通用性,这样推送端可以使用任何语言。如果不考虑兼容,建议使用二进制序列化,例如msgpack效率更好.
消息队列处理核心代码 。
https://github.com/netkiller/SOA/blob/master/system/rabbitmq.class.php 。
所以消息的处理在下面一段代码中进行 。
1
2
3
4
5
6
7
8
9
10
|
$this
->queue->consume(
function
(
$envelope
,
$queue
) {
$speed
= microtime(true);
$msg
=
$envelope
->getBody();
$result
=
$this
->loader(
$msg
);
$queue
->ack(
$envelope
->getDeliveryTag());
//手动发送ACK应答
//$this->logging->info(''.$msg.' '.$result)
$this
->logging->debug(
'Protocol: '
.
$msg
.
' '
);
$this
->logging->debug(
'Result: '
.
$result
.
' '
);
$this
->logging->debug(
'Time: '
. (microtime(true) -
$speed
) .
''
);
});
|
public function loader($msg = null) 负责拆解协议,然后载入对应的类文件,传递参数,运行方法,反馈结果.
Time 可以输出程序运行所花费的时间,对于后期优化十分有用.
提示 。
loader() 可以进一步优化,使用多线程每次调用loader将任务提交到线程池中,这样便可以多线程处理消息队列.
测试代码 https://github.com/netkiller/SOA/blob/master/test/queue/email.php 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
<?php
$queueName
=
'example'
;
$exchangeName
=
'email'
;
$routeKey
=
'email'
;
$mail
=
$argv
[1];
$subject
=
$argv
[2];
$message
=
empty
(
$argv
[3]) ?
'Hello World!'
:
' '
.
$argv
[3];
$connection
=
new
AMQPConnection(
array
(
'host'
=>
'192.168.4.1'
,
'port'
=>
'5672'
,
'vhost'
=>
'/'
,
'login'
=>
'guest'
,
'password'
=>
'guest'
));
$connection
->connect()
or
die
(
"Cannot connect to the broker!\n"
);
$channel
=
new
AMQPChannel(
$connection
);
$exchange
=
new
AMQPExchange(
$channel
);
$exchange
->setName(
$exchangeName
);
$queue
=
new
AMQPQueue(
$channel
);
$queue
->setName(
$queueName
);
$queue
->setFlags(AMQP_DURABLE);
$queue
->declareQueue();
$msg
=
array
(
'Namespace'
=>
'namespace'
,
"Class"
=>
"Email"
,
"Method"
=>
"smtp"
,
"Param"
=>
array
(
$mail
,
$subject
,
$message
, null
)
);
$exchange
->publish(json_encode(
$msg
),
$routeKey
);
printf(
"[x] Sent %s \r\n"
, json_encode(
$msg
));
$connection
->disconnect();
|
这里只给出了少量测试与演示程序,如有疑问请到渎者群,或者公众号询问.
上面消息队列 核心代码如下 。
1
2
3
4
5
|
$this
->queue->consume(
function
(
$envelope
,
$queue
) {
$msg
=
$envelope
->getBody();
$result
=
$this
->loader(
$msg
);
$queue
->ack(
$envelope
->getDeliveryTag());
});
|
这段代码生产环境使用了半年,发现效率比较低。有些业务场入队非常快,但处理起来所花的时间就比较长,容易出现队列堆积现象.
增加多线程可能更有效利用硬件资源,提高业务处理能力。代码如下 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
<?php
namespace
framework;
require_once
( __DIR__.
'/autoload.class.php'
);
class
RabbitThread
extends
\Threaded {
private
$queue
;
public
$classspath
;
protected
$msg
;
public
function
__construct(
$queue
,
$logging
,
$msg
) {
$this
->classspath = __DIR__.
'/../queue'
;
$this
->msg =
$msg
;
$this
->logging =
$logging
;
$this
->queue =
$queue
;
}
public
function
run() {
$speed
= microtime(true);
$result
=
$this
->loader(
$this
->msg);
$this
->logging->debug(
'Result: '
.
$result
.
' '
);
$this
->logging->debug(
'Time: '
. (microtime(true) -
$speed
) .
''
);
}
// private
public
function
loader(
$msg
= null){
$protocol
= json_decode(
$msg
,true);
$namespace
=
$protocol
[
'Namespace'
];
$class
=
$protocol
[
'Class'
];
$method
=
$protocol
[
'Method'
];
$param
=
$protocol
[
'Param'
];
$result
= null;
$classspath
=
$this
->classspath.
'/'
.
$this
->queue.
'/'
.
$namespace
.
'/'
.
strtolower
(
$class
) .
'.class.php'
;
if
(
is_file
(
$classspath
) ){
require_once
(
$classspath
);
//$class = ucfirst(substr($request_uri, strrpos($request_uri, '/')+1));
if
(
class_exists
(
$class
)) {
if
(method_exists(
$class
,
$method
)){
$obj
=
new
$class
;
if
(!
$param
){
$tmp
=
$obj
->
$method
();
$result
= json_encode(
$tmp
);
$this
->logging->info(
$class
.
'->'
.
$method
.
'()'
);
}
else
{
$tmp
= call_user_func_array(
array
(
$obj
,
$method
),
$param
);
$result
= (json_encode(
$tmp
));
$this
->logging->info(
$class
.
'->'
.
$method
.
'("'
.implode(
'","'
,
$param
).
'")'
);
}
}
else
{
$this
->logging->error(
'Object '
.
$class
.
'->'
.
$method
.
' is not exist.'
);
}
}
else
{
$msg
= sprintf(
"Object is not exist. (%s)"
,
$class
);
$this
->logging->error(
$msg
);
}
}
else
{
$msg
= sprintf(
"Cannot loading interface! (%s)"
,
$classspath
);
$this
->logging->error(
$msg
);
}
return
$result
;
}
}
class
RabbitMQ {
const
loop = 10;
protected
$queue
;
protected
$pool
;
public
function
__construct(
$queueName
=
''
,
$exchangeName
=
''
,
$routeKey
=
''
) {
$this
->config =
new
\framework\Config(
'rabbitmq.ini'
);
$this
->logfile = __DIR__.
'/../log/rabbitmq.%s.log'
;
$this
->logqueue = __DIR__.
'/../log/queue.%s.log'
;
$this
->logging =
new
\framework\log\Logging(
$this
->logfile,
$debug
=true);
//.H:i:s
$this
->queueName =
$queueName
;
$this
->exchangeName =
$exchangeName
;
$this
->routeKey =
$routeKey
;
$this
->pool =
new
\Pool(
$this
->config->get(
'pool'
)[
'thread'
]);
}
public
function
main(){
$connection
=
new
\AMQPConnection(
$this
->config->get(
'rabbitmq'
));
try
{
$connection
->connect();
if
(!
$connection
->isConnected()) {
$this
->logging->exception(
"Cannot connect to the broker!"
.PHP_EOL);
}
$this
->channel =
new
\AMQPChannel(
$connection
);
$this
->exchange =
new
\AMQPExchange(
$this
->channel);
$this
->exchange->setName(
$this
->exchangeName);
$this
->exchange->setType(AMQP_EX_TYPE_DIRECT);
//direct类型
$this
->exchange->setFlags(AMQP_DURABLE);
//持久�?
$this
->exchange->declareExchange();
$this
->queue =
new
\AMQPQueue(
$this
->channel);
$this
->queue->setName(
$this
->queueName);
$this
->queue->setFlags(AMQP_DURABLE);
//持久�?
$this
->queue->declareQueue();
$this
->queue->bind(
$this
->exchangeName,
$this
->routeKey);
$this
->queue->consume(
function
(
$envelope
,
$queue
) {
$msg
=
$envelope
->getBody();
$this
->logging->debug(
'Protocol: '
.
$msg
.
' '
);
//$result = $this->loader($msg);
$this
->pool->submit(
new
RabbitThread(
$this
->queueName,
new
\framework\log\Logging(
$this
->logqueue,
$debug
=true),
$msg
));
$queue
->ack(
$envelope
->getDeliveryTag());
});
$this
->channel->qos(0,1);
}
catch
(\AMQPConnectionException
$e
){
$this
->logging->exception(
$e
->__toString());
}
catch
(\Exception
$e
){
$this
->logging->exception(
$e
->__toString());
$connection
->disconnect();
$this
->pool->shutdown();
}
}
private
function
fault(
$tag
,
$msg
){
$this
->logging->exception(
$msg
);
throw
new
\Exception(
$tag
.
': '
.
$msg
);
}
public
function
__destruct() {
}
}
|
希望本文所述对大家PHP程序设计有所帮助.
原文链接:https://blog.csdn.net/luyaran/article/details/53034382 。
最后此篇关于PHP高级编程之消息队列原理与实现方法详解的文章就讲到这里了,如果你想了解更多关于PHP高级编程之消息队列原理与实现方法详解的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我在 JavaScript 文件中运行 PHP,例如...... var = '';). 我需要使用 JavaScript 来扫描字符串中的 PHP 定界符(打开和关闭 PHP 的 )。 我已经知道使
我希望能够做这样的事情: php --determine-oldest-supported-php-version test.php 并得到这个输出: 7.2 也就是说,php 二进制检查 test.
我正在开发一个目前不使用任何框架的大型 php 站点。我的大问题是,随着时间的推移慢慢尝试将框架融入应用程序是否可取,例如在创建的新部件和更新的旧部件中? 比如所有的页面都是直接通过url服务的,有几
下面是我的源代码,我想在同一页面顶部的另一个 php 脚本中使用位于底部 php 脚本的变量 $r1。我需要一个简单的解决方案来解决这个问题。我想在代码中存在的更新查询中使用该变量。 $name)
我正在制作一个网站,根据不同的情况进行大量 PHP 重定向。就像这样...... header("Location: somesite.com/redirectedpage.php"); 为了安全起见
我有一个旧网站,我的 php 标签从 因为短标签已经显示出安全问题,并且在未来的版本中将不被支持。 关于php - 如何避免在 php 文件中写入
我有一个用 PHP 编写的配置文件,如下所示, 所以我想用PHP开发一个接口(interface),它可以编辑文件值,如$WEBPATH , $ACCOUNTPATH和 const值(value)观
我试图制作一个登录页面来学习基本的PHP,首先我希望我的独立PHP文件存储HTML文件的输入(带有表单),但是当我按下按钮时(触发POST到PHP脚本) )我一直收到令人不愉快的错误。 我已经搜索了S
我正在寻找一种让 PHP 以一种形式打印任意数组的方法,我可以将该数组作为赋值包含在我的(测试)代码中。 print_r 产生例如: Array ( [0] => qsr-part:1285 [1]
这个问题已经有答案了: 已关闭11 年前。 Possible Duplicate: What is the max key size for an array in PHP? 正如标题所说,我想知道
我正在寻找一种让 PHP 以一种形式打印任意数组的方法,我可以将该数组作为赋值包含在我的(测试)代码中。 print_r 产生例如: Array ( [0] => qsr-part:1285 [1]
关闭。这个问题需要多问focused 。目前不接受答案。 想要改进此问题吗?更新问题,使其仅关注一个问题 editing this post . 已关闭 9 年前。 Improve this ques
我在 MySQL 数据库中有一个表,其中存储餐厅在每个工作日和时段提供的菜单。 表结构如下: i_type i_name i_cost i_day i_start i_
我有两页。 test1.php 和 test2.php。 我想做的就是在 test1.php 上点击提交,并将 test2.php 显示在 div 中。这实际上工作正常,但我需要向 test2.php
我得到了这个代码。我想通过textarea更新mysql。我在textarea中回显我的MySQL,但我不知道如何更新它,我应该把所有东西都放进去吗,因为_GET模式没有给我任何东西,我也尝试_GET
首先,我是 php 的新手,所以我仍在努力学习。我在 Wordpress 上创建了一个表单,我想将值插入一个表(data_test 表,我已经管理了),然后从 data_test 表中获取所有列(id
我有以下函数可以清理用户或网址的输入: function SanitizeString($var) { $var=stripslashes($var); $va
我有一个 html 页面,它使用 php 文件查询数据库,然后让用户登录,否则拒绝访问。我遇到的问题是它只是重定向到 php 文件的 url,并且从不对发生的事情提供反馈。这是我第一次使用 html、
我有一个页面充满了指向 pdf 的链接,我想跟踪哪些链接被单击。我以为我可以做如下的事情,但遇到了问题: query($sql); if($result){
我正在使用 从外部文本文件加载 HTML/PHP 代码 $f = fopen($filename, "r"); while ($line = fgets($f, 4096)) { print $l
我是一名优秀的程序员,十分优秀!