RabbitMQ 消息队列:Confirm、Return 机制以及 TTL、死信队列

chan 作者
阅读 7290 喜欢 0

Confirm 机制

放入队列中的消息,如何知道它是否成功发送?我们可以引入 Confirm 机制,只要发送消息到队列中是成功的,队列会给我们返回一个结果

生产者:

/**
 * Work模型 - Confirm 机制生产者
 * @author CHAN
 * @date 2020/03/12
 */
public class Producer {
    private static final String QUEUE_NAME = "confirm-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 开启 confirm 消息确认机制
        channel.confirmSelect();
        // 对消息的可达性进行监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long l, boolean b) throws IOException {
                System.out.println("消息发送成功");
            }

            @Override
            public void handleNack(long l, boolean b) throws IOException {
                System.out.println("消息发送失败");
            }
        });
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicPublish("", QUEUE_NAME, null, "发布消息".getBytes());
    }
}

消费者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Consumer {
    private static final String QUEUE_NAME = "confirm-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取消息:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

Return 机制

我们在发送消息的时候,如果指定的交换机或者路由 key 不存在,这时候我们就需要监听这种不可达的消息。前提条件是当前的队列必须有消费者存在

关于参数:

mandatory 如果设置为 true,表示要监听不可达消息并进行处理
如果设置为 false,那么队列端会直接删除这个消息

生产者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Producer {
    private static final String EXCHANGE_NAME = "exchange-return";

    /**
     * 声明能正常路由的 key
     */
    private static final String ROUTING_KEY = "return.server";

    /**
     * 声明不能路由的 key
     */
    private static final String ROUTING_ERROR_KEY = "error.server";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 添加监听
        channel.addReturnListener(new ReturnListener() {
            /**
             * @param i 队列响应给浏览器的状态码
             * @param s 状态码对应的文本信息
             * @param s1 交换机名字
             * @param s2 路由 key
             * @param basicProperties 消息的属性
             * @param bytes 消息体内容
             * @throws IOException -
             */
            @Override
            public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                System.out.println("监听到不可达消息!");
                System.out.println(String.format("状态码:%d,文本信息:%s,交换机名字:%s,路由 key:%s", i, s, s1, s2));
            }
        });
        channel.basicPublish(EXCHANGE_NAME, ROUTING_ERROR_KEY, true, null, "Return 机制的错误路由 Key 测试".getBytes());
    }
}

消费者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Consumer {
    private static final String EXCHANGE_NAME = "exchange-return";
    private static final String ROUTING_KEY = "return.#";
    private static final String QUEUE_NAME = "return_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //绑定
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        //申明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到消息:" + new String(body));
            }
        };
        //进行消费的绑定
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

消费端的限流

  假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!
  当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。

生产者(无其他特别,普通生产者):

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Producer {
    private static final String QUEUE_NAME = "limit-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", QUEUE_NAME, null, ("发布消息:" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Consumer {
    private static final String QUEUE_NAME = "limit-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        final Channel channel = ConnectionUtils.getConnection().createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //设置限流机制
        // 参数一:消息本身的大小,如果设置为0,表示对消息本身的大小不限制
        // 参数二:告诉 rabbitmq 不要一次性给消费者推送大于 N 个消息,推送的前提是当前的 N 个消息已经被手动确认
        // 参数三:是否将上面的设置应用整个通道,true 表示该通道的消费者使用策略,false 表示只有当前消费者使用策略
        channel.basicQos(0, 1, false);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1收到消息:" + new String(body));
                // 手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

TTL 队列(Time To Live)

假设场景,我要下单,下单之后如果我的订单在一定时间内没有被处理,那么订单就自动失效,简单的说就是我们的队列中的消息是有时间限制的,如果超时那么这个消息将会被队列删除。

生成者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Producer {
    private static final String QUEUE_NAME = "ttl-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 只要在声明队列时设置好属性,这个队列就自动变成 ttl 队列了
        Map<String, Object> properties = new HashMap<>();
        // 声明为 ttl 队列并设置过期时间为5秒
        properties.put("x-message-ttl", 5000);
        channel.queueDeclare(QUEUE_NAME, false, false, false, properties);
        channel.basicPublish("", QUEUE_NAME, null, "发布消息".getBytes());
        channel.close();
        connection.close();
    }
}

死信队列

什么时私信队列?

当发送到队列中的消息被拒绝后,消息的 ttl 时间过期或者队列达到了最大容量,在满足以上前提下,可以将这些消息重新 push 到另外一台交换机上,这个交换机也有自己的队列,并称为死信队列。死信队列也是一个正常的交换机,和一般的交换机没有什么区别,当这个队列中如果有死信的时候,rabbitmq 就会将这个消息自动发送到我们提前定义好的死信队列中

生成者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Producer {
    private static final String EXCHANGE_NAME = "ttl-dlx-exchange";
    private static final String ROUTING_KEY = "dlx.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, false, null, "发送消息".getBytes());
        }
    }
}

消费者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Consumer {
    private static final String EXCHANGE_NAME = "ttl-dlx-exchange";
    private static final String QUEUE_NAME = "ttl-dlx-queue";

    /**
     * 定义死信队列的交换机名
     */
    private static final String DLX_EXCHANGE_NAME = "dlx-exchange";

    /**
     * 定义死信队列
     */
    private static final String DLX_QUEUE_NAME = "dlx-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 创建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        // 声明 ttl 队列,并加入死信属性
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000);
        map.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        channel.queueDeclare(QUEUE_NAME, true, false, false, map);
        // 队列与交换机机进行绑定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dlx.#");

        // 创建死信交换机
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, "topic");
        // 声明死信队列
        channel.queueDeclare(DLX_QUEUE_NAME, true, false, false, null);
        // 死信队列绑定交换机
        channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, "#");

        // 消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到消息:" + new String(body));
            }
        };
    channel.basicConsume(DLX_QUEUE_NAME, true, defaultConsumer);

消费者端手动签收和消息的重回队列

消费端接收了队列中的消息,但在进行业务逻辑处理的时候,业务逻辑处理失败怎么办?这时候可以手动签收应答,也可以手动拒绝,让消息重新回到队列中。

生产者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Producer {
    private static final String QUEUE_NAME = "ack-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.basicPublish("", QUEUE_NAME, null, "发布消息".getBytes());
        channel.close();
        connection.close();
    }
}

消费者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Consumer {
    private static final String QUEUE_NAME = "ack-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        final Channel channel = ConnectionUtils.getConnection().createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             * 参数一:当前的消息标记
             * 参数二:是否批量进行应答
             * 参数三:拒绝签收后是否重回队列
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:" + new String(body));
                // 应答
                // channel.basicAck(envelope.getDeliveryTag(), false);
                // 拒绝应答并重回队列(这里死循环了)
                channel.basicNack(envelope.getDeliveryTag(), false, true);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

如何保证消息的投递一定是成功的


1、通过日志消息表实现可靠消息的传输:

简单理解为,持久化消息状态,确保消息发送失败后可以被定时调度服务将消息重新发送到队列

全部评论0