微服务架构

1.认识微服务(微服务!=SpringCloud)

1.1单体架构:讲业务的所有功能集中在一个项目中开发,打成一个包部署。

优点:架构简单、部署成本低、

缺点:耦合度高

1.2分布式架构:根据业务功能对系统进行拆分,每个业务模块作为独立项目开发,称为一个服务。

优点:降低服务耦合、有利于服务升级扩展

缺点:复杂性高、远程调用损耗性能、运维成本高

分布式架构的要考虑问题:

2.微服务技术对比

微服务是一种方案,需要技术框架来落地。从左到右的发展顺序,Dubbo是较老的技术,在Dubbo方案中,服务之间远程调用是通过Dubbo自己创建一种叫Dubbo的协议去实现的,而SpeingCloud是通过http协议实现的。

3.认识微服务SpringCloud

4.服务拆分

4.1服务拆分案例

4.2基于restTemplate发起的http请求实现远程调用服务(实现访问查询订单信息时候,也返回用户信息)


5.提供者与消费者

6.Eureka注册中心

每个服务启动时候都会向eureka注册中心,发送注册服务信息。这样eureka就能知道所有的服务信息,如图

order服务要调用user服务时候,则先找eureka注册中心,拿到user服务的访问地址。

通过负载均衡从中选一个,进行访问调用。

消费者如何感知服务提供者的健康状态???

每个服务,每隔30秒都会向eureka发送心跳。

如果超过30秒没有发送心跳请求,则会从eureka注册中心去除。


7.搭建EurekaServer

7.1引入依赖

1
2
3
4
5
<!--eureka服务端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

7.2启动类上增加注解@EnableEurekaServer

7.3配置服务信息

7.4运行后点击端口或访问http://127.0.0.1:10086可跳转eurek管理页面

8.服务注册

8.1引入依赖

1
2
3
4
5
<!--eureka客户端依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

8.2配置user-server服务的配置文件

8.3启动userserver后访问eureka客户端

本地电脑服务多开

修改复制服务的启动端口 -Dserver.port=8082

9.服务发现

通过restTemplate服务间调用时候,不用再指定ip,直接指定服务名称,即可调用对应的服务。


若userserver服务存在多个时,restTemplate调用需要通过负载均衡的方式,则在调用方的restTemplate实体上增加注解@LoadBalanced。

负载均衡效果:



10.Ribbon负载均衡
http://userservice/user/101 这并不是一个真实存在的地址,尝试通过浏览器访问是无法访问的。那么说明orderserver并不是直接通过路径调用userserver的。


@LoadBalanced是标记当前这个restTemplate发起的请求,要被Ribbon拦截


源码:快捷键(ctrl+shift+n)搜索LoadBalancerInterceptor.class实现了接口ClientHttpRequestInterceptor


orderserver访问userserver时候,发起的请求会被拦截,断点可查看。

运行时候可看到实体类为RibbonLoadBalancerClient



11.负载均衡策略

方式1:在order-server调用其他任何服务时候都会生效




方式2:在order-server只有调用user-server服务时候会有效果

1
2
3
userservice:
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

12.饥饿加载


开启饥饿加载后回在项目启动时候就完成服务的加载。没开启则是在第一次访问时候才加载服务。

1
2
3
4
5
ribbon:
eager-load:
enabled: true # 开启饥饿加载
clients: # 指定饥饿加载的服务名称
- userservice

13.安装Nacos

官网链接: https://nacos.io/

下载稳定版本


启动Nacos

1
startup.cmd -m standalone #使用单点模式启动Nacos


启动完成后访问Nacos管理页面

14.Nacos快速入门

父工程中添加nacos的管理依赖

1
2
3
4
5
6
7
8
<!--nacos的管理依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.5.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>

客户端依赖

1
2
3
4
5
<!-- nacos客户端依赖包 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>


修改客户端链接Nacos地址

启动服务后在Nacos管理页面能看到服务列表

15.Nacos服务分级存储模型


分集群配置

1
2
discovery:
cluster-name: FJ #集群名称 例如福建、上海、广州......

本次测试集群方法:

可以看到集群的数量为2


详情可查看具体集群

16.Nacos的负载均衡规则NacosRule(集群内优先)

