消息中间件-RabbitMQ

本文最后更新于 2024年12月2日 晚上

单体架构面临的问题

  • 单体架构系统发展遇到瓶颈,需要拆分服务,拆分后服务间通信成为问题

    • Restful的http
    • Dubbo
    • WebService
    • MQ
  • 线上业务增长所面临的高并发问题,单体架构越来越吃力

    • 同步处理,异步处理
    • 请求堆积
  • 单体架构代码臃肿,耦合度高,维护困难,如何解耦?

什么是消息中间件(消息队列)

MQ (Message Queue),消息队列可以理解为一种在 TCP 协议之上构建的一个 简单的协议,但它又不是具体的通信协议,而是更高层次的 通信模型生产者 / 消费者模型,通过定义自己的生产者和消费者实现消息通信从而屏蔽复杂的底层通信协议;它为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

Message Queue(MQ),消息队列中间件。很多人都说:MQ 通过将消息的发送和接收分离来实现应用程序的异步和解偶,这个给人的直觉是——MQ 是异步的,用来解耦的,但是这个只是 MQ 的效果而不是目的。MQ 真正的目的是为了通讯,屏蔽底层复杂的通讯协议,定义了一套应用层的、更加简单的通讯协议。一个分布式系统中两个模块之间通讯要么是 HTTP,要么是自己开发的 TCP,但是这两种协议其实都是原始的协议。HTTP 协议很难实现两端通讯——模块 A 可以调用 B,B 也可以主动调用 A,如果要做到这个两端都要背上 WebServer,而且还不支持长连接(HTTP 2.0 的库根本找不到)。TCP 就更加原始了,粘包、心跳、私有的协议,想一想头皮就发麻。MQ 所要做的就是在这些协议之上构建一个简单的“协议”——生产者/消费者模型。MQ 带给我的“协议”不是具体的通讯协议,而是更高层次通讯模型。它定义了两个对象——发送数据的叫生产者;接收数据的叫消费者, 提供一个 SDK 让我们可以定义自己的生产者和消费者实现消息通讯而无视底层通讯协议。

总结一下:

  • 目的是用来通信
  • 通信模型生产者 / 消费者模型
  • 异步,解耦,削峰填谷
  • 消息堆积、高吞吐、可靠重试

核心概念

  • Topic:消息主题,一级消息类型,生产者向其发送消息
  • 生产者:也称为消息发布者,负责生产并发送消息至 Topic
  • 消费者:也称为消息订阅者,负责从 Topic 接收并消费消息
  • 消息:生产者向 Topic 发送并最终传送给消费者的数据和(可选)属性的组合
  • 消息属性:生产者可以为消息定义的属性,包含 Message Key 和 Tag
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致

消息队列的数据结构

消息队列采用 FIFO 的方式,即 先进先出 的数据结构。
先进先出数据结构

消息队列的两大流派

有Broker

这个流派通常有一台服务器作为 Broker,所有的消息都通过它中转。生产者把消息发送给它就结束自己的任务了,Broker 则把消息主动推送给消费者(或者消费者主动轮询)。

重Topic

kafka、RocketMQ (前身是 ActiveMQ) 就属于这个流派,生产者会发送 KEY 和数据到 Broker,由 Broker 比较 KEY 之后决定给哪个消费者(推送消息给消费者)。
重Topic

轻Topic

这种的代表是 RabbitMQ(或者说是 AMQP),生产者发送 KEY 和数据,消费者定义订阅的队列,Broker 收到数据之后会通过一定的逻辑计算出 KEY 对应的队列,然后把数据交给队列
轻Topic

这种模式下解耦了 KEY 和 Queue,在这种架构中 Queue 是非常轻量级的(在 RabbitMQ 中它的上限取决于你的内存),消费者关心的只是自己的 Queue;生产者不必关心数据最终给谁只要指定 KEY 就行了,中间的那层映射在 AMQP 中叫 Exchange。

无Broker

无 Broker 的 MQ 的代表是 ZeroMQ。该作者非常睿智,他非常敏锐的意识到 MQ 是更高级的 Socket,它是解决通讯问题的。所以 ZeroMQ 被设计成了一个 “库” 而不是一个中间件
无Broker

节点之间通讯的消息都是发送到彼此的队列中,每个节点都既是生产者又是消费者。ZeroMQ 做的事情就是封装出一套类似于 Socket 的 API 可以完成发送和读取数据。

消息队列技术选型

