# 02-消息队列

# 一、延时队列Demo

# 1、队列声明配置


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the RabbitMQ topology used by the stock delay-queue demo: a delay
 * exchange and queue whose messages expire after a fixed TTL, plus a
 * dead-letter exchange that fans the expired messages out to an inbound and an
 * outbound dead-letter queue.
 *
 * @Author qixiaodong
 * @Date 2020/11/24 10:03
 */
@Configuration
public class StockDelayQueueConfig {

    /** Exchange that producers publish delayed messages to. */
    private static final String DELAY_EXCHANGE_NAME = "demo-stock-delay-exchange";

    /** Queue that holds messages until their TTL elapses. */
    private static final String DELAY_QUEUE_NAME = "demo-stock-delay-queue";

    /** Exchange that receives messages dead-lettered from the delay queue. */
    private static final String DLX_EXCHANGE_NAME = "demo-stock-dlx-exchange";

    /** Dead-letter queue for inbound-stock messages. */
    private static final String IN_DLX_QUEUE_NAME = "demo-in-stock-dlx-queue";

    /** Dead-letter queue for outbound-stock messages. */
    private static final String OUT_DLX_QUEUE_NAME = "demo-out-stock-dlx-queue";

    /** Per-queue message TTL passed to {@code QueueBuilder.ttl}, in milliseconds. */
    private static final int DELAY_TTL_MILLIS = 10000;

    /** Routing pattern that binds the delay queue to the delay exchange. */
    private static final String DELAY_ROUTING_PATTERN = "stock.compensate.routing.*";

    /** Routing key selecting the inbound dead-letter queue. */
    private static final String IN_ROUTING_KEY = "stock.compensate.routing.in";

    /** Routing key selecting the outbound dead-letter queue. */
    private static final String OUT_ROUTING_KEY = "stock.compensate.routing.out";

    /**
     * Declares the durable topic exchange for delayed messages.
     *
     * @return the delay exchange
     */
    @Bean("stockDelayExchange")
    public TopicExchange stockDelayExchange() {
        return ExchangeBuilder
                .topicExchange(DELAY_EXCHANGE_NAME)
                .durable(true)
                .build();
    }

    /**
     * Declares the durable delay queue. The queue is configured with a message
     * TTL and a dead-letter exchange, so expired messages are handed to
     * {@code demo-stock-dlx-exchange}.
     *
     * @return the delay queue
     */
    @Bean("stockDelayQueue")
    public Queue stockDelayQueue() {
        return QueueBuilder
                .durable(DELAY_QUEUE_NAME)
                .ttl(DELAY_TTL_MILLIS)
                .deadLetterExchange(DLX_EXCHANGE_NAME)
                .build();
    }

    /**
     * Binds the delay queue to the delay exchange with the routing pattern
     * {@code stock.compensate.routing.*}.
     *
     * @param exchange the delay exchange
     * @param queue    the delay queue
     * @return the binding
     */
    @Bean("inDelayBinding")
    public Binding inDelayBinding(@Qualifier("stockDelayExchange") TopicExchange exchange,
                                  @Qualifier("stockDelayQueue") Queue queue) {
        return bindWithKey(queue, exchange, DELAY_ROUTING_PATTERN);
    }

    /**
     * Declares the dead-letter topic exchange.
     *
     * @return the dead-letter exchange
     */
    @Bean("stockDlxExchange")
    public TopicExchange stockDlxExchange() {
        return ExchangeBuilder
                .topicExchange(DLX_EXCHANGE_NAME)
                .build();
    }

    /**
     * Declares the durable dead-letter queue for inbound-stock messages.
     *
     * @return the inbound dead-letter queue
     */
    @Bean("inStockDlxQueue")
    public Queue inStockDlxQueue() {
        return QueueBuilder
                .durable(IN_DLX_QUEUE_NAME)
                .build();
    }

    /**
     * Binds the inbound dead-letter queue to the dead-letter exchange with the
     * routing key {@code stock.compensate.routing.in}.
     *
     * @param exchange the dead-letter exchange
     * @param queue    the inbound dead-letter queue
     * @return the binding
     */
    @Bean("inStockDlxBinding")
    public Binding inStockDlxBinding(@Qualifier("stockDlxExchange") TopicExchange exchange,
                                     @Qualifier("inStockDlxQueue") Queue queue) {
        return bindWithKey(queue, exchange, IN_ROUTING_KEY);
    }

    /**
     * Declares the durable dead-letter queue for outbound-stock messages.
     *
     * @return the outbound dead-letter queue
     */
    @Bean("outStockDlxQueue")
    public Queue outStockDlxQueue() {
        return QueueBuilder
                .durable(OUT_DLX_QUEUE_NAME)
                .build();
    }

