RabbitMQ 四大交换机详解及代码实现

1. 交换机概念

AMQP规范及RabbitMQ中,交换机(Exchange)的作用是:接受生产者应用程序发布的消息,并根据一定的规则(由交换机的类型决定)将这些消息路由(即转发)到消息队列上,它本身并不会做存储。

在前面的HelloWorld的示例代码中,我们使用了是默认的交换机("")来将消息转发到队列:

1
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

事实上,生产者只能将消息发送给一个交换机,并由不同的交换机类型来将消息转发给指定的队列,而生产者甚至不知道消息是否会被传递到哪些队列:

而交换机充当着一边接收生产者的消息,另一边将消息发送到指定的队列的角色。并且根据不同的交换机类型,发送到队列的可能有几种规则:发送到指定队列(默认交换机)、发送给许多队列(扇形交换机)、发送给指定规则的队列(主题交换机)。下面,我们来一一介绍这几种交换机类型。

2. 四大交换机

前面我们说到的默认交换机(default exchange)也称为匿名交换机(Nameless exchange),它用空字符串""来表示,另外它不需要被声明也不需要进行任何和其他队列绑定的操作。严格意义上来说,它并不是我们所说的四大交换机的范畴。

1
2
// 发送到 hello 队列
channel.basicPublish("", "hello", null, message.getBytes());

它转发的特点是:消息会被发送到routing_key指定的队列(如果该队列存在的话)。介绍完在RabbitMQ这个特殊的交换机之后,让我们来了解下真正的四大交换机的转发规则吧。

2.1 扇形交换机

扇形交换机很简单,是一个广播的交换机——它会将收到的所有消息广播到它所知道的所有队列里

Fanout_exchange.png

例如我们创建生产者的代码,声明一个交换机。并通过BuiltinExchangeType.FANOUT来指定它是一个扇形交换机,然后声明一个临时队列,并将该队列和交换机通过queueBind方法绑定在一起。最后消费该临时队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Channel channel = connection.createChannel();
// 声明一个扇形交换机
channel.exchangeDeclare("fanout_demo", BuiltinExchangeType.FANOUT);
// 声明一个临时队列,队列名为RabbitMQ生成的随机名
String queueName = channel.queueDeclare().getQueue();
// 将队列和交换机绑定
channel.queueBind(queueName, "fanout_demo", "");
// 消费该队列
channel.basicConsume(queueName, true,
(consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer Received => " + message);
},
(consumerTag) -> {

});

这里提到一个临时队列的概念,它有如下的特点。在应用中,我们可以通过queueDeclare()(不传入任何参数值)来声明一个临时队列,将会返回该队列的随机队列名,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg

  • 队列名由随机的名字组成;
  • 当消费者断开连接时,该临时队列将会被删除。

我们同时启动两个FanoutExchangeRecv的线程消费各自的临时队列:

1
2
3
4
for (int i = 0; i < 2; i++) {
new Thread(new FanoutExchangeRecv())
.start();
}

访问http://localhost:15672管理后台就可以看到交换机fanout_demo与两个临时队列绑定在一起:

fanout_exchange_binding.png

这样,当生产者通过basicPublish方法向该扇形交换机fanout_demo发送消息时,所有与该交换机绑定的队列都能接收到消息:

1
2
3
4
5
6
7
8
// 消费者应用代码,省略了不重要的部分
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel();) {

String message = "The message sent by fanout exchange";
// 向 fanout_demo 交换机发送消息
channel.basicPublish("fanout_demo", "", null, message.getBytes());
}

2.2 直连交换机

直连交换机是一种带路由功能的交换机,一个直连交换机可以通过routingKey和多个队列进行绑定。当将消息给该交换机时,需要指定一个routingKey,交换机将会发送到与之相同的routingKey的队列。如下图,由于队列1通过key1与直连交换机进行绑定,因此当消息携带routingKeykey1时,交换机将会把消息转发到队列1中:

direct_exchange.png