消息队列有很多种,那么如何做技术选型?

  • 业务特点
  • 各类技术、中间件的特点,各个技术性能指标
  • 现有技术架构,客户要求
  • 技术成熟度,稳定性
  • 文档丰富度
  • 社区活跃度
  • 是否开源、收费

技术没有优劣之分,只有适合不适合。
技术是为业务服务的,技术要走在业务前头。

RabbitMQ的优点

  • 基于 ErLang 语言开发具有高可用高并发的优点,适合集群服务器
  • 健壮、稳定、易用、跨平台、支持多种语言、文档齐全
  • 有消息确认机制和持久化机制,可靠性高
  • 开源

RabbitMQ的概念

生产者和消费者

  • Producer:消息的生产者
  • Consumer:消息的消费者

Queue

  • 消息队列,提供了 FIFO 的处理机制,具有缓存消息的能力。RabbitMQ 中,队列消息可以设置为持久化,临时或者自动删除。
  • 设置为持久化的队列,Queue 中的消息会在 Server 本地硬盘存储一份,防止系统 Crash,数据丢失
  • 设置为临时队列,Queue 中的数据在系统重启之后就会丢失
  • 设置为自动删除的队列,当不存在用户连接到 Server,队列中的数据会被自动删除

Exchange

Exchange 类似于数据通信网络中的交换机,提供消息路由策略。RabbitMQ 中,Producer 不是通过信道直接将消息发送给 Queue,而是先发送给 ExChange。一个 ExChange 可以和多个 Queue 进行绑定,Producer 在传递消息的时候,会传递一个 ROUTING_KEY,ExChange 会根据这个 ROUTING_KEY 按照特定的路由算法,将消息路由给指定的 Queue。和 Queue 一样,ExChange 也可设置为持久化,临时或者自动删除。

Exchange的4种类型

  • direct(默认):直接交换器,工作方式类似于单播,ExChange 会将消息发送完全匹配 ROUTING_KEY 的 Queue(key 就等于 queue)
  • fanout:广播是式交换器,不管消息的 ROUTING_KEY 设置为什么,ExChange 都会将消息转发给所有绑定的 Queue(无视 key,给所有的 queue 都来一份)
  • topic:主题交换器,工作方式类似于组播,ExChange 会将消息转发和 ROUTING_KEY 匹配模式相同的所有队列(key 可以用“宽字符”模糊匹配 queue),比如,ROUTING_KEY 为 user.stock 的 Message 会转发给绑定匹配模式为 * .stock,user.stock* . *#.user.stock.# 的队列。( * 表是匹配一个任意词组,# 表示匹配 0 个或多个词组)
  • headers:消息体的 header 匹配,无视 key,通过查看消息的头部元数据来决定发给那个 queue(AMQP 头部元数据非常丰富而且可以自定义)

Binding

所谓绑定就是将一个特定的 ExChange 和一个特定的 Queue 绑定起来。ExChange 和 Queue 的绑定可以是多对多的关系。

Virtual Host

在 RabbitMQ Server 上可以创建多个虚拟的 Message Broker,又叫做 Virtual Hosts (vhosts)。每一个 vhost 本质上是一个 mini rabbitmq server,分别管理各自的 ExChange,和 bindings。vhost 相当于物理的 Server,可以为不同 app 提供边界隔离,使得应用安全的运行在不同的 vhost 实例上,相互之间不会干扰。Producer 和 Consumer 连接 rabbit server 需要指定一个 vhost。

RabbitMQ的使用过程

  • 客户端连接到消息队列服务器,打开一个 Channel。
  • 客户端声明一个 ExChange,并设置相关属性。
  • 客户端声明一个 Queue,并设置相关属性。
  • 客户端使用 Routing Key,在 ExChange 和 Queue 之间建立好绑定关系。
  • 客户端投递消息到 ExChange。
  • ExChange 接收到消息后,就根据消息的 key 和已经设置的 binding,进行消息路由,将消息投递到一个或多个队列里

RabbitMQ的下载安装

Linux、Windows版本下载安装移步官网

使用Docker安装方便快捷,编排文件(docker-compose.yaml)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
version: '3.1'
services:
rabbitmq:
image: rabbitmq:management
container_name: rabbitmq
hostname: rabbitmq
ports:
- 5672:5672
- 15672:15672
environment:
TZ: Asia/Shanghai
RABBITMQ_DEFAULT_USER: rabbit
RABBITMQ_DEFAULT_PASS: rabbit
volumes:
- ./data:/var/lib/rabbitmq

