博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitmq类
阅读量:5307 次
发布时间:2019-06-14

本文共 6152 字,大约阅读时间需要 20 分钟。

1、accept.php 消费者代码需要在命令行(CLI)下执行,例如:php accept.php

2、将 send.php 和 accept.php 中的 'username'=>'asdf','password'=>'123456' 改成自己的 RabbitMQ 帐号和密码

RabbitMQCommand.php操作类代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
<?php
/**
 * AMQP helper class for talking to RabbitMQ.
 *
 * Requires the php_amqp extension (AMQPConnection, AMQPChannel,
 * AMQPExchange, AMQPQueue).
 *
 * The connection is opened lazily: nothing touches the network until
 * send() or run() is called. On first use the class declares a direct
 * exchange, declares a queue and binds the queue to the exchange with the
 * configured routing key.
 */
class RabbitMQCommand
{
    /**
     * Connection settings handed to AMQPConnection. After setConfigs() the
     * 'username' entry is stored under the key 'login'.
     */
    public $configs = array();

    /** Exchange name. */
    public $exchange_name = '';

    /** Queue name. */
    public $queue_name = '';

    /** Routing key used both for binding and for publishing. */
    public $route_key = '';

    /** Durable exchange/queue; defaults to True. */
    public $durable = True;

    /**
     * Auto-delete:
     *  - exchange is deleted when all queues have finished using it
     *  - queue is deleted when last consumer unsubscribes
     */
    public $autodelete = False;

    /**
     * Mirrored queue: when enabled the queue is declared with the argument
     * 'x-ha-policy' => 'all' so messages are replicated between nodes.
     * NOTE(review): newer RabbitMQ releases are believed to configure
     * mirroring through server-side policies instead of this argument -
     * confirm against the broker version in use.
     */
    public $mirror = False;

    private $_conn = Null;
    private $_exchange = Null;
    private $_channel = Null;
    private $_queue = Null;

    /**
     * @param array  $configs       array('host'=>$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/')
     * @param string $exchange_name exchange to declare and publish to
     * @param string $queue_name    queue to declare and consume from
     * @param string $route_key     routing key binding the queue to the exchange
     *
     * @throws Exception when $configs is not an array or misses a required entry
     */
    public function __construct($configs = array(), $exchange_name = '', $queue_name = '', $route_key = '')
    {
        $this->setConfigs($configs);
        $this->exchange_name = $exchange_name;
        $this->queue_name = $queue_name;
        $this->route_key = $route_key;
    }

    /**
     * Validate and normalise the connection settings.
     *
     * host, port, username and password must all be present and non-empty;
     * vhost falls back to '/'. The 'username' entry is renamed to 'login'
     * before the array is stored.
     *
     * @throws Exception 'configs is not array' / 'configs is empty'
     */
    private function setConfigs($configs)
    {
        if (!is_array($configs)) {
            throw new Exception('configs is not array');
        }
        // empty() treats a missing key like a falsy value, so an incomplete
        // array yields the intended exception without undefined-index notices.
        foreach (array('host', 'port', 'username', 'password') as $required) {
            if (empty($configs[$required])) {
                throw new Exception('configs is empty');
            }
        }
        if (empty($configs['vhost'])) {
            $configs['vhost'] = '/';
        }
        $configs['login'] = $configs['username'];
        unset($configs['username']);
        $this->configs = $configs;
    }

    /**
     * Set whether exchange and queue are durable (default True).
     * Takes effect the next time the exchange/queue are declared.
     */
    public function setDurable($durable)
    {
        $this->durable = $durable;
    }

    /**
     * Set whether exchange and queue are auto-deleted.
     * Takes effect the next time the exchange/queue are declared.
     */
    public function setAutoDelete($autodelete)
    {
        $this->autodelete = $autodelete;
    }

    /**
     * Set whether the queue is declared as mirrored.
     * Takes effect the next time the queue is declared.
     */
    public function setMirror($mirror)
    {
        $this->mirror = $mirror;
    }

    /**
     * Open the AMQP connection if there is none yet and declare the
     * exchange/queue/binding on it.
     *
     * On any failure every handle is dropped again, so a later call starts
     * from scratch instead of reusing a half-initialised connection.
     *
     * @throws Exception 'cannot connection rabbitmq' (code 500) when the
     *                   connection fails; the AMQP exception is chained as
     *                   previous. Other exceptions are rethrown unchanged.
     */
    private function open()
    {
        if ($this->_conn) {
            return;
        }
        try {
            $this->_conn = new AMQPConnection($this->configs);
            $this->_conn->connect();
            $this->initConnection();
        } catch (AMQPConnectionException $ex) {
            $this->clearHandles();
            throw new Exception('cannot connection rabbitmq', 500, $ex);
        } catch (Exception $ex) {
            // Connected, but declaring exchange/queue failed: disconnect so
            // the next open() retries the whole sequence.
            $this->close();
            throw $ex;
        }
    }

