1、accept.php消费者代码需要在命令行执行
2、'username'=>'asdf'
,'password'=>'123456'
改成自己的帐号和密码
RabbitMQCommand.php操作类代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | <?php /* * amqp协议操作类,可以访问rabbitMQ * 需先安装php_amqp扩展 */ class RabbitMQCommand{ public $configs = array (); //交换机名称 public $exchange_name = '' ; //队列名称 public $queue_name = '' ; //路由名称 public $route_key = '' ; /* * 持久化,默认True */ public $durable = True; /* * 自动删除 * exchange is deleted when all queues have finished using it * queue is deleted when last consumer unsubscribes * */ public $autodelete = False; /* * 镜像 * 镜像队列,打开后消息会在节点之间复制,有master和slave的概念 */ public $mirror = False; private $_conn = Null; private $_exchange = Null; private $_channel = Null; private $_queue = Null; /* * @configs array('host'=>$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/') */ public function __construct( $configs = array (), $exchange_name = '' , $queue_name = '' , $route_key = '' ) { $this ->setConfigs( $configs ); $this ->exchange_name = $exchange_name ; $this ->queue_name = $queue_name ; $this ->route_key = $route_key ; } private function setConfigs( $configs ) { if (! is_array ( $configs )) { throw new Exception( 'configs is not array' ); } if (!( $configs [ 'host' ] && $configs [ 'port' ] && $configs [ 'username' ] && $configs [ 'password' ])) { throw new Exception( 'configs is empty' ); } if ( empty ( $configs [ 'vhost' ])) { $configs [ 'vhost' ] = '/' ; } $configs [ 'login' ] = $configs [ 'username' ]; unset( $configs [ 'username' ]); $this ->configs = $configs ; } /* * 设置是否持久化,默认为True */ public function setDurable( $durable ) { $this ->durable = $durable ; } /* * 设置是否自动删除 */ public function setAutoDelete( $autodelete ) { $this ->autodelete = $autodelete ; } /* * 设置是否镜像 */ public function setMirror( $mirror ) { $this ->mirror = $mirror ; } /* * 打开amqp连接 */ private function open() { if (! $this ->_conn) { try { $this ->_conn = new AMQPConnection( $this ->configs); $this ->_conn->connect(); $this ->initConnection(); } catch (AMQPConnectionException $ex ) { throw new Exception( 'cannot connection rabbitmq' ,500); } } } /* * rabbitmq连接不变 * 重置交换机,队列,路由等配置 */ public function reset( $exchange_name , $queue_name , $route_key ) { $this ->exchange_name = $exchange_name ; $this ->queue_name = $queue_name ; $this ->route_key = $route_key ; $this ->initConnection(); } /* * 初始化rabbit连接的相关配置 */ private function initConnection() { if ( empty ( $this ->exchange_name) || empty ( $this ->queue_name) || empty ( $this ->route_key)) { throw new Exception( 'rabbitmq exchange_name or queue_name or route_key is empty' ,500); } $this ->_channel = new AMQPChannel( $this ->_conn); $this ->_exchange = new AMQPExchange( $this ->_channel); $this ->_exchange->setName( $this ->exchange_name); $this ->_exchange->setType(AMQP_EX_TYPE_DIRECT); if ( $this ->durable) $this ->_exchange->setFlags(AMQP_DURABLE); if ( $this ->autodelete) $this ->_exchange->setFlags(AMQP_AUTODELETE); $this ->_exchange-> declare (); $this ->_queue = new AMQPQueue( $this ->_channel); $this ->_queue->setName( $this ->queue_name); if ( $this ->durable) $this ->_queue->setFlags(AMQP_DURABLE); if ( $this ->autodelete) $this ->_queue->setFlags(AMQP_AUTODELETE); if ( $this ->mirror) $this ->_queue->setArgument( 'x-ha-policy' , 'all' ); $this ->_queue-> declare (); $this ->_queue->bind( $this ->exchange_name, $this ->route_key); } public function close() { if ( $this ->_conn) { $this ->_conn->disconnect(); } } public function __sleep() { $this ->close(); return array_keys (get_object_vars( $this )); } public function __destruct() { $this ->close(); } /* * 生产者发送消息 */ public function send( $msg ) { $this ->open(); if ( is_array ( $msg )){ $msg = json_encode( $msg ); } else { $msg = trim( strval ( $msg )); } return $this ->_exchange->publish( $msg , $this ->route_key); } /* * 消费者 * $fun_name = array($classobj,$function) or function name string * $autoack 是否自动应答 * * function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //处理消息 $queue->ack($envelope->getDeliveryTag());//手动应答 } */ public function run( $fun_name , $autoack = True){ $this ->open(); if (! $fun_name || ! $this ->_queue) return False; while (True){ if ( $autoack ) $this ->_queue->consume( $fun_name , AMQP_AUTOACK); else $this ->_queue->consume( $fun_name ); } } } |
send.php生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 | <?php set_time_limit(0); include_once ( 'RabbitMQCommand.php' ); $configs = array ( 'host' => '127.0.0.1' , 'port' =>5672, 'username' => 'asdf' , 'password' => '123456' , 'vhost' => '/' ); $exchange_name = 'class-e-1' ; $queue_name = 'class-q-1' ; $route_key = 'class-r-1' ; $ra = new RabbitMQCommand( $configs , $exchange_name , $queue_name , $route_key ); for ( $i =0; $i <=100; $i ++){ $ra ->send( date ( 'Y-m-d H:i:s' ,time())); } exit (); |
accept.php消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | <?php error_reporting (0); include_once ( 'RabbitMQCommand.php' ); $configs = array ( 'host' => '127.0.0.1' , 'port' =>5672, 'username' => 'asdf' , 'password' => '123456' , 'vhost' => '/' ); $exchange_name = 'class-e-1' ; $queue_name = 'class-q-1' ; $route_key = 'class-r-1' ; $ra = new RabbitMQCommand( $configs , $exchange_name , $queue_name , $route_key ); class A{ function processMessage( $envelope , $queue ) { $msg = $envelope ->getBody(); $envelopeID = $envelope ->getDeliveryTag(); $pid = posix_getpid(); file_put_contents ( "log{$pid}.log" , $msg . '|' . $envelopeID . '' . "\r\n" ,FILE_APPEND); $queue ->ack( $envelopeID ); } } $a = new A(); $s = $ra ->run( array ( $a , 'processMessage' ),false); |