启动RabbitMQ;

1
2
3
4
5
6
7
8
# 启动
docker-compose up -d
# 停止
docker-compose down
# 重启
docker-compose restart
# 日志
docker-compose logs

访问控制台:
http://192.168.174.11:15672
用户名/密码:
rabbit/rabbit

RabbitMQ控制台简介

管理界面中的功能

RabbitMQ管理界面

添加用户

添加用户admin:
添加用户
用户角色:

  • 超级管理员(administrator)
    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  • 监控者(monitoring)
    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
  • 策略制定者(policymaker)
    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
  • 普通管理者(management)
    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
  • 其他
    无法登陆管理控制台,通常就是普通的生产者和消费者。

创建Virtual Hosts

创建Virtual Hosts:
创建Virtual Hosts

设置权限

选中admin用户,设置权限:
设置权限

RabbitMQ的五种模式

  • Queue:简单模式
  • Work Queue:Work模式
  • Publish/Subscribe:发布/订阅模式
  • Routing:路由模式
  • Topics:通配符模式

RabbitMQ的五种模式

导入项目

使用了Spring Boot项目来做测试,项目结构、依赖、配置文件如下:
项目结构、依赖、配置文件

简单模式

点对点,简单的生产者消费者。
我们需要将消息发给唯一一个节点时使用这种模式,这是最简单的一种形式。生产者将消息发送到队列,消费者从队列中获取消息。
简单模式

先建立一个叫simple的队列

  • Durability:是否做持久化 Durable(持久) transient(临时)
  • Auto delete : 是否自动删除

simple队列

编写测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package pub.fenston.rabbit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitApplicationTests {

@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void testSimple() {
rabbitTemplate.convertAndSend("simple","RabbitMQ:发送消息,简单模式");
}

}

运行测试类

发送消息

编写消费者类,监听simple

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package pub.fenston.rabbit.consumer;
import java.util.Date;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import pub.fenston.rabbit.bean.Message;
import pub.fenston.rabbit.mapper.MessageMapper;

import javax.annotation.Resource;

@Slf4j
@Component
@RabbitListener(queues = "simple")
public class RabbitSimpleConsumer {

@Resource
private MessageMapper messageMapper;

@RabbitHandler
public void receive(String msg) {
log.info("Consumer(simple):" + msg);
Message message = new Message();
message.setMsg(msg);
message.setCreateTime(new Date());
message.setModifyTime(new Date());
messageMapper.insert(message);
}

}

启动后可以发现消息被消费

消息被消费

工作模式

工作模式和上边的简单模式差不多,只是对于当前的队列多了一个消费者:一个生产者、2个消费者。一个消息只能被一个消费者获取。
工作模式

编写测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package pub.fenston.rabbit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitApplicationTests {

@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void testWork() {
rabbitTemplate.convertAndSend("simple","RabbitMQ:发送消息,work模式");
}

}

编写消费者类,监听simple

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package pub.fenston.rabbit.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import pub.fenston.rabbit.bean.Message;
import pub.fenston.rabbit.mapper.MessageMapper;

import javax.annotation.Resource;
import java.util.Date;

@Slf4j
@Component
@RabbitListener(queues = "simple")
public class RabbitWorkConsumer {

@Resource
private MessageMapper messageMapper;

@RabbitHandler
public void receive(String msg) {
log.info("Consumer(work):" + msg);
Message message = new Message();
message.setMsg(msg);
message.setCreateTime(new Date());
message.setModifyTime(new Date());
messageMapper.insert(message);
}

}

多运行几次测试方法在看看控制台打印结果

可以发现它是轮询的:
轮询

发布/订阅模式

  • 1、一个生产者,多个消费者
  • 2、每一个消费者都有自己的一个队列
  • 3、生产者没有将消息直接发送到队列,而是发送到了交换机
  • 4、每个队列都要绑定到交换机
  • 5、生产者发送的消息,经过交换机,到达队列,实现一个消息被多个消费者获取的目的
  • 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费

发布/订阅模式

首先创建一个交换机exchange

如下图:
交换机exchange

再创建一个消息队列publish

如下图:
消息队列publish

绑定交换机与队列

双击刚刚创建的转换机exchange将两个队列simple、publish与之进行绑定 :
绑定交换机与队列

绑定成功之后如下

绑定成功

编写测试类