    /**
     * Binds the outbound dead-letter queue to the dead-letter exchange with the
     * routing key {@code stock.compensate.routing.out}.
     *
     * @param exchange the dead-letter exchange
     * @param queue    the outbound dead-letter queue
     * @return the binding
     */
    @Bean("outStockDlxBinding")
    public Binding outStockDlxBinding(@Qualifier("stockDlxExchange") TopicExchange exchange,
                                      @Qualifier("outStockDlxQueue") Queue queue) {
        return bindWithKey(queue, exchange, OUT_ROUTING_KEY);
    }

    /** Builds a binding of {@code queue} to {@code exchange} under {@code routingKey}. */
    private static Binding bindWithKey(Queue queue, TopicExchange exchange, String routingKey) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}

# 2、监听Demo


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Consumes messages from the inbound and outbound stock dead-letter queues and
 * logs each payload together with the time it was received.
 *
 * @Author qixiaodong
 * @Date 2020/11/24 11:28
 */
@Slf4j
@Component
public class StockDelayQueueListener {

    /**
     * Timestamp pattern for log lines. A shared {@code SimpleDateFormat} is not
     * safe when listener methods run on more than one consumer thread, so an
     * immutable, thread-safe {@code DateTimeFormatter} is used instead.
     * Fully-qualified names avoid touching the import section.
     */
    private static final java.time.format.DateTimeFormatter TIMESTAMP_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Handles messages arriving on the inbound-stock dead-letter queue.
     *
     * @param dto the converted message payload
     */
    @RabbitListener(queues = "demo-in-stock-dlx-queue", ackMode = "AUTO", group = "stock-compensate")
    @RabbitHandler
    public void inStock(AfterSaleOrderDTO dto) {
        log.info("入库消息:{}---时间:{}", JSON.toJSONString(dto), currentTimestamp());
    }


    /**
     * Handles messages arriving on the outbound-stock dead-letter queue.
     *
     * @param consumeDTO the converted message payload
     */
    @RabbitListener(queues = "demo-out-stock-dlx-queue", ackMode = "AUTO", group = "stock-compensate")
    @RabbitHandler
    public void outStock(ConsumeDTO consumeDTO) {
        log.info("出库消息:{}---时间:{}", JSON.toJSONString(consumeDTO), currentTimestamp());
    }

    /** Formats the current local date-time as {@code yyyy-MM-dd HH:mm:ss}. */
    private static String currentTimestamp() {
        return java.time.LocalDateTime.now().format(TIMESTAMP_FORMAT);
    }
}

# 3、发消息Demo


import com.alibaba.fastjson.JSON;
import com.feihe.businesscenter.store.model.dto.AfterSaleOrderDTO;
import com.feihe.businesscenter.store.model.dto.order.ConsumeDTO;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Demo REST controller that publishes one inbound and one outbound test
 * message to the stock delay exchange.
 *
 * @author jiangjingping
 * @since 2020-09-01
 */
@Slf4j
@RestController
@Api(tags = {"demo"})
@RequestMapping("/rabbit")
public class RabbitMqDemo {

    /**
     * Timestamp pattern for log lines. The controller is a singleton serving
     * concurrent requests, and a shared {@code SimpleDateFormat} is not
     * thread-safe, so an immutable {@code DateTimeFormatter} is used instead.
     * Fully-qualified names avoid touching the import section.
     */
    private static final java.time.format.DateTimeFormatter TIMESTAMP_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;


    /**
     * Publishes two JSON-encoded demo messages to {@code demo-stock-delay-exchange},
     * one with routing key {@code stock.compensate.routing.in} and one with
     * {@code stock.compensate.routing.out}, then logs the send time.
     *
     * <p>NOTE(review): the messages are built without a content-type header;
     * whether a consumer can convert the raw JSON bytes into a DTO parameter
     * depends on the configured message converter - confirm.
     *
     * @return a fixed success string
     */
    @ApiOperation(value = "向延时队列发送消息")
    @GetMapping(value = "send", name = "send message")
    public String send() {
        AfterSaleOrderDTO dto = new AfterSaleOrderDTO();
        dto.setOrderNo("orderId");
        rabbitTemplate.send("demo-stock-delay-exchange", "stock.compensate.routing.in",
                MessageBuilder.withBody(JSON.toJSONBytes(dto)).build());

        ConsumeDTO consumeDTO = new ConsumeDTO();
        consumeDTO.setStoreId("storeId");
        rabbitTemplate.send("demo-stock-delay-exchange", "stock.compensate.routing.out",
                MessageBuilder.withBody(JSON.toJSONBytes(consumeDTO)).build());

        log.info("{} 发送消息", java.time.LocalDateTime.now().format(TIMESTAMP_FORMAT));
        return "发送成功";
    }
}
Last Updated: 12/15/2023, 8:18:50 AM