摘要:RabbitMQ能为你做些什么? 消息系统允许软件应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用, 或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的 异步和解偶. 或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步 处理,或者工作队列。所有这些都属于消息系统的模式。 RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发....
RabbitMQ能为你做些什么?
消息系统允许软件应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,
或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的
异步和解偶.
或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步
处理,或者工作队列。所有这些都属于消息系统的模式。
RabbitMQ是一个消息代理-一个消息系统的媒介。它可以为你的应用提供一个通用的消息发
送和接收平台,并且保证消息再传输过程中的安全。
地址
建议查看官方文档地址,以下
该文档,含有各种例子,可自己尝试,小编已成功运行
运行环境
系统:deepin
服务:rabbitmq-serverv3.6
1、使用composer安装php-amqplib
在你的项目中添加一个composer.json文件: { "require":{ "php-amqplib/php-amqplib":"2.6.*" } } 只要你已经安装Composer功能,你可以运行以下: $composerinstall 已经存在的项目则执行 $composerupdate 这时在verdor目录就已经下载完毕 具体可以参考官方文档:https://github.com/php-amqplib/php-amqplib
2、使用php-amqplib
英文文档:http://www.rabbitmq.com/getstarted.html 中文文档:https://rabbitmq.shujuwajue.com/tutorials_with_php/[2]Work_Queues.md.html 总结:官方文档看起来有点乱,总结了一下,基本都是互通的,以下为简单基本步骤,仅供参考,NOBB。 生产者: 1、创建连接 主要参数说明: $host:RabbitMQ服务器主机IP地址 $port:RabbitMQ服务器端口 $user:连接RabbitMQ服务器的用户名 $password:连接RabbitMQ服务器的用户密码 $vhost:连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似nginx的vhost) $connection=newAMQPStreamConnection($host,$port,$user,$password,$vhost); 2、获取信道 //$channel_id信道id,不传则获取$channel[“”]信道,再无则循环$this->channle数组,下标从1到最大信道数找第一个不是AMQPChannel对象的下标,实例化并返回AMQPChannel对象,无则抛出异常Nofreechannelids $channel=$connection->channel($channel_id); 3、在信道里创建交换器 #$exhcange_name交换器名字 #$type交换器类型: ’’默认交换机匿名交换器未显示声明类型都是该类型 fanout扇形交换器会发送消息到它所知道的所有队列,每个消费者获取的消息都是一致的 headers头部交换器 direct直连交换器,该交换机将会对绑定键(bindingkey)和路由键(routingkey)进行精确匹配 topic话题交换器该交换机会对路由键正则匹配,必须是*(一个单词)、#(多个单词,以.分割)、user.key.abc.*类型的key rpc #$passivefalse #durablefalse #auto_detletefalse $channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete); //常用设置$passive=>false$durable=>false$auto_delete->false 4、创建要发送的信息,可以创建多个消息 #$datastring类型要发送的消息 #$propertiesarray类型设置的属性,比如设置该消息持久化[‘delivery_mode’=>2] $msg=newAMQPMessage($data,$properties) 5、发送消息 #$msgobjectAMQPMessage对象 #$exchangestring交换机名字 #$routing_keystring路由键如果交换机类型 fanout:该值会被忽略,因为该类型的交换机会把所有它知道的队列发消息,无差别区别 direct只有精确匹配该路由键的队列,才会发送消息到该队列 topic只有正则匹配到的路由键的队列,才会发送到该队列 $channel->basic_publish($msg,$exchange,$routing_key); 6、关闭信道和链接 $channel->close(); $connection->close(); 消费者: 1、创建连接 主要参数说明: $host:RabbitMQ服务器主机IP地址 $port:RabbitMQ服务器端口 $user:连接RabbitMQ服务器的用户名 $password:连接RabbitMQ服务器的用户密码 $vhost:连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似nginx的vhost) $connection=newAMQPStreamConnection($host,$port,$user,$password,$vhost); 2、获取信道 //$channel_id信道id,不传则获取$channel[“”]信道,再无则循环$this->channle数组,下标从1到最大信道数找第一个不是AMQPChannel对象的下标,实例化并返回AMQPChannel对象,无则抛出异常Nofreechannelids $channel=$connection->channel($channel_id); 3、在信道里创建交换器,未显式声明交换机都是使用匿名交换机 #$exhcange_name交换器名字 #$type交换器类型: ’’默认交换机匿名交换器未显示声明类型都是该类型 fanout扇形交换器会发送消息到它所知道的所有队列,每个消费者获取的消息都是一致的 headers头部交换器 direct直连交换器,该交换机将会对绑定键(bindingkey)和路由键(routingkey)进行精确匹配 topic话题交换器该交换机会对路由键正则匹配,必须是*(一个单词)、#(多个单词,以.分割)、user.key.abc.*类型的key rpc #$passivefalse #durablefalse #auto_detletefalse $channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete); //常用设置$passive=>false$durable=>false$auto_delete->false 4、声明消费者队列 (1)非持久化队列,RabbitMQ退出或者崩溃时,该队列就不存在 list($queue_name,,)=$channel->queue_declare("",false,false,false,false) (2)持久化队列(需要显示声明,第三个参数要设置为true),保存到磁盘,但不一定完全保证不丢失信息,因为保存总是要有时间的。 list($queue_name,,)=$channel->queue_declare("ex_queue",false,false,true,false) 5、绑定交换机,当未显示绑定交换机时,默认是绑定匿名交换机 #绑定:交换机与队列的关系,如下面这句代码意思是,$queue_name队列对logs交换机数据感兴趣,该队列就消费该交换机传过来的数据:这个队列(queue)对这个交换机(exchange)的消息感兴趣.$binding_key默认为空,表示对该交换机所有消息感兴趣,如果值不为空,则该队列只对该类型的消息感兴趣(除了fanout交换机以外) $channel->queue_bind($queue_name,\'logs\',$binding_key); 6、消费消息 #该代码表示使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker),轮询、负载均衡配置 #$channel->basic_qos(null,1,null); #第四个参数no_ack=false时,表示进行ack应答,确保消息已经处理 #$callback表示回调函数,传入消息参数 $channel->basic_consume(\'ex_queue\',\'\',false,false,false,false,$callback); $callback=function($msg){ echo"[x]Received",$msg->body,"n"; sleep(substr_count($msg->body,\'.\')); echo"[x]Done","n"; #当no_ack=false时,需要写下行代码,否则可能出现内存不足情况#$msg->delivery_info[\'channel\']->basic_ack($msg->delivery_info[\'delivery_tag\']); }; #监听消息,一有消息,立马就处理 while(count($channel->callbacks)){ $channel->wait(); }
相关文章推荐
虚拟主机的专业参数,分别都是什么意思?2022-09-09
中非域名注册规则是怎样的?注册域名有什么用处? 2022-01-10
HostEase新年活动促销 美国/香港主机全场低至五折2021-12-28
HostGator下载完整备份教程分享2021-12-28
Flink中有界数据与无界数据的示例分析2021-12-28