1
2
3
userservice:
ribbon:
NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule # NacosRule为以本地集群内为优先,集群内的服务以随机的方式进行负载均衡。

访问5次页面



测试本地集群都停止时


小tips:在orderserver的日志中能查看到一个告警信息。意思为一个夸集群的请求发生了,这个时候则需要排查本地集群的健康状态。
03-12 16:18:09:054 WARN 19524 —- [io-8088-exec-10] c.alibaba.cloud.nacos.ribbon.NacosRule : A cross-cluster call occurs,name = userservice, clusterName = FJ, instance = [Instance{instanceId=’10.0.6.52#8083#SH#DEFAULT_GROUP@@userservice’, ip=’10.0.6.52’, port=8083, weight=1.0, healthy=true, enabled=true, ephemeral=true, clusterName=’SH’, serviceName=’DEFAULT_GROUP@@userservice’, metadata={preserved.register.source=SPRING_CLOUD}}]

17.Nacos服务实例的权重设置

权重数值越大则访问的次数越多,设置为0时则不会被访问。

18.Nacos环境隔离



重启服务可以看到orderserver在dev环境下。public中就没有了

此时访问接口会报错!



不同环境下的服务是不可见的。

19.Nacos和Eureka的对比

19.1临时实例和非临时实例

一般默认的实例都为临时实例,非临时实例需要另外配置。

什么是临时实例呢?

临时实例是每30秒主动向Nacos发送心跳监测,如果超过时间没有发送请求,则会被Nacos从服务列表中去除。

什么是非临时实例呢?

非临时实例是Nacos主动访问服务,如果访问没有响应,也不会将它从服务列表删除,只会将服务标记为不健康的状态,并将服务变更的信息主动推送给服务消费者。



服务注册到Nacos时,可以选择注册为临时或非临时实例,通过下面的配置来设置

ephemeral: false # 是否是临时实例

总结:

Nacos和Eureka的区别:

1.Nacos支持服务端主动监测提供者状态:临时实例采用心跳模式,非临时实例采用主动检测模式。

2.临时实例心跳不正常会被剔除,非临时实例则不会被剔除。

3.Nacos支持服务列表变更的消息推送模式,服务列表更新更及时。Eureka只能支持心跳检测。

4.Nacos集群默认采用AP方式,当集群中群在非临时实例时,采用CP模式;Eureka采用AP方式。

CAP理论:

  • C - 一致性:
    • 定义: 所有节点在同一时间看到的数据是完全相同的。就像你去查余额,不管访问哪个银行的服务器,看到的数字都一样。
    • 通俗解释: 数据写入成功后,任何后续的读取操作都必须返回这个最新的值。如果系统返回了旧数据,就说明一致性被破坏了。
  • A - 可用性:
    • 定义: 每次请求都能收到一个(非错误的)响应,但不保证响应中的数据是最新的。
    • 通俗解释: 系统要保证”一直能用”,不能宕机。哪怕内部数据正在同步,客户端发来请求,系统也得回一句”您好,我在”,而不是直接超时或报错。
  • P - 分区容错性:
    • 定义: 系统中任意信息的丢失或节点故障,系统还能继续运行。
    • 通俗解释: 简单说就是”网络不稳定”或”网线断了”时,系统不能崩溃。这是分布式系统的必选项,因为网络故障是必然会发生的。

20.Nacos配置管理




21.Nacos配置拉取

项目启动的顺序原本是先读取本地配置文件application.yml。当使用nacos做配置管理时,顺序变为先读取nacos中配置文件,再读取本地配置文件application.yml,两个文件中的配置结合生效。但项目启动时候还没有地方会知道nacos的地址,访问不了nacos。那么就需要在引入优先级更高的配置文件:bootstrap.yml

1
2
3
4
5
<!--nacos的配置管理依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

1
2
3
4
5
6
7
8
9
10
11
12
13
#先删除application.yml中相同的配置信息
spring:
application:
name: userservice
profiles:
active: dev # 环境
cloud:
nacos:
server-addr: http://10.0.6.52:8848 # nacos地址
discovery:
cluster-name: SH #集群名称 例如福建、上海、广州......
config:
file-extension: yaml # 文件后缀名

测试是否能取到Nacos上配置的信息

22.配置热更新

方式一(通过注解@RefreshScope 属性刷新)

1
2
//属性刷新
//@RefreshScope

修改参数后,不用重启服务,直接访问可以看到已经发生变化


方式二(通过@ConfigurationProperties(prefix = “pattern”)注入属性)推荐

23.多环境配置共享(开发、生成、测试等环境)

创建配置

24.Feign-基于Feign的远程调用

1.引入依赖

1
2
3
4
5
<!--feign客户端依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

2.启动类加上注解@EnableFeignClients


3.在调用方创建Feign接口。@FeignClient(被调用方的方法名)




访问接口

feign内置了Ribbon,实现了负载均衡。

访问2次接口可以看到

25.Feign-自定义配置

配置方式1(配置文件方式)、

访问接口后查看控制台可以看到远程调用的日志

配置方式2(配置类方式)、



访问接口后

26.Feign的性能优化

引入依赖

1
2
3
4
5
<!--引入HttpClient依赖-->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>

1
2
3
4
5
feign:
httpclient:
enabled: true #支持httpclient开关
max-connections: 200 #最大连接数 什么值最合适要根据现场用jmeter压测后才知道,微调。
max-connections-per-route: 50 #单个路径的最大连接数

27.Feign的最佳实践



1.创建module,选择为maven项目

2.导入feign依赖


3.其他服务引用刚创建的模板feign-api

4.导包报错的地方重新导包

5.此时会报错无法自动装配。因为默认扫描的包是启动类所在的包,所以扫描不到userClient

6.启动类上的注解@EnableFeignClients中添加属性指定Client所在的包

28.Gateway网关


技术实现有2种:gateway和zuul

29.Gateway网关快速入门

1.创建module,选择为springboot项目

2.引入网关依赖

1
2
3
4
5
<!--网关gateway依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

3.网关也是一个微服务项目,也需要注册为服务,所以还需要引入服务发现的依赖

1
2
3
4
5
<!--nacos服务注册发现依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

4.网关配置


请求是先到网关->网关从Nacos中拉取服务列表->通过路由规则访问对应的服务

30.路由断言工厂

常用的是Path。配置多个断言时需要都满足才可以通过校验。


否则会报错404

31.GatewayFilter过滤器




测试(访问user服务才行,如果是通过order访问user是为空的!)

通过网关访问order服务,order服务内部调用user服务时为空


通过网关直接访问到user服务时才能获取到

32.默认过滤器defaultFilter

defalultFilter过滤器和gatewayFilter配置只在位置上有区别,但却是2个不同的过滤器。gatewayFilter在每个服务下都写一遍,defalultFilter只需在default-filters下写即可。

33.全局过滤器GlobalFilter

GatewayFilter是通过配置定义,处理逻辑是固定的。而GlobalFilter的逻辑需要自己写代码实现。


访问接口报错401

参数中带上?authorization=admin

拦截器可以写多个,那如何指定拦截器的顺序呢?


当3个过滤器的order有相等的时候,会按照defaultFilter>gatewayFilter>GlobalFilter

34.Docker是什么?

02-初识Docker-什么是docker_哔哩哔哩_bilibili

35.Docker与虚拟机

36.Docker架构

37.安装Docker

卸载(可选)

如果之前安装过旧版本的Docker,可以使用下面命令卸载:

1
2
3
4
5
6
7
8
9
10
11
yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine \
docker-ce

连接服务器执行命令,安装yum工具

1
2
3
yum install -y yum-utils \
device-mapper-persistent-data \
lvm2 --skip-broken

更新本地镜像源

1
2
3
4
5
6
7
8
# 设置docker镜像源
yum-config-manager \
--add-repo \
https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

sed -i 's/download.docker.com/mirrors.aliyun.com\/docker-ce/g' /etc/yum.repos.d/docker-ce.repo

yum makecache fast

安装CE版本的Docker

1
yum install -y docker-ce

启动docker

Docker应用需要用到各种端口,逐一去修改防火墙设置。这边直接关闭防火墙!

本地启动docker前,一定要关闭防火墙后!!

1
2
3
4
# 关闭
systemctl stop firewalld
# 禁止开机启动防火墙
systemctl disable firewalld

云服务器则通过设置开放的端口。

通过命令启动docker

1
2
3
4
5
6
7
8
9
systemctl start docker  # 启动docker服务、

systemctl status docker # 查看docker服务状态

systemctl stop docker # 停止docker服务

systemctl restart docker # 重启docker服务

docker -v #查看docker版本

执行systemctl status docker查看docker状态

配置镜像加速

docker官方镜像仓库网速较差,我们需要设置国内镜像服务:

参考阿里云的镜像加速文档:https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": [
"https://2a6bf1988cb6428c877f723ec7530dbc.mirror.swr.myhuaweicloud.com",
"https://docker.m.daocloud.io",
"https://hub-mirror.c.163.com",
"https://mirror.baidubce.com",
"https://your_preferred_mirror",
"https://dockerhub.icu",
"https://docker.registry.cyou",
"https://docker-cf.registry.cyou",
"https://dockercf.jsdelivr.fyi",
"https://docker.jsdelivr.fyi",
"https://dockertest.jsdelivr.fyi",
"https://mirror.aliyuncs.com",
"https://dockerproxy.com",
"https://mirror.baidubce.com",
"https://docker.m.daocloud.io",
"https://docker.nju.edu.cn",
"https://docker.mirrors.sjtug.sjtu.edu.cn",
"https://docker.mirrors.ustc.edu.cn",
"https://mirror.iscas.ac.cn",
"https://docker.rainbond.cc"
]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker

38.镜像基本操作


镜像操作命令

dockerhub官网地址Docker Official Images

拉取Nginx镜像

1
docker pull nginx

查看镜像列表

1
docker images


导出镜像到磁盘

1
2
3
4
5
docker save
例如:docker save -o nginx.tar nginx:latest
docker save -o是固定的
nginx.tar 是导出后的文件名称
nginx:latest 是指要导出的镜像名称及版本


通过命令ll查看文件列表

删除镜像(不是删除刚刚导出的文件)

导入刚刚导出的镜像文件

39.容器基本操作

镜像通过docker run命令可以创建容器、并且运行容器

创建并且运行Nginx容器:

根据镜像创建容器并运行容器

1
docker run --name mn -d -p 80:80 nginx  

查看所有容器

1
docker ps

访问服务器的80端口可以看到Welcome to nginx!

查看Nginx日志

1
2
docker logs 容器名称 
例如:docker logs mn

容器内文件修改

进入容器

1
docker exec -it mn bash

ls查看当前目录下所有文件

进入Nginx静态文件

1
cd usr/share/nginx/html 

通过vi编辑时候会报错命令找不到,这是因为没有安装编辑器。

可通过以下直接修改

1
2
3
#找到Welcome to nginx修改为我是测试测试的
sed -i 's#Welcome to nginx#我是测试测试的#g' index.html
sed -i 's#<head>#<head><meta charset="utf-8">#g' index.html


退出当前容器输入exit

停止docker容器

1
2
docker stop  容器名称
#例如:docker stop mn

此时通过docker ps是查不出容器的,docker ps默认是查询运行的容器。如果停止的容器也想查询

1
2
#查询所有的容器(包括停止的)
docker ps -a

停止状态启动容器

1
2
docker stop 容器名称
#例如:docker stop mn

删除容器

1
2
3
docker rm 容器名称
#例如:docker rm mn (先停止容器才能删除)
#例如:docker rm -f mn (强制删除容器,即使运行状态也能删除)

创建并运行Redis容器

1
2
3
4
5
docker run --name myredis -d -p 6379:6379 redis redis-server --appendonly yes

#redis-server - 启动 Redis 服务器

#--appendonly yes - 开启 AOF(Append Only File)持久化模式

进入Redis容器

1
2
3
4
5
docker exec -it myredis bash   #进入redis容器
redis-cli #进入redis客户端
keys * #查看所有键
set num 666 #增加键值对
set num #获取num值

40.数据卷

12-使用Docker-数据卷命令_哔哩哔哩_bilibili

创建数据卷

1
2
docker volume create 数据卷名称
#例如 docker volume create html

查看所有数据卷

1
docker volume ls

查看数据卷的详细信息

1
2
docker volume inspect 数据卷名称
#例如docker volume inspect html

删除数据卷

1
2
docker volume prune   #删除未使用的数据卷
docker volume rm 数据卷名称 #删除对应的数据卷 例如:docker volume rm html

41.数据卷挂载

之前都是进入到容器中改内容,挂载数据卷之后数据和容器就分开了。

将naginx的html文件/usr/share/nginx/html,把这个目录挂载到html这个数据卷上,方便操作其中的内容。

如果数据卷html不存在,则会自动创建,可以不需要手动再去另外创建数据卷。

1
docker run --name mynginx -d -p 80:80 -v html:/usr/share/nginx/html nginx

先通过docker volume inspect html查出数据卷的位置,cd进入数据卷。查看是否挂载成功

42.目录挂载

这是直接挂载到宿主机自定义的目录,数据卷挂载是创建数据卷时候会自动生成文件的位置,位置是docker生成的。

以MySql为例

1
docker pull mysql

tmp目录下创建文件夹,分别用来存储mysql的配置文件和mysql的数据

conf文件夹中放入文件

hmy.cnf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
docker run --name mymysql \ #因为一行比较模糊,这边用\换行符号,代码命令还没结束
-e MYSQL_ROOT_PASSWORD=123456 -d -p 3306:3306 \
-v /tmp/mysql/conf/hmy.cnf:/etc/mysql/conf.d/hmy.cnf \
#/tmp/mysql/conf/hmy.cnf宿主机文件 /etc/mysql/conf.d/hmy.cnf容器内文件
-v /tmp/mysql/data:/var/lib/mysql \
#/tmp/mysql/data宿主机目录 /var/lib/mysq容器内目录
mysql:latest

直接复制一下
docker run --name mymysql \
-e MYSQL_ROOT_PASSWORD=123456 -d -p 3306:3306 \
-v /tmp/mysql/conf/hmy.cnf:/etc/mysql/conf.d/hmy.cnf \
-v /tmp/mysql/data:/var/lib/mysql \
mysql:latest

启动后但是容器状态是停止的状态。通过docker logs mymysql可以查看到具体报错日志

(MySQL 9.6 版本不支持配置文件中直接使用 utf8,需要使用 utf8mb4utf8mb3。需修改hmy.cnf文件中的配置)

42.自定义镜像


什么是DockerFile



运行命令

1
2
docker build -t javaweb:1.0 .
#docker build -t 名称:版本 dockerfile的所在目录(.代表当前目录)

运行容器

1
docker run --name web -d -p 8090:8090 javaweb:1.0

访问接口

dockerfile文件内优化

已经有镜像打包好了这些,所以可以缩写为

1
FROM azul/zulu-openjdk-alpine:8

重新构建

1
docker build -t javaweb2.0 .

43.DockerCompose

17-DockerCompose-初始Compose_哔哩哔哩_bilibili

之前使用docker run命令运行容器,Compose可以理解为docker run命令的集合。但不是直接用docker run,而是用了指令来代替。

将上面的配置对比我们之前的docker run命令

1
2
3
4
5
docker run --name mymysql \
-e MYSQL_ROOT_PASSWORD=123456 -d -p 3306:3306 \
-v /tmp/mysql/conf/hmy.cnf:/etc/mysql/conf.d/hmy.cnf \
-v /tmp/mysql/data:/var/lib/mysql \
mysql:latest

安装DockerCompose(最新版docker已经自带)

1
docker compose version

安装Nacos

1
docker pull nacos/nacos-server

创建容器

1
2
3
4
5
6
7
8
9
10
11
12
docker run -d \
> --name nacos-server \
> -p 8848:8848 \
> -p 8080:8080 \
> -p 9848:9848 \
> -p 9849:9849 \
> -e MODE=standalone \
> -e NACOS_AUTH_ENABLE=true \
> -e NACOS_AUTH_TOKEN="VGhpcyBpcyBhIDMyIGJ5dGVzIGxvbmcgc2VjcmV0IGtleSBmb3IgbmFjb3M=" \
> -e NACOS_AUTH_IDENTITY_KEY="nacos" \
> -e NACOS_AUTH_IDENTITY_VALUE="nacos" \
> nacos/nacos-server:latest

44.同步通讯的优缺点

45.异步通讯的优缺点

03-初识MQ—异步通讯的优缺点_哔哩哔哩_bilibili

46.初识MQ

47.安装RabbitMQ

拉取镜像

1
docker pull rabbitmq:management

运行

1
2
3
4
5
6
7
8
9
docker run -d \           
--name mq \ # 给容器命名为 "mq"
--hostname mq1 \ # 设置容器的主机名为 "mq1"
-p 15672:15672 \ # 端口映射:主机15672 -> 容器15672(RabbitMQ管理界面端口)
-p 5672:5672 \ # 端口映射:主机5672 -> 容器5672(RabbitMQ服务端口)
rabbitmq:management # 使用的镜像名称(带管理插件的 RabbitMQ 版本)

直接运行下面指令
docker run -d --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 rabbitmq:management

访问地址http://43.138.249.198:15672/进入mq的管理页面(默认账号密码都为guest)

channel:操作MQ的工具。

exchange:路由消息到队列中。

queue:缓存消息。

virtual host:虚拟主机,是对queue\exchange等资源的逻辑分组。

RabbitMQde 结构和概念

48、MQ的消息模型(官网RabbitMQ Tutorials | RabbitMQ

1.基本消息队列:

创建连接

创建通道

创建队列

运行完代码后,可以看到

消费者接收消息:

49、SpringAMQP

1.父工程引入AMQP依赖

1
2
3
4
5
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

修改配置信息

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 43.138.249.198 #rabbitMQ的端口
port: 5672 #端口
username: guest
password: guest
virtual-host: /

消息发送实现代码
测试类中要使用自动注入,注解:@RunWith(SpringRunner.class)

执行代码后可以看到消息

消息接收实现代码

编写一个类监听simple.queue

1
2
3
4
5
6
7
8
@Component
public class SpringRabbitListener {

@RabbitListener(queues = "simple.queue")
public void listener(String msg){
System.out.println("消费者接收到simple.queue的消息"+msg);
}
}

注意: 消息一旦消费就会从队列中删除,rabbitmq没有消息回溯功能

2.Work Queue工作队列:

发送50条消息

1
2
3
4
5
6
7
8
9
10
11
12
//    work queue基础消息队列
@RabbitListener(queues = "simple.queue")
public void woreQueueListener1(String msg) throws InterruptedException {
System.out.println("消费者1接到消息:"+msg);
Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void woreQueueListener2(String msg) throws InterruptedException{
System.err.println("消费者2.....接到消息:"+msg);
Thread.sleep(200);
}

但是此时并没有实现消费者1处理大多数消息,而是消费者1和2平均处理了消息。
从控制台打印的可以看出:消费者1处理了单数的,消费者2处理了双的(在轮询处理)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
消费者2.....接到消息:hello WorkQueue0
消费者1接到消息:hello WorkQueue1
消费者1接到消息:hello WorkQueue3
消费者1接到消息:hello WorkQueue5
消费者1接到消息:hello WorkQueue7
消费者2.....接到消息:hello WorkQueue2
消费者1接到消息:hello WorkQueue9
消费者1接到消息:hello WorkQueue11
消费者1接到消息:hello WorkQueue13
消费者2.....接到消息:hello WorkQueue4
消费者1接到消息:hello WorkQueue15
消费者1接到消息:hello WorkQueue17
消费者1接到消息:hello WorkQueue19
消费者2.....接到消息:hello WorkQueue6
消费者1接到消息:hello WorkQueue21
消费者1接到消息:hello WorkQueue23
消费者1接到消息:hello WorkQueue25
消费者1接到消息:hello WorkQueue27
消费者2.....接到消息:hello WorkQueue8
消费者1接到消息:hello WorkQueue29
消费者1接到消息:hello WorkQueue31
消费者1接到消息:hello WorkQueue33
消费者2.....接到消息:hello WorkQueue10
消费者1接到消息:hello WorkQueue35
消费者1接到消息:hello WorkQueue37
消费者1接到消息:hello WorkQueue39
消费者1接到消息:hello WorkQueue41
消费者2.....接到消息:hello WorkQueue12
消费者1接到消息:hello WorkQueue43
消费者1接到消息:hello WorkQueue45
消费者1接到消息:hello WorkQueue47
消费者2.....接到消息:hello WorkQueue14
消费者1接到消息:hello WorkQueue49
消费者2.....接到消息:hello WorkQueue16
消费者2.....接到消息:hello WorkQueue18
消费者2.....接到消息:hello WorkQueue20
消费者2.....接到消息:hello WorkQueue22
消费者2.....接到消息:hello WorkQueue24
消费者2.....接到消息:hello WorkQueue26
消费者2.....接到消息:hello WorkQueue28
消费者2.....接到消息:hello WorkQueue30
消费者2.....接到消息:hello WorkQueue32
消费者2.....接到消息:hello WorkQueue34
消费者2.....接到消息:hello WorkQueue36
消费者2.....接到消息:hello WorkQueue38
消费者2.....接到消息:hello WorkQueue40
消费者2.....接到消息:hello WorkQueue42
消费者2.....接到消息:hello WorkQueue44
消费者2.....接到消息:hello WorkQueue46
消费者2.....接到消息:hello WorkQueue48

因为有消息预取机制,消息到达queue后,channel会去取数据,导致消息被平均分配。
那么就需要去配置每次channel取的数据量限制,默认是无限,设置成1后每次只会取一条,等处理完了再取一条。
配置文件中添加配置:

1
2
3
listener:
simple:
prefetch: 1 #每次只取一条消息,处理完成才能获取下一个消息

从控制台打印的可以看出:消费者1处理了比较多的数据,消费者2处理了较少的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
消费者1接到消息:hello WorkQueue0
消费者2.....接到消息:hello WorkQueue1
消费者1接到消息:hello WorkQueue2
消费者1接到消息:hello WorkQueue3
消费者1接到消息:hello WorkQueue4
消费者1接到消息:hello WorkQueue5
消费者1接到消息:hello WorkQueue6
消费者2.....接到消息:hello WorkQueue7
消费者1接到消息:hello WorkQueue8
消费者1接到消息:hello WorkQueue9
消费者1接到消息:hello WorkQueue10
消费者1接到消息:hello WorkQueue11
消费者1接到消息:hello WorkQueue12
消费者2.....接到消息:hello WorkQueue13
消费者1接到消息:hello WorkQueue14
消费者1接到消息:hello WorkQueue15
消费者1接到消息:hello WorkQueue16
消费者1接到消息:hello WorkQueue17
消费者1接到消息:hello WorkQueue18
消费者2.....接到消息:hello WorkQueue19
消费者1接到消息:hello WorkQueue20
消费者1接到消息:hello WorkQueue21
消费者1接到消息:hello WorkQueue22
消费者1接到消息:hello WorkQueue23
消费者2.....接到消息:hello WorkQueue24
消费者1接到消息:hello WorkQueue25
消费者1接到消息:hello WorkQueue26
消费者1接到消息:hello WorkQueue27
消费者1接到消息:hello WorkQueue28
消费者1接到消息:hello WorkQueue29
消费者2.....接到消息:hello WorkQueue30
消费者1接到消息:hello WorkQueue31
消费者1接到消息:hello WorkQueue32
消费者1接到消息:hello WorkQueue33
消费者1接到消息:hello WorkQueue34
消费者1接到消息:hello WorkQueue35
消费者2.....接到消息:hello WorkQueue36
消费者1接到消息:hello WorkQueue37
消费者1接到消息:hello WorkQueue38
消费者1接到消息:hello WorkQueue39
消费者1接到消息:hello WorkQueue40
消费者1接到消息:hello WorkQueue41
消费者2.....接到消息:hello WorkQueue42
消费者1接到消息:hello WorkQueue43
消费者1接到消息:hello WorkQueue44
消费者1接到消息:hello WorkQueue45
消费者1接到消息:hello WorkQueue46
消费者1接到消息:hello WorkQueue47
消费者2.....接到消息:hello WorkQueue48
消费者1接到消息:hello WorkQueue49

1、2点都是只能被一个消费者消费,如果需要多个消费者都收到则需要新的模型如下。

3.发布(publish)、订阅(Subscribe)工作队列:

3.1发布订阅-Fanout Exchange

添加配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
public class FanoutConfig {
// 创建交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}

@Bean
public Queue fanoutQueue1() {
return new Queue("fanoutQueue1");
}
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
// 把fanoutQueue1绑定到fanoutExchange
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}

@Bean Queue fanoutQueue2() {
return new Queue("fanoutQueue2");
}

@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
// 把fanoutQueue2绑定到fanoutExchange
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}

启动项目后进入mq的控制台可以看到创建了一个名为fanoutExchange的交换机和2个队列