在生产者代码中,直连交换机的声明方式和扇形交换机大致相同, 唯一不同的地方是不同的队列将由不同的routingKey变量值来与交换机进行绑定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Channel channel = connection.createChannel();
// 声明一个直连交换机
channel.exchangeDeclare("direct_demo", BuiltinExchangeType.DIRECT);
// 声明一个临时队列,队列名为RabbitMQ生成的随机名
String queueName = channel.queueDeclare().getQueue();
// 将交换机和临时队列通过routingKey(这里将由构造函数传入)进行绑定
channel.queueBind(queueName, "direct_demo", routingKey);
// 消费该临时随机队列
channel.basicConsume(queueName, true,
(consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer Received => " + message);
},
(consumerTag) -> {

});

启动两个生产者线程,分别传入不同的routingKey

1
2
3
4
for (int i = 1; i < 3; i++) {
new Thread(new DirectExchangeRecv("direct-key-" + i))
.start();
}

管理员后台可以看到该交换机通过各自的routingKey和两个临时队列绑定在一起:

direct_exchange_binding.png

这样,在生产者通过basicPublish方法向直连交换机发送消息时,可以传入指定的routingKey发送到相应的队列:

1
2
3
4
5
6
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

String message = "The message sent by direct exchange";
// 发送到第二个程序的队列
channel.basicPublish("direct_demo", "direct-key-2", null, message.getBytes());

2.3 主题交换机

主题交换机(Topic exchange)是一个特殊的交换机,它既有扇形交换机的功能,例如这里的user.news发送到了队列1和队列2。同样具有直连交换机的功能,例如这里的helloroutingKey发送到了队列3:

topic_exchange.png

主题交换机具有的特点是:发送到主题交换机上的消息需要携带执行规则的routingKey,主题交换机会根据这个规则将数据发送到符合规则的(多个)队列上。它具有的规则有:

  • *表示一个单词;
  • #表示任意数量(零个或多个)单词。

例如图中消费者向交换机发送携带user.newsroutingKey的消息,交换机将会把消息转发到user.##.news这两个符合规则的队列当中。下图简单地描述了规则的转发:

有几种特殊的规则情况:

  • 如果一个队列绑定的键为#时候,这个队列将会无视消息的路由键,接受所有的消息。
  • 当队列绑定的routingKey中没有*#这俩特殊字符时,则此时主题交换机将拥有直连交换机的功能,和其效果是一样的。

同样,在主题交换机的生产者代码,也是由不同的routingKey变量值来与交换机进行绑定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Channel channel = connection.createChannel();
// 声明一个主题交换机
channel.exchangeDeclare("topic_demo", BuiltinExchangeType.TOPIC);
// 声明一个临时队列,队列名为RabbitMQ生成的随机名
String queueName = channel.queueDeclare().getQueue();
// 将交换机和临时队列通过routingKey进行绑定,例如user.#和#.news
channel.queueBind(queueName, "topic_demo", routingKey);
// 消费该临时队列
channel.basicConsume(queueName, true,
(consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer Received => " + message);
},
(consumerTag) -> {});

同时启动TopicExchangeRecv两个线程,分别通过user.##.newsroutingKey与交换机进行绑定:

1
2
3
4
for (String key : new String[]{"user.#", "#.news"}) {
new Thread(new TopicExchangeRecv(key))
.start();
}

这样,两个消费者的声明的临时队列都通过自己的规则routingKey和主题交换机绑定在一起:

topic_exchange_binding.png

生产者将消息发送给交换机时,携带user.newsroutingKey。运行生产者代码,两个消费者都能接受到发送的消息:

1
2
3
4
5
6
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

String message = "The message sent by topic exchange";
channel.basicPublish("topic_demo", "user.news", null, message.getBytes());
}

2.4 首部交换机

首部交换机使用可能不多,可以参考一下下面对首部交换机的说明:

首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTPHeaders。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。

绑定交换机队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。

作者:whthomas

链接:https://www.jianshu.com/p/469f4608ce5d

來源:简书

简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

最后,老规矩放出Github的地址。

Pushy wechat
欢迎订阅我的微信公众号