    /**
     * Keep the RabbitMQ connection, but switch to another
     * exchange / queue / routing key.
     *
     * When a connection is already open, the new exchange and queue are
     * declared immediately on a fresh channel. When there is no connection
     * yet, the names are only stored and get declared by the next
     * send()/run().
     *
     * @throws Exception when connected and one of the names is empty
     */
    public function reset($exchange_name, $queue_name, $route_key)
    {
        $this->exchange_name = $exchange_name;
        $this->queue_name = $queue_name;
        $this->route_key = $route_key;
        if ($this->_conn) {
            $this->initConnection();
        }
    }

    /**
     * Combine the durable / auto-delete options into one flag bitmask.
     *
     * @return int 0 when neither option is enabled
     */
    private function declareFlags()
    {
        $flags = 0;
        if ($this->durable) {
            $flags |= AMQP_DURABLE;
        }
        if ($this->autodelete) {
            $flags |= AMQP_AUTODELETE;
        }
        return $flags;
    }

    /**
     * Create channel, exchange and queue on the current connection and bind
     * the queue to the exchange.
     *
     * @throws Exception (code 500) when exchange name, queue name or routing
     *                   key is empty
     */
    private function initConnection()
    {
        if (empty($this->exchange_name) || empty($this->queue_name) || empty($this->route_key)) {
            throw new Exception('rabbitmq exchange_name or queue_name or route_key is empty', 500);
        }

        // setFlags() replaces the previous value, so durable and auto-delete
        // must be passed together in a single call. When neither is enabled
        // setFlags() is not called at all and the extension defaults apply.
        $flags = $this->declareFlags();

        $this->_channel = new AMQPChannel($this->_conn);

        $this->_exchange = new AMQPExchange($this->_channel);
        $this->_exchange->setName($this->exchange_name);
        $this->_exchange->setType(AMQP_EX_TYPE_DIRECT);
        if ($flags) {
            $this->_exchange->setFlags($flags);
        }
        $this->_exchange->declare();

        $this->_queue = new AMQPQueue($this->_channel);
        $this->_queue->setName($this->queue_name);
        if ($flags) {
            $this->_queue->setFlags($flags);
        }
        if ($this->mirror) {
            $this->_queue->setArgument('x-ha-policy', 'all');
        }
        $this->_queue->declare();
        $this->_queue->bind($this->exchange_name, $this->route_key);
    }

    /**
     * Forget connection, channel, exchange and queue handles.
     */
    private function clearHandles()
    {
        $this->_queue = Null;
        $this->_exchange = Null;
        $this->_channel = Null;
        $this->_conn = Null;
    }

    /**
     * Disconnect from RabbitMQ and drop all handles.
     *
     * Safe to call repeatedly; a later send()/run() reconnects.
     */
    public function close()
    {
        if ($this->_conn) {
            $this->_conn->disconnect();
        }
        $this->clearHandles();
    }

    /**
     * Serialization hook: closes the connection first, so the private
     * handles are null when the property list is returned.
     *
     * @return array names of all properties
     */
    public function __sleep()
    {
        $this->close();
        return array_keys(get_object_vars($this));
    }

    /**
     * Disconnect when the object is destroyed.
     */
    public function __destruct()
    {
        $this->close();
    }

    /**
     * Producer: publish one message with the configured routing key.
     *
     * @param mixed $msg arrays are JSON-encoded; anything else is cast to
     *                   string and trimmed
     * @return mixed whatever AMQPExchange::publish() returns
     * @throws Exception when the connection cannot be opened
     */
    public function send($msg)
    {
        $this->open();
        if (is_array($msg)) {
            $msg = json_encode($msg);
        } else {
            $msg = trim(strval($msg));
        }
        return $this->_exchange->publish($msg, $this->route_key);
    }