注意,次数是将消息发送给交换机exchange,由交换机进行转发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package pub.fenston.rabbit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitApplicationTests {

@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void testFanout() {
rabbitTemplate.convertAndSend("exchange","","RabbitMQ:发布/订阅模式");
}

}

编写消费者类,监听publish

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package pub.fenston.rabbit.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import pub.fenston.rabbit.bean.Message;
import pub.fenston.rabbit.mapper.MessageMapper;

import javax.annotation.Resource;
import java.util.Date;

@Slf4j
@Component
@RabbitListener(queues = "publish")
public class RabbitFanoutConsumer {

@Resource
private MessageMapper messageMapper;

@RabbitHandler
public void receive(String msg) {
log.info("Consumer(fanout):" + msg);
Message message = new Message();
message.setMsg(msg);
message.setCreateTime(new Date());
message.setModifyTime(new Date());
messageMapper.insert(message);
}

}

运行测试

先重启项目后再运行测试方法,控制台打印如下(RabbitSimpleConsumer是与队列simple进行绑定的)。
可以看到RabbitSimpleConsumer和RabbitFanoutConsumer均收到消息,实现了发布/订阅。
运行测试

路由模式

  • 交换机绑定的路由key和队列绑定的一样时,才发送。
  • 根据发送消息的Routing key来进行匹配(不支持占位符匹配)。
  • 一个消息只能被一个相匹配的消息队列获取。

路由模式

新建一个交换机routing

修改类型为direct:
交换机routing

将队列simple与publish与交换机routing进行绑定

修改simple的Routing Key 的值为simple-msg:
simple-msg
修改publish的Routing Key 的值为routing-msg:
routing-msg
绑定成功后效果图如下:
绑定成功

编写测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package pub.fenston.rabbit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitApplicationTests {

@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void testRouting() {
rabbitTemplate.convertAndSend("routing","simple-msg","RabbitMQ:路由模式,simple-msg");
rabbitTemplate.convertAndSend("routing","simple-msg","RabbitMQ:路由模式,routing-msg");
rabbitTemplate.convertAndSend("routing","simple-msg","RabbitMQ:路由模式,other-msg");
}

}

运行测试

先启动项目后再运行测试方法,可以看到,只有匹配了Routing key的消息被转发消费:
消息被转发消费

主题模式

此类交换器使得来自不同的源头的消息可以到达一个对列。
其实说白了就是根据消息的Routing key来进行模糊匹配,一个消息的Routing key如果有多个消息队列匹配那就能被相匹配的接收。
主题模式

新建一个交换机topic

注意,类型一定要改为topic:
交换机topic

新建一个消息队列topic

如下:
消息队列topic
现在共有三个队列:
消息队列

将这三个队列与交换机topic进行绑定并设置Routing key的值

  • 符号 # 匹配一个或多个词
  • 符号 * 匹配不多不少一个词
  • 因此 test.# 能够匹配到text.xx.xxx,但是 test.* 只会匹配到text.xx
  • 根据发送消息的Routing key来进行匹配,一个消息能被多个相匹配的消息对列获取。

error
publish
topic
绑定完之后效果图如下:
绑定完之后效果图

编写消费者类,监听topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package pub.fenston.rabbit.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import pub.fenston.rabbit.bean.Message;
import pub.fenston.rabbit.mapper.MessageMapper;

import javax.annotation.Resource;
import java.util.Date;

@Slf4j
@Component
@RabbitListener(queues = "topic")
public class RabbitTopicConsumer {

@Resource
private MessageMapper messageMapper;

@RabbitHandler
public void receive(String msg) {
log.info("Consumer(topic):" + msg);
Message message = new Message();
message.setMsg(msg);
message.setCreateTime(new Date());
message.setModifyTime(new Date());
messageMapper.insert(message);
}

}

编写测试类

相比于路由模式只改变了交换机的类型为topic,并且主题模式支持Routing key使用“*”、“#”占位符匹配。
error.可以匹配到:error.log、error.sss,匹配不到error.log.sss,而‘#’可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package pub.fenston.rabbit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitApplicationTests {

@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void testTopic() {
rabbitTemplate.convertAndSend("topic","error.log","RabbitMQ:主题模式,error.log");
}

}

运行测试类

先启动项目后再运行测试方法,可以看到,三个都匹配到了:
测试结果


消息中间件-RabbitMQ
https://www.bugfree.top/2021/02/26/middle/消息中间件-RabbitMQ/
作者
lizhenguo
发布于
2021年2月26日
更新于
2024年12月2日
许可协议