RabbitMQ 认识介绍并实现简单生产者-消费者模型

1. 从概念开始

理解什么是RabbitMQ之前,我们来了解一下这个框架基于哪种应用层协议:它基于一种叫做AMQP(Advanced Message Queuing Protocol)协议,是一个提供统一消息服务的应用层标准高级消息协议,为面向消息的中间件设计。

近年来,RabbitMQ作为强大的消息队列框架,和它类似的一些框架你也一定不陌生,有ActiveMQKafka、还有阿里的RocketMQ,都是很优秀的消息队列的大型框架。

那么什么是消息队列呢?消息队列实际上一个存放消息的容器,和JDK中先进先出的数据结构类似。它是一种生产者和消费者模型,当消费者(工作者进程)需要使用消息时就可以从队列中取出消息供自己使用:

它有以下的几个强大的优点:

  • 它有效地将让生产者和消费者解耦,生产者只负责向消息队列中生产消息,而消费者只关心从队列中获取消息,而无需关心生产者的业务逻辑;
  • 可以通过异步处理来提高系统性能(削峰,减少响应所需的时间)。

RabbitMQ就是基于AMQP协议规范,符合生产者与消费模型的消息中间件(Message Broker)。

2. 安装配置

2.1 安装

RabbitMQ官网可以下载到Windows和Linux版本的安装包。需要注意的是,在本地安装RabbitMQ之前,必须先安装Erlang语言的环境,因为RabbitMQ是用Erlang语言编写的,可以在Erlang官网下载安装包。

安装完成之后,进入your installed path\rabbitmq_server-3.7.9\sbin>目录启动服务:

1
$ rabbitmq-server.bat    // windows command

另外,我们可以添加管理后台的插件。同样在该目录下运行命令安装插件:

1
$ rabbitmq-plugins enable rabbitmq_management

然后进入http://localhost:15672地址,用户名和密码都是guest。登录成功后,我们可以在管理员后台监控队列和消费者的所有信息:

rabbmitmq-admin.png

2.2 应用依赖

在Java中使用RabbitMQ,需要添加Maven相关依赖:

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>

3. Hello World

下面我们实现一个非常简单的应用,角色只有一个生产者和消费者,要做的事只是让生产者发送一条“Hello World”的消息到队列,然后消费者从队列中取出,并打印出来。

3.1 生产者

生产者的代码是一个普通的串行执行的程序,执行完所有代码之后将退出。需要执行的操作非常简单,就是通过channel对象的basicPublish方法向队列中发送一条消息。在此之前,我们需要通过工厂对象ConnectionFactory创建连接对象Connection,它是套接字 Socket Connection的抽象。紧接着通过Connection创建Channel对象,它包含了所有网络操作的API。

另外,它们都实现了java.io.Closeable接口,因此我们可以使用JDK7新加入的try-with-resources的错误处理机制来实现自动关闭这些连接对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {

private static final String QUEUE_NAME = "hello"; // 队列名

public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World";
// 发布消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("Provider send => " + message + "");
} catch (Exception e) {
e.printStackTrace();
}
}
}

然后,我们通过Channel对象的queueDeclare方法声明了一个队列。该方法定义的接口如下:

1
2
3
4
5
6
7
8
9
/**
* 声明一个队列
* @param queue 声明的队列名
* @param durable 如果设置为true,该队列在服务端重启后依然存在
* @param exclusive 如果设置为true,则是一个排他性的队列,只对首次声明它的连接可见
* @param autoDelete 如果设置为true,当该队列不再使用时服务端将会自动删除该队列
* @param arguments 其他属性
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

并通过basicPublish方法向该队列中发布一条消息(该内容必须为字节数组格式,这样可以让我们自定义序列化方式)。该方法定义的接口如下:

1
2
3
4
5
6
7
8
9
/**
* 发送一条消息,如果指定的交换机不存在,将会抛出一个错误,并关闭连接。
*
* @param exchange 发布消息指定的交换机
* @param 路由关键字
* @param 消息的附加属性
* @param 消息的主体内容
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

这里提到了一个交换机的概念,在这里,我们指定了第一个参数为"",代表我们使用的默认交换机。而默认交换机有一个特点:指定的路由关键字即是队列名。也就是说,如果我们使用默认交换机,我们想往队列hello发送消息,只需要指定路由关键字为hello即可。另外,交换机的类型还有很多。针对不同交换机的类型不同,路由关键字的作用可能不同。

现在,我们实现了一个简单的生产者。 下面我们将实现一个消费者,接受该生产者发送到队列中的消息。

3.2 消费者

消费者的代码与生产者不同,它不是一个运行完即结束的程序。因为消费者需要阻塞异步接收队列中的消息,所以不能使用try-with-resources来让JVM自动关闭连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Recv {

private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接和通道对象
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Waiting for messages. To exit press CTRL+C");
// 通过回调函数来异步接收生产者发送的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer Received => " + message);
};
// 监听消费
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
System.out.println(consumerTag + " was cancelled.");
});
}
}

和生产者一样,我们需要创建ConnectionChannel对象。可以注意到,在消费者代码中,我们同样调用了queueDeclare方法声明了相同的队列,这是因为可能消费者比生产者先启动,为了确保该队列存在并可以进行消费,所以让消费者在启动时声明了一个相同名字的队列。

因为我们的消费者程序需要从队列中异步地接受消息,我们可以使用回调函数接口DeliverCallback来实现。最后调用basicConsume方法进行监听消费。方法的接口定义为:

1
2
3
4
5
6
7
/**
* @param queue 消费的队列名
* @param autoAck 如果设置为true,框架内部将会自动确认消息,默认是开启的
* @param deliverCallback 消息接收回调队列
* @param cancelCallback 当消费者被取消时的回调队列
*/
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

3.3 测试

运行消费者,程序将阻塞接受消息的发送:

1
Waiting for messages. To exit press CTRL+C

接着运行生产者,将会往hello队列发送一条消息:

1
Provider send => Hello World

同时,消费者将会接受到刚才发送的消息:

1
2
Waiting for messages. To exit press CTRL+C
Consumer Received => Hello World
Pushy wechat
欢迎订阅我的微信公众号