重点
- RabbitAdmin
- Spring AMQP 声明
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdaper
- MessageConverter
使用RabbitMQ结合Spring AMQP
RabbitAdmin
- 用于声明RabbitMQ相关配置、操作RabbitMQ
- autoStartup设为true:表示Spring容器启动时自动初始化RabbitAdmin
- 底层实现:从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明
- rabbitTemplate的execute方法执行对应的声明等操作
SpringAMQP声明
- exchange
- TopicExchange
- FanoutExchange
- DirectExchange
- queue
- binding
- BindingBuilder
spring使用@bean声明exchange queue binding 例子如下:
@Bean public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } 复制代码
RabbitTemplate
- 与SpringAMQP整合的时候进行发送消息的关键类
- 提供丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口
- ConfirmCallback
- ReturnCallback
- 与Spring整合时需要实例化,但是与Springboot整合时不需要,在配置文件添加配置即可
以下是RabbitTemplate实例化的例子:
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(//); rabbitTemplate.setReturnCallback(//); return rabbitTemplate; }复制代码
使用RabbitTemplate sendMessage
- rabbitTemplate.convertAndSend方法是主要的发送消息的方法
- MessageProperties 用于构造消息体
- MessagePostProcessor:消息发送之后对消息进行的设置
例子:
@Test public void testSendMessage() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", "自定义消息类型.."); Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("------添加额外的设置---------"); message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "额外新加的属性"); return message; } }); }复制代码
SimpleMessageListenerContainer
简单消息容器 功能:
- 监听多个队列、自动启动、自动声明
- 设置事务特性、事务管理器、事务属性、事务容器、是否开启事务、回滚消息
- 设置消费者数量、最小最大数量、批量消费
- 设置消息确认和自动确认模式、是否重回队列、异常捕获函数
- 设置消费者标签生成策略、是否独占模式、消费者属性等
- 设置具体的监听器、消息转换器 message convert
- SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者的大小、接收消息的模式等,可以基于此开发rabbitmq自定义后台管控平台
属性
- queues
- concurrentConsumers:当前消费者数
- maxConcurrentConsumers:最大消费者并发
- defaultRequeueRejected: 是否重回队列 false
- acknowledgeMode:消息确认模型 AUTO
- exposeListenerChannel:
- messageListener: 消息监听
- consumerTagStrategy:consumerTag生成策略
@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(5); container.setDefaultRequeueRejected(false); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setExposeListenerChannel(true); container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); }复制代码
MessageListenerApapter 适配器模式
- extends AbstractAdaptableMessageListener 消息listener
- queueOrTagToMethodName 队列标识与方法名称组成的集合
- defaultListenerMethod 默认监听方法名称
- Delegate 委托对象:实际真实的委托对象
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate);//设定queue使用哪个adapter方法处理MapqueueOrTagToMethodName = new HashMap<>();queueOrTagToMethodName.put("queue001","method1");queueOrTagToMethodName.put("queue002","method2");adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);//设置默认处理方法,默认处理方法是handleMessageadapter.setDefaultListenerMethod("handleMessage");//设置消息转换方式adapter.setMessageConverter(new TextMessageConvert());//消息监听container.setMessageListener(adapter);复制代码
MessageConverter 消息转换器
自定义常用转换器 都需要实现 MessageConverter接口
Json convert: Jackson2JsonMessageConverter 可以进行java对象的转换功能 DefaultJackson2JavaTypeMapper映射器: 可以进行java对象的映射关系 二进制 convert: 如Image、PDF、PPT、stream
Springboot整合RabbitMQ
基本概念
@EnableRabbit
@EnableRabbit和@Configuration一起使用,可以加在类或者方法上,这个注解开启了容器对注册的bean的@RabbitListener检查。
@RabbitListener
@RabbitListener用于注册Listener时使用的信息:如queue,exchange,key、ListenerContainerFactory和RabbitAdmin的bean name。
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue}", durable = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC), key = "${mq.config.key}"), admin = "rabbitAdmin")复制代码
扫描到bean带有该注解后,首先会将注解的内容封装到Endpoint对象中并和ListenerContainerFactory的实例一起添加到上面的RabbitListenerEndpointRegistry实例中。添加的时候会创建相应的ListenerContainer实例并添加Listener对象。
@RabbitHandler
@RabbitListener 和 @RabbitHandler结合使用,不同类型的消息使用不同的方法来处理。
- 不同的消息类型走不同的handler
public class CommandListener{ @RabbitHandler public void handler1(ApiMessage msg){ System.out.println(msg); } @RabbitHandler public void handler2(Map msg){ System.out.println(msg); }}复制代码
生产端发送消息
- ConfirmCallback
- ReturnCallback
- CorrelationData
- convertAndSend
//回调函数: confirm确认 message ->send —> broker -> ack final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println("correlationData: " + correlationData); System.err.println("ack: " + ack); if(!ack){ System.err.println("异常处理...."); } } }; //回调函数: return返回 final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { System.err.println("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); } };//发送消息方法调用: 构建Message消息 public void send(Object message, Mapproperties) throws Exception { MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一 CorrelationData correlationData = new CorrelationData("1234567890"); rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData); }复制代码
消费端监听方式
- @RabbitListener
- bindings @QueueBindingvalue
- @Queue
- durable 队列是否持久化
- value 值
- exchange @Exchange
- type 交换机的类型
- value 值
- ignoreDeclarationExceptions 是否忽略声明异常
- key routing Key
- @Queue
- bindings @QueueBindingvalue
- @RabbitHandler
- onMessage
- @Payload: message body
- deliveryTag
- basicAck
- Channel:
- @Headers: message headers
- @Payload: message body
/** * *spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-1 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=springboot.* * @param order * @param channel * @param headers * @throws Exception */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable="${spring.rabbitmq.listener.order.queue.durable}"), exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"), key = "${spring.rabbitmq.listener.order.key}" ) ) @RabbitHandler public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, Channel channel, @Headers Mapheaders) throws Exception { System.err.println("--------------------------------------"); System.err.println("消费端order: " + order.getId()); Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); //手工ACK channel.basicAck(deliveryTag, false); }复制代码
Spring cloud stream with RabbitMQ
特点: Spring cloud stream 生产端和消费端可以使用不同的中间件
整体架构核心概念图:
基本概念
Barista
Barista接口: Barista接口是定义来作为后面类的参数. 这一接口定义了通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
- @Output 输入注解
- @Input 输出注解
- @StreamListener 监听消息
使用Spring Cloud Stream非常简单,使用好这3个注解即可,在实现高性能消息的生产和消费场景非常适合,但是使用SpringCloudStream框架有一个非常大的问题就是不能实现可靠性的投递,也就是无法保证消息的100%可靠性,会存在少量消息丢失的问题。原因是为了兼容Kafka。