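Summary: Learning RabbitMQ: getting to know the important classes in spring-amqp
Most applications are integrated with Spring, and RabbitMQ is no exception: it has a Spring integration too. The spring-amqp project can be downloaded from the official Spring website. It consists of three subprojects: spring-amqp, spring-erlang, and spring-rabbit.
Let's look at several important classes in spring-amqp, using version spring-amqp-1.0.0.M3 as the example.
1. Message: The Message class defined by Spring AMQP is one of the representations of the AMQP domain model. It encapsulates the body (the message body) and the properties (the message properties), which keeps the API simple. The Message class is defined as follows: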
    public class Message {

        private final MessageProperties messageProperties;
        private final byte[] body;

        public Message(byte[] body, MessageProperties messageProperties) {
            this.body = body;
            this.messageProperties = messageProperties;
        }

        public byte[] getBody() {
            return this.body;
        }

        public MessageProperties getMessageProperties() {
            return this.messageProperties;
        }
    }
The MessageProperties class defines attributes such as messageId, timestamp, and contentType. These attributes can be extended with user-defined "headers" through the setHeader(String key, Object value) method.
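To make the body/properties split concrete, here is a minimal sketch that uses hypothetical stand-in types (`Msg` and `Props`, not the spring-amqp classes) with the same shape as described above. The `charset` header name and the `textMessage` helper are assumptions made for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins that mirror the shape described above; not the spring-amqp classes. */
final class MessageShapeSketch {

    /** Stand-in for MessageProperties: a few fixed attributes plus free-form headers. */
    static final class Props {
        String messageId;
        String contentType;
        private final Map<String, Object> headers = new HashMap<>();

        void setHeader(String key, Object value) {
            headers.put(key, value);
        }

        Map<String, Object> getHeaders() {
            return Collections.unmodifiableMap(headers);
        }
    }

    /** Stand-in for Message: an opaque byte[] body plus its properties. */
    static final class Msg {
        final byte[] body;
        final Props props;

        Msg(byte[] body, Props props) {
            this.body = body;
            this.props = props;
        }
    }

    /** Encodes text as UTF-8 and records metadata so that a consumer can decode the body. */
    static Msg textMessage(String id, String text) {
        Props props = new Props();
        props.messageId = id;
        props.contentType = "text/plain";
        props.setHeader("charset", "UTF-8"); // custom header; the name is chosen for this sketch
        return new Msg(text.getBytes(StandardCharsets.UTF_8), props);
    }

    public static void main(String[] args) {
        Msg msg = textMessage("msg-1", "hello");
        System.out.println(new String(msg.body, StandardCharsets.UTF_8));
        System.out.println(msg.props.getHeaders());
    }
}
```

The point of the sketch is that the body is only bytes; anything a consumer needs in order to interpret it travels in the properties or the custom headers.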
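2. Exchange
The Exchange interface represents an AMQP exchange, which is what a message producer sends messages to. Each exchange within a virtual host of a broker has a unique name along with a few other properties.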
    public interface Exchange {
        String getName();
        String getType();
        boolean isDurable();
        boolean isAutoDelete();
        Map<String, Object> getArguments();
    }

The AbstractExchange class implements the Exchange interface, and four classes extend AbstractExchange: DirectExchange, TopicExchange, FanoutExchange, and HeadersExchange. Each one overrides the getType() method to return its own exchange type: direct, topic, fanout, and headers, respectively.
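The exchange type determines how the broker compares a message's routing key with the key a queue was bound under. The sketch below models that comparison for the direct, fanout, and topic types as standard AMQP describes them. It is a simplified stand-in, not code from spring-amqp or RabbitMQ; the class and method names are hypothetical, and headers matching is left out.

```java
/** Simplified model of broker-side routing decisions; not part of spring-amqp. */
final class ExchangeRoutingSketch {

    /** direct: the routing key must equal the binding key exactly. */
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** fanout: every bound queue receives the message; both keys are ignored. */
    static boolean fanoutMatches(String bindingKey, String routingKey) {
        return true;
    }

    /** topic: keys are dot-separated words; '*' matches exactly one word, '#' matches zero or more. */
    static boolean topicMatches(String bindingKey, String routingKey) {
        return matchWords(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matchWords(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern exhausted: match only if the key is exhausted too
        }
        if (pattern[p].equals("#")) {
            // Let '#' absorb zero, one, two, ... of the remaining words.
            for (int next = k; next <= key.length; next++) {
                if (matchWords(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return matchWords(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(directMatches("orders", "orders"));
        System.out.println(topicMatches("stock.*.nyse", "stock.usd.nyse"));
        System.out.println(topicMatches("stock.#", "stock.usd.nyse"));
        System.out.println(topicMatches("stock.*", "stock.usd.nyse"));
    }
}
```

The recursive matcher favors readability over speed; a real broker would use a more efficient structure for large numbers of bindings.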
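3. Queue
The Queue class is an important component on the receiving side, where the message consumer gets its messages. How a queue is bound to an Exchange determines which messages the consumer receives. In pseudocode: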
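    public class Queue {

        private final String name;
        private volatile boolean durable;
        private volatile boolean exclusive;
        private volatile boolean autoDelete;
        private volatile Map<String, Object> arguments;

        public Queue(String name) {
            this.name = name;
        }

        // getters and setters omitted
    }

Here name is the name of the queue, durable indicates whether the queue is persistent (true means it is), and exclusive indicates whether the queue is exclusive. AmqpTemplate provides a method for obtaining a uniquely named queue. Such a queue may serve as a "reply-to" address or a similar purpose, so exclusive and autoDelete are usually both set to true for it.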
4. Binding
The Binding class offers several constructors that tie together an Exchange, a Queue, and a routing key, for example:
    Binding(Queue queue, FanoutExchange exchange)
    Binding(Queue queue, HeadersExchange exchange, Map<String, Object> arguments)
    Binding(Queue queue, DirectExchange exchange)
    Binding(Queue queue, DirectExchange exchange, String routingKey)
    Binding(Queue queue, TopicExchange exchange, String routingKey)

5. AmqpTemplate
AmqpTemplate is the template interface used for sending messages; it also declares the synchronous receive methods:
    /**
     * Specifies a basic set of AMQP operations.
     *
     * Provides synchronous send and receive methods. The {@link #convertAndSend(Object)} and {@link #receiveAndConvert()}
     * methods let you send and receive POJO objects. Implementations are expected to delegate to an instance of
     * {@link MessageConverter} to perform conversion to and from AMQP byte[] payload type.
     *
     * @author Mark Pollack
     * @author Mark Fisher
     */
    public interface AmqpTemplate {

        // send methods for messages

        /**
         * Send a message to a default exchange with a default routing key.
         *
         * @param message a message to send
         * @throws AmqpException if there is a problem
         */
        void send(Message message) throws AmqpException;

        /**
         * Send a message to a default exchange with a specific routing key.
         *
         * @param routingKey the routing key
         * @param message a message to send
         * @throws AmqpException if there is a problem
         */
        void send(String routingKey, Message message) throws AmqpException;

        /**
         * Send a message to a specific exchange with a specific routing key.
         *
         * @param exchange the name of the exchange
         * @param routingKey the routing key
         * @param message a message to send
         * @throws AmqpException if there is a problem
         */
        void send(String exchange, String routingKey, Message message) throws AmqpException;

        // send methods with conversion

        /**
         * Convert a Java object to an Amqp {@link Message} and send it to a default exchange with a default routing key.
         *
         * @param message a message to send
         * @throws AmqpException if there is a problem
         */
        void convertAndSend(Object message) throws AmqpException;

        /**
         * Convert a Java object to an Amqp {@link Message} and send it to a default exchange with a specific routing key.
         *
         * @param routingKey the routing key
         * @param message a message to send
         * @throws AmqpException if there is a problem
         */
        void convertAndSend(String routingKey, Object message) throws AmqpException;

        /**
         * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange with a specific routing key.
         *
         * @param exchange the name of the exchange
         * @param routingKey the routing key
         * @param message a message to send
         * @throws AmqpException if there is a problem
         */
        void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

        /**
         * Convert a Java object to an Amqp {@link Message} and send it to a default exchange with a default routing key.
         *
         * @param message a message to send
         * @param messagePostProcessor a processor to apply to the message before it is sent
         * @throws AmqpException if there is a problem
         */
        void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

        /**
         * Convert a Java object to an Amqp {@link Message} and send it to a default exchange with a specific routing key.
         *
         * @param routingKey the routing key
         * @param message a message to send
         * @param messagePostProcessor a processor to apply to the message before it is sent
         * @throws AmqpException if there is a problem
         */
        void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
                throws AmqpException;

        /**
         * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange with a specific routing key.
         *
         * @param exchange the name of the exchange
         * @param routingKey the routing key
         * @param message a message to send
         * @param messagePostProcessor a processor to apply to the message before it is sent
         * @throws AmqpException if there is a problem
         */
        void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
                throws AmqpException;

        // receive methods for messages

        /**
         * Receive a message if there is one from a default queue. Returns immediately, possibly with a null value.
         *
         * @return a message or null if there is none waiting
         * @throws AmqpException if there is a problem
         */
        Message receive() throws AmqpException;

        /**
         * Receive a message if there is one from a specific queue. Returns immediately, possibly with a null value.
         *
         * @param queueName the name of the queue to poll
         * @return a message or null if there is none waiting
         * @throws AmqpException if there is a problem
         */
        Message receive(String queueName) throws AmqpException;

        // receive methods with conversion

        /**
         * Receive a message if there is one from a default queue and convert it to a Java object. Returns immediately,
         * possibly with a null value.
         *
         * @return a message or null if there is none waiting
         * @throws AmqpException if there is a problem
         */
        Object receiveAndConvert() throws AmqpException;

        /**
         * Receive a message if there is one from a specific queue and convert it to a Java object. Returns immediately,
         * possibly with a null value.
         *
         * @param queueName the name of the queue to poll
         * @return a message or null if there is none waiting
         * @throws AmqpException if there is a problem
         */
        Object receiveAndConvert(String queueName) throws AmqpException;

        // send and receive methods for messages

        /**
         * Basic RPC pattern. Send a message to a default exchange with a default routing key and attempt to receive a
         * response. Implementations will normally set the reply-to header to an exclusive queue and wait up for some time
         * limited by a timeout.
         *
         * @param message a message to send
         * @return the response if there is one
         * @throws AmqpException if there is a problem
         */
        Message sendAndReceive(Message message) throws AmqpException;

        /**
         * Basic RPC pattern. Send a message to a default exchange with a specific routing key and attempt to receive a
         * response. Implementations will normally set the reply-to header to an exclusive queue and wait up for some time
         * limited by a timeout.
         *
         * @param routingKey the routing key
         * @param message a message to send
         * @return the response if there is one
         * @throws AmqpException if there is a problem
         */
        Message sendAndReceive(String routingKey, Message message) throws AmqpException;

        /**
         * Basic RPC pattern. Send a message to a specific exchange with a specific routing key and attempt to receive a
         * response. Implementations will normally set the reply-to header to an exclusive queue and wait up for some time
         * limited by a timeout.
         *
         * @param exchange the name of the exchange
         * @param routingKey the routing key
         * @param message a message to send
         * @return the response if there is one
         * @throws AmqpException if there is a problem
         */
        Message sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException;

        // send and receive methods with conversion

        /**
         * Basic RPC pattern with conversion. Send a Java object converted to a message to a default exchange with a default
         * routing key and attempt to receive a response, converting that to a Java object. Implementations will normally
         * set the reply-to header to an exclusive queue and wait up for some time limited by a timeout.
         *
         * @param message a message to send
         * @return the response if there is one
         * @throws AmqpException if there is a problem
         */
        Object convertSendAndReceive(Object message) throws AmqpException;

        /**
         * Basic RPC pattern with conversion. Send a Java object converted to a message to a default exchange with a
         * specific routing key and attempt to receive a response, converting that to a Java object. Implementations will
         * normally set the reply-to header to an exclusive queue and wait up for some time limited by a timeout.
         *
         * @param routingKey the routing key
         * @param message a message to send
         * @return the response if there is one
         * @throws AmqpException if there is a problem
         */
        Object convertSendAndReceive(String routingKey, Object message) throws AmqpException;

        /**
         * Basic RPC pattern with conversion. Send a Java object converted to a message to a specific exchange with a
         * specific routing key and attempt to receive a response, converting that to a Java object. Implementations will
         * normally set the reply-to header to an exclusive queue and wait up for some time limited by a timeout.
         *
         * @param exchange the name of the exchange
         * @param routingKey the routing key
         * @param message a message to send
         * @return the response if there is one
         * @throws AmqpException if there is a problem
         */
        Object convertSendAndReceive(String exchange, String routingKey, Object message) throws AmqpException;

        /**
         * Basic RPC pattern with conversion. Send a Java object converted to a message to a default exchange with a default
         * routing key and attempt to receive a response, converting that to a Java object. Implementations will normally
         * set the reply-to header to an exclusive queue and wait up for some time limited by a timeout.
         *
         * @param message a message to send
         * @param messagePostProcessor a processor to apply to the message before it is sent
         * @return the response if there is one
         * @throws AmqpException if there is a problem
         */
        Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

        /**
         * Basic RPC pattern with conversion. Send a Java object converted to a message to a default exchange with a
         * specific routing key and attempt to receive a response, converting that to a Java object. Implementations will
         * normally set the reply-to header to an exclusive queue and wait up for some time limited by a timeout.
         *
         * @param routingKey the routing key
         * @param message a message to send
         * @param messagePostProcessor a processor to apply to the message before it is sent
         * @return the response if there is one
         * @throws AmqpException if there is a problem
         */
        Object convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
                throws AmqpException;

        /**
         * Basic RPC pattern with conversion. Send a Java object converted to a message to a specific exchange with a
         * specific routing key and attempt to receive a response, converting that to a Java object. Implementations will
         * normally set the reply-to header to an exclusive queue and wait up for some time limited by a timeout.
         *
         * @param exchange the name of the exchange
         * @param routingKey the routing key
         * @param message a message to send
         * @param messagePostProcessor a processor to apply to the message before it is sent
         * @return the response if there is one
         * @throws AmqpException if there is a problem
         */
        Object convertSendAndReceive(String exchange, String routingKey, Object message,
                MessagePostProcessor messagePostProcessor) throws AmqpException;
    }
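Two behaviors in the Javadoc above are easy to overlook: the convert methods turn a Java object into a byte[] payload and back, and the receive methods poll, returning null immediately when nothing is waiting. The following is a minimal in-memory sketch of just those two behaviors. `InMemoryTemplateSketch` is a hypothetical class, not an AmqpTemplate implementation; it assumes the routing key is simply the queue name and that payloads are converted as UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical in-memory stand-in that mimics the polling semantics only; not spring-amqp. */
final class InMemoryTemplateSketch {

    private final Map<String, Queue<byte[]>> queues = new HashMap<>();
    private final String defaultQueue;

    InMemoryTemplateSketch(String defaultQueue) {
        this.defaultQueue = defaultQueue;
    }

    /** Converts the payload to bytes (assumed here: UTF-8 text) and enqueues it. */
    void convertAndSend(String routingKey, Object payload) {
        byte[] body = String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
        // Assumption of this sketch: the routing key is used directly as the queue name.
        queues.computeIfAbsent(routingKey, k -> new ArrayDeque<>()).add(body);
    }

    /** Same as above, using the default queue configured on this sketch. */
    void convertAndSend(Object payload) {
        convertAndSend(defaultQueue, payload);
    }

    /** Returns immediately: the converted message, or null if none is waiting. */
    Object receiveAndConvert(String queueName) {
        Queue<byte[]> queue = queues.get(queueName);
        byte[] body = (queue == null) ? null : queue.poll();
        return (body == null) ? null : new String(body, StandardCharsets.UTF_8);
    }

    /** Polls the default queue. */
    Object receiveAndConvert() {
        return receiveAndConvert(defaultQueue);
    }

    public static void main(String[] args) {
        InMemoryTemplateSketch template = new InMemoryTemplateSketch("demo.queue");
        System.out.println(template.receiveAndConvert()); // nothing sent yet
        template.convertAndSend("hello");
        System.out.println(template.receiveAndConvert());
    }
}
```

Because the receive side never blocks, callers that need to wait for a message have to poll in a loop or use an asynchronous listener instead.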
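6. AmqpAdmin and RabbitAdmin
These are the administration classes through which users configure Queues, Exchanges, and Bindings on the broker. The admin class automatically declares or creates the configured items.
The class below is used to handle messages that are received asynchronously.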