# 02-消息队列
# 一、延时队列Demo
# 1、队列声明配置
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ topology for the stock delay-queue demo.
 *
 * <p>Messages are published to a delay exchange and parked in a queue that has a
 * per-queue TTL and a dead-letter exchange configured. Once a message expires it is
 * handed to the dead-letter exchange, which routes it to the "in-stock" or
 * "out-stock" queue according to its routing key.
 *
 * @Author qixiaodong
 * @Date 2020/11/24 10:03
 */
@Configuration
public class StockDelayQueueConfig {

    /** Exchange that producers publish delayed messages to. */
    private static final String DELAY_EXCHANGE_NAME = "demo-stock-delay-exchange";

    /** Queue in which messages wait until their TTL elapses. */
    private static final String DELAY_QUEUE_NAME = "demo-stock-delay-queue";

    /** Exchange that receives messages expired from the delay queue. */
    private static final String DLX_EXCHANGE_NAME = "demo-stock-dlx-exchange";

    /** Dead-letter queue for in-stock messages. */
    private static final String IN_STOCK_DLX_QUEUE_NAME = "demo-in-stock-dlx-queue";

    /** Dead-letter queue for out-stock messages. */
    private static final String OUT_STOCK_DLX_QUEUE_NAME = "demo-out-stock-dlx-queue";

    /** Time, in milliseconds, a message stays in the delay queue before it expires. */
    private static final int DELAY_TTL_MILLIS = 10000;

    /** Pattern that binds the delay queue to the delay exchange. */
    private static final String DELAY_ROUTING_PATTERN = "stock.compensate.routing.*";

    /** Routing key that selects the in-stock dead-letter queue. */
    private static final String IN_STOCK_ROUTING_KEY = "stock.compensate.routing.in";

    /** Routing key that selects the out-stock dead-letter queue. */
    private static final String OUT_STOCK_ROUTING_KEY = "stock.compensate.routing.out";

    /**
     * Declares the durable topic exchange of the delay queue.
     *
     * @return the delay exchange
     */
    @Bean("stockDelayExchange")
    public TopicExchange stockDelayExchange() {
        return ExchangeBuilder
                .topicExchange(DELAY_EXCHANGE_NAME)
                .durable(true)
                .build();
    }

    /**
     * Declares the dead-letter topic exchange.
     *
     * @return the dead-letter exchange
     */
    @Bean("stockDlxExchange")
    public TopicExchange stockDlxExchange() {
        return ExchangeBuilder
                .topicExchange(DLX_EXCHANGE_NAME)
                .build();
    }

    /**
     * Declares the durable delay queue.
     *
     * <p>A message that outlives the queue TTL is dead-lettered to
     * {@code demo-stock-dlx-exchange}. No dead-letter routing key is configured
     * here, so the dead-letter bindings below match on the routing key the message
     * was originally published with.
     *
     * @return the delay queue
     */
    @Bean("stockDelayQueue")
    public Queue stockDelayQueue() {
        QueueBuilder delayQueue = QueueBuilder.durable(DELAY_QUEUE_NAME);
        delayQueue.ttl(DELAY_TTL_MILLIS);
        delayQueue.deadLetterExchange(DLX_EXCHANGE_NAME);
        return delayQueue.build();
    }

    /**
     * Declares the durable in-stock dead-letter queue.
     *
     * @return the in-stock dead-letter queue
     */
    @Bean("inStockDlxQueue")
    public Queue inStockDlxQueue() {
        return QueueBuilder.durable(IN_STOCK_DLX_QUEUE_NAME).build();
    }

    /**
     * Declares the durable out-stock dead-letter queue.
     *
     * @return the out-stock dead-letter queue
     */
    @Bean("outStockDlxQueue")
    public Queue outStockDlxQueue() {
        return QueueBuilder.durable(OUT_STOCK_DLX_QUEUE_NAME).build();
    }

    /**
     * Binds the delay queue to the delay exchange with the pattern
     * {@code stock.compensate.routing.*}.
     *
     * @param exchange the delay exchange
     * @param queue    the delay queue
     * @return the binding
     */
    @Bean("inDelayBinding")
    public Binding inDelayBinding(@Qualifier("stockDelayExchange") TopicExchange exchange,
                                  @Qualifier("stockDelayQueue") Queue queue) {
        return route(queue, exchange, DELAY_ROUTING_PATTERN);
    }

    /**
     * Binds the in-stock dead-letter queue to the dead-letter exchange with the
     * routing key {@code stock.compensate.routing.in}.
     *
     * @param exchange the dead-letter exchange
     * @param queue    the in-stock dead-letter queue
     * @return the binding
     */
    @Bean("inStockDlxBinding")
    public Binding inStockDlxBinding(@Qualifier("stockDlxExchange") TopicExchange exchange,
                                     @Qualifier("inStockDlxQueue") Queue queue) {
        return route(queue, exchange, IN_STOCK_ROUTING_KEY);
    }