    /**
     * Consumer: hand every message of the queue to a callback.
     *
     * $fun_name = array($classobj, $function) or a function name string.
     * $autoack  = whether messages are acknowledged automatically.
     *
     * Example callback with manual acknowledgement:
     *
     *   function processMessage($envelope, $queue) {
     *       $msg = $envelope->getBody();
     *       echo $msg."\n";                            // handle the message
     *       $queue->ack($envelope->getDeliveryTag());  // manual ack
     *   }
     *
     * consume() is re-entered in an endless loop, so once consuming has
     * started this method does not return normally.
     *
     * @return bool False when no callback was given or no queue is available
     * @throws Exception when the connection cannot be opened
     */
    public function run($fun_name, $autoack = True)
    {
        $this->open();
        if (!$fun_name || !$this->_queue) {
            return False;
        }
        while (True) {
            if ($autoack) {
                $this->_queue->consume($fun_name, AMQP_AUTOACK);
            } else {
                $this->_queue->consume($fun_name);
            }
        }
    }
}

 

send.php生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
<?php
// Producer script: publishes 101 timestamp messages (counter 0..100)
// through RabbitMQCommand and then exits.

// No execution time limit for this script.
set_time_limit(0);
include_once 'RabbitMQCommand.php';

// Broker credentials - replace username/password with your own account.
$configs = array(
    'host'     => '127.0.0.1',
    'port'     => 5672,
    'username' => 'asdf',
    'password' => '123456',
    'vhost'    => '/',
);

$exchange_name = 'class-e-1';
$queue_name    = 'class-q-1';
$route_key     = 'class-r-1';

$ra = new RabbitMQCommand($configs, $exchange_name, $queue_name, $route_key);

// Each message body is the current local time, formatted to the second.
$i = 0;
while ($i <= 100) {
    $ra->send(date('Y-m-d H:i:s', time()));
    $i++;
}

exit();

accept.php消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?php
// Consumer script, part 1: configuration and construction of the wrapper.

// All PHP error reporting is switched off for this script.
error_reporting(0);
include_once 'RabbitMQCommand.php';

// Broker credentials - replace username/password with your own account.
$configs = array(
    'host'     => '127.0.0.1',
    'port'     => 5672,
    'username' => 'asdf',
    'password' => '123456',
    'vhost'    => '/',
);

// Must match the names used by the producer script.
$exchange_name = 'class-e-1';
$queue_name    = 'class-q-1';
$route_key     = 'class-r-1';

$ra = new RabbitMQCommand($configs, $exchange_name, $queue_name, $route_key);
/**
 * Holder for the consumer callback.
 */
class A
{
    /**
     * Handle one delivered message: append "<body>|<delivery tag>" followed
     * by CRLF to log<pid>.log (relative to the working directory), then
     * acknowledge the message on the queue.
     *
     * @param object $envelope must provide getBody() and getDeliveryTag()
     * @param object $queue    must provide ack($deliveryTag)
     */
    function processMessage($envelope, $queue)
    {
        $body = $envelope->getBody();
        $deliveryTag = $envelope->getDeliveryTag();

        // One log file per consumer process.
        $logFile = 'log' . posix_getpid() . '.log';
        $line = sprintf("%s|%s\r\n", $body, $deliveryTag);
        file_put_contents($logFile, $line, FILE_APPEND);

        // Manual acknowledgement.
        $queue->ack($deliveryTag);
    }
}
// Consumer script, part 2: start consuming with A::processMessage as the
// callback. The second argument (false) turns auto-ack off, so the callback
// acknowledges each message itself.
$a = new A();
$s = $ra->run(
    array($a, 'processMessage'),
    false
);

转载于:https://www.cnblogs.com/wzjwffg/p/11278045.html

你可能感兴趣的文章
JS写一个简单日历
查看>>
LCA的两种求法
查看>>
Python 发 邮件
查看>>
mysql忘记密码的解决办法
查看>>
全面分析Java的垃圾回收机制2
查看>>
[Code Festival 2017 qual A] C: Palindromic Matrix
查看>>
修改博客园css样式
查看>>
Python3 高阶函数
查看>>
初始面向对象
查看>>
docker一键安装
查看>>
leetcode Letter Combinations of a Phone Number
查看>>
Unity 5.4 测试版本新特性---因吹丝停
查看>>
7.5 文件操作
查看>>
DFS-hdu-2821-Pusher
查看>>
MyEclipse中将普通Java项目convert(转化)为Maven项目
查看>>
node js 安装.node-gyp/8.9.4 权限 无法访问
查看>>
windows基本命令
查看>>
VMware中CentOS设置静态IP
查看>>
[poj1006]Biorhythms
查看>>
jsp
查看>>