RabbitMQ 消息队列:五种消息通讯模型

chan 作者
阅读 7423 喜欢 0

RabbitMQ消息模型

RabbitMQ 提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
官方教程:http://www.rabbitmq.com/getstarted.html

准备工作

1、导入 RabbitMQ 相关的包

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.5.0</version>
</dependency>

2、创建获取 RabbitMQ 连接对象的工具类

/**
 * 获取 RabbitMQ 连接对象
 * @author CHAN
 * @date 2020/03/10
 */
public class ConnectionUtils {

    private static final String HOST_IP = "服务器地址";

    private static final int HOST_PORT = 5672;

    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置连接主机
        connectionFactory.setHost(HOST_IP);
        // 设置主机端口号
        connectionFactory.setPort(HOST_PORT);
        // 设置虚拟机 一般设置为 '/'
        connectionFactory.setVirtualHost("/");
        // 设置访问用户名
        connectionFactory.setUsername("用户名");
        // 设置访问密码
        connectionFactory.setPassword("密码");
        return connectionFactory.newConnection();
    }
}

一、HelloWorld基本消息模型

生产者将消息发送到队列 然后队列将这个消息发送给消费者

生产者:

/**
 * 生产者
 * @author CHAN
 * @date 2020/03/10
 */
public class Producer {

    /**
     * 队列名字
     */
    private static final String QUEUE_NAME = "mq-helloWord";

    /**
     * 发送消息到队列
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接对象
        Connection connection = ConnectionUtils.getConnection();
        // 创建数据传输通道
        Channel channel = connection.createChannel();
        // 声明队列
        // 参数一:队列名
        // 参数二:是否持久化
        // 参数三:是否排外(1、连接关闭后,队列是否删除 2、是否允许其他通道访问数据)
        // 参数四:是否允许自动删除
        // 参数五:申明队列的时候附带的参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 发送数据到队列
        // 参数一:exchange 交换机,没有就设置为""
        // 参数二:原本这里应该是路由的 key,这个没有 key 直接使用队列名
        // 参数三:发送数据到队列的时候附带的参数
        // 参数四:向队列中发送数据
        channel.basicPublish("", QUEUE_NAME, null, "你好世界".getBytes());
        channel.close();
        connection.close();
    }
}

消费者:

/**
 * 消费者
 * @author CHAN
 * @date 2020/03/10
 */
public class Consumers {

    /**
     * 队列名字
     */
    private static final String QUEUE_NAME = "mq-helloWord";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接对象
        Connection connection = ConnectionUtils.getConnection();
        // 创建通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             * @param consumerTag:消息的唯一标记
             * @param envelope:信封(请求的消息属性封装)
             * @param properties:前面队列带过来的值
             * @param body:接收到的消息
             * @throws IOException  -
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到队列中的消息:" + new String(body));
                // 手动应答
                // 参数一:手动应答的标记
                // 参数二:false 相当于告诉队列收到消息了
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 绑定消费者
        // 参数一:队列名
        // 参数二:是否自动应答
        // 参数三:消费者声明
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

二、Work工作消息模型

多个消费者消费的数据之和才是原来队列中的所有数据 适用于流量的消峰

生产者:

/**
 * @author CHAN
 * @date 2020/03/10
 */
public class Producer {

    /**
     * 队列名
     */
    private static final String QUEUE_NAME = "mq-work";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 发送 100 条消息到队列
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", QUEUE_NAME, null, ("工作模型消息" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者(多个消费者将平分生产者发布的100条消息):

/**
 * @author CHAN
 * @date 2020/03/10
 */
public class Consumers {

