RabbitMQ
# 工作模式
- P:生产者
- C:消费者
- Queue:消息队列
- Exchange:交换机
- Fanout:广播
- Direct:定向,把消息交给指定routing key的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
// Fanout: routingKey = ""
// Direct: routingKey = "str"
// Topic: routingKey = "str.#"
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,
@Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.*bind*(queue).to(exchange).with(routingKey).noargs();
}
# 消息可靠性
- confirm 确认模式
- return 退回模式
消息从producer到exchange,会返回一个confirmCallback。
消息从exchange到Queue投递失败,会返回一个returnCallback。
# 生产者
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void testProducer() throws InterruptedException {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm 执行了");
if (ack){
System.out.println(correlationData + "消息投递成功" + cause);
}
else{
System.out.println(correlationData + "消息投递失败" + cause);
}
}
});
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息路由到消息队列失败"+returned);
}
});
rabbitTemplate.convertAndSend( "boot_topic_exchange","boot1111.hahah", "boot mq hello!!!", new CorrelationData("1"));
Thread.sleep(1000);
}
# 持久化
@Configuration
public class RabbitConfig {
private static final String EXCHANGE_NAME = "boot_topic_exchange";
private static final String QUEUE_NAME = "boot_queue";
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,
@Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
# 消费者
@Component
public class ConsumerListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message, Channel channel) throws IOException, InterruptedException {
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(new String(message.getBody()));
int a = 1/0;
channel.basicAck(deliveryTag, true);
System.out.println("message confirm.");
}catch (Exception e){
channel.basicNack(deliveryTag, true, true);
}
}
}
# 消费端限流
配置prefetch
属性
# TTL
Time To Live
expiration
:设置消息到达存活时间后,还没有被消费,会自动被清除。
x-message-ttl
:对队列设置TTL。
# 死信队列
消息成为死信的三种情况:
- 队列消息长度到达限制
- 消费者拒收消息,并且不把原消息放到原消息,
requeue = false
- 消息到了过期时间
死信交换机和普通的交换机没区别,同样也可以路由到绑定的队列中。
# 延迟队列
消息进入队列后不会立即被消费
# 消息可靠性保障-消息补偿
# 消息幂等性保障
乐观锁,数据库中加一个字段,version