Rabbitmq的介绍及使用

  1. AMQP协议
  2. RabbitMQ
  3. spring rabbitmq 启动原理
  4. rabbitmq 使用
    1. 交换机队列绑定
    2. 发送消息
    3. 接受消息

AMQP协议

  AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

RabbitMQ

  rabbitmq就是一个支持amqp协议的消息中间件 ,主要以 交换机(Exchange)、队列(queue)、 消息(Messages)、路由(routing key) 组成. 交换机呢又分为directExchange、 fanoutExchange 、headersExchange、 topicExchange 这几种

  • direct: 分发到完全匹配 路由key的所有队列
  • fanout: 分发所有交换机数据到所有队列
  • headers: 分发消息通过headers属性匹配的到队列
  • topic: 与direct类似 只是路由匹配规则不同

topic key eg:A.B.C 、A*.B 、A#B ……等等类似

spring rabbitmq 启动原理

springboot启动会加载META-INF 下的spring.factories文件对其中的部分类自动注入 , 其中包括RabbitAutoConfiguration ,对rabbitmq的连接工厂 rabbitTemplate amqpAdmin 等 进行bean注入

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
...
org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration,\
...

rabbitmq 使用

  通过RabbitProperties类可以查看spring提供了哪些配置给我们使用,例如下面几个。

spring:
  rabbitmq:
    host: 192.168.118.134
    port: 5672
    username: admin
    password: 1
    virtualHost: /HOST

交换机队列绑定

我们知道rabbitmq自动注入时候会自动注入个amqpAdmin()类 该类同时实现了InitializingBean 所以, bean初始化时候会调用afterPropertiesSet()方法 。

public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
		BeanNameAware, InitializingBean 

查看该方法实现关键点 会在这里向连接工程添加添加监听器,在连接rabbitmq服务时候会调用initialize()

final AtomicBoolean initializing = new AtomicBoolean(false);

			this.connectionFactory.addConnectionListener(connection -> {

				if (!initializing.compareAndSet(false, true)) {
					// If we are already initializing, we don't need to do it again...
					return;
				}
				try {
					
					if (this.retryTemplate != null) {
						this.retryTemplate.execute(c -> {
							initialize();
							return null;
						});
					}
					else {
						initialize();
					}
				}
				finally {
					initializing.compareAndSet(true, false);
				}

			});

继续查看 initialize()方法 该方法会从spring bean工厂中获取Exchange Queue Binding 这三个类型各自的集合 并将其在rabbitmq服务上创建 绑定 。

public void initialize() {

		......
		Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
				this.applicationContext.getBeansOfType(Exchange.class).values());
		Collection<Queue> contextQueues = new LinkedList<Queue>(
				this.applicationContext.getBeansOfType(Queue.class).values());
		Collection<Binding> contextBindings = new LinkedList<Binding>(
				this.applicationContext.getBeansOfType(Binding.class).values());

		processLegacyCollections(contextExchanges, contextQueues, contextBindings);
		processDeclarables(contextExchanges, contextQueues, contextBindings);
		 .......
		this.rabbitTemplate.execute(channel -> {
			declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
			declareQueues(channel, queues.toArray(new Queue[queues.size()]));
			declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
			return null;
		});
	}

这样我们就可以同过@bean的方式 创建交换机 对列 及绑定关系


@Configuration
public class Rabbitconfig {
    //交换机
    @Bean
    public Exchange topicExchange()
    {
        return  ExchangeBuilder.topicExchange("Mytopic").build();
    }
    //队列 
    @Bean
    public Queue myQueue(){
    return   QueueBuilder.durable("MyQueue").build();
    }
    //绑定关系
    @Bean
    public Binding myBinding() {
        return BindingBuilder.bind(myQueue()).to(topicExchange()).with("").noargs();
    }
}

发送消息

   发送消息一般使用rabbitTemplate来发送,通过指定交换机,路由key 以及一个消息类实例org.springframework.amqp.core.Message 其中Message实际是一个被封装起来的byte[] body 字节数组。

@Autowired
    RabbitTemplate rabbitTemplate;

/**交换机为Mytopic 路由key为空 发现消息*/
public String testsend(){
        rabbitTemplate.send("Mytopic","",new Message(("helloworld"+UUID.randomUUID()).getBytes(),new MessageProperties()));
        return "ok";
    }

接受消息

  接受消息spring提供了@RabbitListener注解 ,该注解可在类和方法上使用, 在方法上使用时 调用该方法处理消息 ,在类上使用是 配合@RabbitHandler 根据MessageConverter转换类型选择处理方法。

@RabbitListener(queues={"MyQueue"})
public class RabbitMqClient {

    @RabbitHandler
    public void testreceive(Message message){
       System.out.println("testreceive:"+message.getBody());
    }
     // @Payload  @Headers 注解可以直接获取消息 和头信息
     //头信息 就在 发送时候的MessageProperties中
     //private final Map<String, Object> headers = new HashMap<>();
    /*@RabbitHandler
    public void testreceiveA(@Payload String body, @Headers Map<String,Object> headers){
        System.out.println("testreceiveA:"+body+"-----"+headers);*/
    }
}