    /**
     * Binds the out-stock dead-letter queue to the dead-letter exchange with the
     * routing key {@code stock.compensate.routing.out}.
     *
     * @param exchange the dead-letter exchange
     * @param queue    the out-stock dead-letter queue
     * @return the binding
     */
    @Bean("outStockDlxBinding")
    public Binding outStockDlxBinding(@Qualifier("stockDlxExchange") TopicExchange exchange,
                                      @Qualifier("outStockDlxQueue") Queue queue) {
        return route(queue, exchange, OUT_STOCK_ROUTING_KEY);
    }

    /** Builds a binding of {@code queue} to {@code exchange} under {@code routingKey}. */
    private static Binding route(Queue queue, TopicExchange exchange, String routingKey) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}
# 2、监听Demo
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Listens on the two stock dead-letter queues and logs every message it receives
 * together with the time of receipt.
 *
 * @Author qixiaodong
 * @Date 2020/11/24 11:28
 */
@Slf4j
@Component
public class StockDelayQueueListener {

    /**
     * Layout of the timestamp written to the log.
     *
     * <p>{@code DateTimeFormatter} is immutable and thread-safe, so a single shared
     * instance can be used by both listener methods even when they run on different
     * threads. The {@code SimpleDateFormat} field it replaces is not safe for that.
     * The type is referenced by its fully qualified name so that no additional
     * import is required.
     */
    private static final java.time.format.DateTimeFormatter TIMESTAMP_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Handles messages from the in-stock dead-letter queue
     * {@code demo-in-stock-dlx-queue} by logging them as JSON.
     *
     * @param dto the payload of the received message
     */
    @RabbitListener(queues = "demo-in-stock-dlx-queue", ackMode = "AUTO", group = "stock-compensate")
    @RabbitHandler
    public void inStock(AfterSaleOrderDTO dto) {
        log.info("入库消息:{}---时间:{}", JSON.toJSONString(dto), currentTimestamp());
    }

    /**
     * Handles messages from the out-stock dead-letter queue
     * {@code demo-out-stock-dlx-queue} by logging them as JSON.
     *
     * @param consumeDTO the payload of the received message
     */
    @RabbitListener(queues = "demo-out-stock-dlx-queue", ackMode = "AUTO", group = "stock-compensate")
    @RabbitHandler
    public void outStock(ConsumeDTO consumeDTO) {
        log.info("出库消息:{}---时间:{}", JSON.toJSONString(consumeDTO), currentTimestamp());
    }

    /** Returns the current local date-time rendered as {@code yyyy-MM-dd HH:mm:ss}. */
    private static String currentTimestamp() {
        return java.time.LocalDateTime.now().format(TIMESTAMP_FORMAT);
    }
}
# 3、发消息Demo
import com.alibaba.fastjson.JSON;
import com.feihe.businesscenter.store.model.dto.AfterSaleOrderDTO;
import com.feihe.businesscenter.store.model.dto.order.ConsumeDTO;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Demo REST controller that publishes test messages to the stock delay exchange.
 *
 * @author jiangjingping
 * @since 2020-09-01
 */
@Slf4j
@RestController
@Api(tags = {"demo"})
@RequestMapping("/rabbit")
public class RabbitMqDemo {

    /**
     * Content type stamped on every outgoing message.
     *
     * <p>The bodies are JSON produced by fastjson. A message built without an explicit
     * content type is sent as {@code application/octet-stream}, and JSON message
     * converters on the consumer side generally decide from the content type whether
     * to deserialize the body into the listener's parameter type.
     */
    private static final String JSON_CONTENT_TYPE = "application/json";

    /**
     * Layout of the timestamp written to the log.
     *
     * <p>{@code DateTimeFormatter} is immutable and thread-safe, so one shared instance
     * is safe under concurrent requests. The {@code SimpleDateFormat} field it replaces
     * is not. The type is referenced by its fully qualified name so that no additional
     * import is required.
     */
    private static final java.time.format.DateTimeFormatter TIMESTAMP_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes one in-stock and one out-stock sample message to
     * {@code demo-stock-delay-exchange}, then logs the time of sending.
     *
     * @return a fixed confirmation text
     */
    @ApiOperation(value = "向延时队列发送消息")
    @GetMapping(value = "send", name = "send message")
    public String send() {
        AfterSaleOrderDTO dto = new AfterSaleOrderDTO();
        dto.setOrderNo("orderId");
        rabbitTemplate.send("demo-stock-delay-exchange", "stock.compensate.routing.in",
                MessageBuilder.withBody(JSON.toJSONBytes(dto))
                        .setContentType(JSON_CONTENT_TYPE)
                        .build());

        ConsumeDTO consumeDTO = new ConsumeDTO();
        consumeDTO.setStoreId("storeId");
        rabbitTemplate.send("demo-stock-delay-exchange", "stock.compensate.routing.out",
                MessageBuilder.withBody(JSON.toJSONBytes(consumeDTO))
                        .setContentType(JSON_CONTENT_TYPE)
                        .build());

        log.info("{} 发送消息", java.time.LocalDateTime.now().format(TIMESTAMP_FORMAT));
        return "发送成功";
    }
}