    /**
     * 队列名
     */
    private static final String QUEUE_NAME = "mq-work";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者 1 收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

三、订阅模型 - Fanout

一个生产者有多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送给队列而是发送给交换机,每个队列需要绑定到交换机,生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取

交换机(Exchange)的类型:

fanout:广播,将消息交给所有绑定到交换机的队列
direct:定向,把消息交给符合指定 routing key 的队列
topic:通配符,把消息交给符合 routing pattern(路由模式)的队列

生产者:

/**
 * 订阅模型-fanout
 * @author CHAN
 * @date 2020/03/10
 */
public class Producer {

    /**
     * 交换机名
     */
    private static final String EXCHANGE_NAME = "exchange-fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        // 参数一:交换机名字
        // 参数二:交换机类型
        // 注意:如果使用的是发布订阅模型,交换机类型只能写 "fanout"
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 发送消息到交换机
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EXCHANGE_NAME, "", null, ("订阅模型-fanout" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者(多个消费者需要声明不同的通道并绑定生产者声明的交换机):

/**
 * @author CHAN
 * @date 2020/03/10
 */
public class Consumers {

    /**
     * 交换机名
     */
    private static final String EXCHANGE_NAME = "mq-fanout";

    /**
     * 队列名
     */
    private static final String QUEUE_NAME = "mq-fanout-queue-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 将队列绑定到交换机
        // 参数一:队列名
        // 参数二:交换机名
        // 参数三:路由 key (现在没有)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 声明消费之
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("队列 1 收到消息:" + new String(body));
            }
        };
        // 绑定消费者
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

四、订阅模型 - Direct

在 Direct 模型下,队列与交换机的绑定需要指定一个 RoutingKey(路由key),生产者向交换机发送消息时,必须指定消息的 Routingkey,这样,交换机只会向匹配上相同 RoutingKey 的队列发送消息。

生产者:

/**
 * 订阅模型-direct
 * @author CHAN
 * @date 2020/03/12
 */
public class Producer {

    /**
     * 交换机
     */
    private static final String EXCHANGE_NAME = "exchange-direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 设置路由 key
        String routingKey;
        for (int i = 0; i < 100; i++) {
            if (i%2 == 0) {
                routingKey = "queue-A";
            } else {
                routingKey = "queue-B";
            }
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, ("发布消息" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Cosnumer1 {
    private static final String QUEUE_NAME = "direct-queue-A";
    private static final String EXCHANGE_NAME = "exchange-direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 绑定队列到交换机,参数三表示路由 key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "queue-A");
        // 申明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由 key 为 queue-A 的队列接收到数据:" + new String(body));
            }
        };
        // 绑定消费者
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

五、订阅模型 - Topic

相当于 Direct 的加强版,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符

通配符规则:

#:匹配一个或多个词
*:只匹配一个词

生产者:

/**
 * 订阅模型-Topic
 * @author CHAN
 * @date 2020/03/12
 */
public class Producer {
    private static final String EXCHANGE_NAME = "exchange-topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EXCHANGE_NAME, "www.zqskate.com", null, ("Topic 模型的消息:" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者:

/**
 * @author CHAN
 * @date 2020/03/12
 */
public class Cosnumer1 {
    private static final String QUEUE_NAME = "topic-queue-A";
    private static final String EXCHANGE_NAME = "exchange-topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 绑定队列到交换机(RoutingKey 为"www.zqskate.com",如果需要接收消息,这里的 key 应该为 www.# 或者 www.zqskte.*)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "www.#");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由 key 是 www 的队列接收到数据:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

备注一:

使用了交换机发送了数据,如果没有消费者的话这个数据会发生丢失,通过设置这样的属性来解决这个问题

channel.basicPublish(EXCHANGE_NAME, "www.zqkate.com", MessageProperties.PERSISTENT_TEXT_PLAIN, ("路由模型的值:" + i).getBytes());

备注二:关于通道

原本没有通道我们也可以完成这个请求,实际上 Connection 引入这个通道这个概念,是为了降低TCP连接的这样一个消耗,相当于是为了 TCP 的复用,还有一个目的就是为了线程隐私,相当于每一个线程都给你创建了一个通道

全部评论0