RabbitMQ超详细安装教程(Linux)
1 2 24-07-23 初始记录 24-08-09 补充 RabbitMQ 的延迟消息
MQ 全称为 Message Queue。
MQ 也被称为消息中间件。
MQ 也是微服务之间的通信的一种方式。
为什么使用 MQ
任务异步 处理将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
应用程序解耦合
MQ 相当于一个中介,生产方通过 MQ 与消费方交互,它将应用程序进行解耦合。
削峰填谷
技术选型对比
市面上有哪些消息队列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,Redis。
消息队列
优点
缺点
RabbitMQ
1.支持 AMQP 协议 2.基于 erlang 语言开发,高并发性能较好 3.工作模式较为灵活 4.支持延迟消息 5.提供较为友好的后台管理页面 6.单机部署,1~2WTPS
1.不支持水平扩容 2.不支持事务 3.消息吞吐量三者最差 4.当产生消息堆积,性能下降明显 5.消息重发机制需要手动设置 6.不支持消息重复消费
RocketMQ
1.高可用,高吞吐量,海量消息堆积,低延迟性能上,都表现出色 2.api 与架构设计更加贴切业务场景 3.支持顺序消息 4.支持事务消息 5.支持消息过滤 6.支持重复消费 7.支持延迟消息 8.支持消息跟踪 9.天然支持集群、负载均衡 10.支持指定次数和时间间隔的失败消息重发 11.单机部署,5~10WTPS
1.生态圈相较 Kafka 有所不如 2.消息吞吐量与消息堆积能力也不如 Kafka 3.不支持主从自动切换 4.只支持 Java
Kafka
1.高可用,高吞吐量,低延迟性能上,都表现出色 2.使用人数多,技术生态圈完善 3.支持顺序消息 4.支持多种客户端 5.支持重复消费
1.依赖分区,消费者数量受限于分区数 2.单机消息过多时,性能下降明显 3.不支持事务消息 4.不支持指定次数和时间间隔的失败消息重发
消息中间件对比 - 选择建议
消息中间件
建议
Kafka
追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务
RocketMQ
可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双 11 考验
RabbitMQ
性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的 RabbitMQ
RabbitMQ
基础
安装
Windows
下载 RabbitMQ(https://www.rabbitmq.com/docs/download )
下载 Erlang(https://packagecloud.io/rabbitmq/erlang ),并配置环境变量
Erlang
和 RabbitMQ
版本对照:https://www.rabbitmq.com/which-erlang.html
运行 RabbitMQ
1 2 3 4 5 6 7 8 9 10 11 12 13 # 开启管理插件 rabbitmq-plugins enable rabbitmq_management # 启动rabbitmq systemctl start rabbitmq-server # 查看rabbitmq状态 systemctl status rabbitmq-server # 设置rabbitmq服务开机自启动 systemctl enable rabbitmq-server # 关闭rabbitmq服务 systemctl stop rabbitmq-server # 重启rabbitmq服务 systemctl restart rabbitmq-server
Linux
docker 安装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 # 1.yum包更新到最新 yum update # 2.安装需要的软件包(yum-utils提供yum-config-manager的功能,,并且device mapper存储驱动程序需要device-mapper-persistent-data和lvm2) yum install -y yum-utils device-mapper-persistent-data lvm2 # 3.设置yum源为阿里云 yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo # 4.安装docker yum install docker-ce -y # 5.安装后查看docker版本 docker -v # 6.阿里云镜像加速 sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-'EOF' { "registry-mirrors": ["https://73z5h6yb.mirror.aliyuncs.com"] } EOF sudo systemctl daemon-reload sudo systemctl restart docker # 安装启动rabbitmq容器 docker run -d --name 容器名称 -e RABBITMQ_DEFAULT_USER=xxx -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3.8.14-management
docker 常用命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # 启动docker systemctl start docker # 停止docker systemctl stop docker # 重启docker systemctl restart docker # 查看docker状态 systemctl status docker # 开机启动 systemctl enable docker systemctl unenable docker # 查看docker概要信息 docker info # 查看docker帮助文档 docker --help
不使用 docker
下载 RabbitMQ(https://www.rabbitmq.com/docs/download )
下载 Erlang(https://packagecloud.io/rabbitmq/erlang )
Erlang
和 RabbitMQ
版本对照:https://www.rabbitmq.com/which-erlang.html
安装 Erlang 和 RabbitMQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 # 安装 Erlang # rpm -Uvh erlang-23.2.7-2.el7.x86_64.rpm # yum install -y erlang # erl -v # 安装 RabbitMQ # yum install -y socat # rpm -Uvh rabbitmq-server-3.8.14-1.el7.noarch.rpm # yum install -y rabbitmq-server # systemctl start rabbitmq-server # systemctl status rabbitmq-server # 其他命令 # systemctl enable rabbitmq-server # systemctl stop rabbitmq-server # systemctl restart rabbitmq-server
RabbitMQWeb 管理界面及授权操作
1 2 # 打开RabbitMQWeb管理界面插件,然后访问服务器公网ip:15672。默认账号密码 guest,guest rabbitmq-plugins enable rabbitmq_management
添加远程账户
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # 添加用户 rabbitmqctl add_user 用户名 密码 # 设置用户角色,分配操作权限 rabbitmqctl set_user_tags 用户名 角色 # 为用户添加资源权限(授予访问虚拟机根节点的所有权限) rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*" # 修改密码 rabbitmqctl change_ password 用户名 新密码 # 删除用户 rabbitmqctl delete_user 用户名 # 查看用户清单 rabbitmqctl list_users
角色有四种 :
administrator
:可以登录控制台、查看所有信息、并对 rabbitMQ 进行管理
monToring
:监控者;登录控制台,查看所有信息
policymaker
:策略制定者;登录控制台指定策略
managment
:普通管理员;登录控制
基本使用
添加依赖
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
配置信息
1 2 3 4 5 6 7 spring: rabbitmq: rabbitmq: port: 5673 host: localhost username: swsk33 password: 123456
五种模型
Basic Queue
(简单队列模型):一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Component @Slf4j public class RabbitMqSend { @Autowired private AmqpTemplate amqpTemplate; public void sendQueryOrderMessage (Object msg) { amqpTemplate.convertAndSend("xxx" , msg); } } @Component @Slf4j public class RabbitMQListener { @RabbitListener(queues = "xxx") public void handlerOrderQueryMsg (String msg) { log.info("xxx接收到信息:" + msg); return ; } }
Work Queue
(工作队列模式):一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Component @Slf4j public class RabbitMqSend { @Autowired private AmqpTemplate amqpTemplate; public void sendQueryOrderMessage (Object msg) { String queueName = "simple.queue" ; for (int i = 0 ; i < 50 ; i++) { rabbitTemplate.convertAndSend(queueName, msg + i); Thread.sleep(20 ); } } } @RabbitListener(queues = "simple.queue") public void listenWorkQueue1 (String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20 ); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2 (String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200 ); }
在不添加额外配置的情况下,两个消费者会平分消息进行消费。为了避免这种情况,可以多加一个配置。prefetch
这个配置可以控制消费者预取的消息数量。
1 2 3 4 5 spring: rabbitmq: listener: simple: prefetch: 1
Fanout
(发布订阅模式群发 ):需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 @Configuration public class FanoutConfig { @Bean public FanoutExchange fanoutExchange () { return new FanoutExchange ("xxx.fanout" ); } @Bean public Queue fanoutQueue1 () { return new Queue ("fanout.queue1" ); } @Bean public Binding bindingQueue1 (Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Queue fanoutQueue2 () { return new Queue ("fanout.queue2" ); } @Bean public Binding bindingQueue2 (Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } } @Component @Slf4j public class RabbitMqSend { @Autowired private AmqpTemplate amqpTemplate; public void sendQueryOrderMessage (Object msg) { String exchangeName = "xxx.fanout" ; for (int i = 0 ; i < 50 ; i++) { rabbitTemplate.convertAndSend(exchangeName, "" , msg); Thread.sleep(20 ); } } } @RabbitListener(queues = "simple.queue") public void listenWorkQueue1 (String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20 ); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2 (String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200 ); }
Direct
(路由模式):需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Component @Slf4j public class RabbitMqSend { @Autowired private AmqpTemplate amqpTemplate; public void sendQueryOrderMessage (Object msg) { String exchangeName = "xxx.direct" ; rabbitTemplate.convertAndSend(exchangeName, "red" , message); } } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "xxx.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1 (String msg) { System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】" ); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "xxx.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2 (String msg) { System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】" ); }
Topic
(通配符模式):需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配 符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。Topic
类型的 Exchange
与 Direct
相比,都是可以根据 RoutingKey
把消息路由到不同的队列。只不过 Topic
类型 Exchange
可以让队列在绑定 Routing key
的时候使用通配符。
通配符规则 :#
:匹配一个或多个词*
:匹配不多不少恰好 1 个词
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Component @Slf4j public class RabbitMqSend { @Autowired private AmqpTemplate amqpTemplate; public void sendQueryOrderMessage (Object msg) { String exchangeName = "xxx.topic" ; rabbitTemplate.convertAndSend(exchangeName, "china.news" , message); } } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "xxx.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1 (String msg) { System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】" ); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "xxx.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2 (String msg) { System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】" ); }
消息转换器
Spring 会把你发送的消息序列化为字节发送给 MQ,接收消息的时候,还会把字节反序列化为 Java 对象。默认情况下 Spring 采用的序列化方式是 JDK 序列化:数据体积过大、有安全漏洞、可读性差 。
在 publisher 和 consumer 两个服务中都引入依赖
1 2 3 4 5 <dependency > <groupId > com.fasterxml.jackson.dataformat</groupId > <artifactId > jackson-dataformat-xml</artifactId > <version > 2.9.10</version > </dependency >
在启动类中添加一个 Bean
1 2 3 4 @Bean public MessageConverter jsonMessageConverter () { return new Jackson2JsonMessageConverter (); }
延迟消息
RbbitMQ 实现延时消息主要有两种方式:
死信消息(队列 ttl+ 死信 exchange)
2. 延时插件 (rabbitmq-delayed-message-exchange)
死信消息
RabbitMQ 为每个队列设置消息的超时时间。只要给队列设置 x-message-ttl 参数(也可以给单条消息设置存活时间),就设定了该队列所有消息的存活时间,时间单位是毫秒。如果声明队列时指定了死信交换器,则过期消息会成为死信消息。
延时插件
进入 rabbitmq 的 docker 容器
下载插件并上传服务器(下载地址:[https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases ][https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases ])
1 2 3 4 5 6 7 8 9 10 11 12 13 # 将下载好的插件拷贝到docker容器中 docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbit:/plugins # 进入RabbitMQ容器 docker exec -it rabbit /bin/bash # 启动插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 可以通过以下命令查看所有安装过的插件 rabbitmq-plugins list # 然后执行 exit 命令退出 docker 容器
pom 文件引入依赖
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
在代码中配置延迟队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Configuration public class RabbitMqConfig { public static final String ORDER_DELAY_KEY = "order_delay" ; @Bean public CustomExchange customExchange () { Map<String, Object> args = new HashMap <>(); args.put("x-delayed-type" , "direct" ); return new CustomExchange ("order_delay_exchange" , "x-delayed-message" , true , false , args); } @Bean public Queue delayOrderQueue () { return new Queue ("order_delay_queue" , true ); } @Bean public Binding delayOrderBinding () { return BindingBuilder.bind(delayOrderQueue()).to(customExchange()).with(ORDER_DELAY_KEY).noargs(); } }
编写方法向队列发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void sendDelayOrderMessage (Object msg, String routingKey, Long delayTime) { amqpTemplate.convertAndSend("order_delay_exchange" , routingKey, msg, message -> { message.getMessageProperties().setDelay(delayTime.intValue()); return message; }); }
监听队列
1 2 3 4 5 6 @RabbitListener(queues = "order_delay_queue") public void handlerOrderDelayMsg (String msg) { log.info("order_delay_queue接收到信息:" + msg); val jsonObject = JSONUtil.parseObj(msg); }
消息重试
添加如下配置实现消息重试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 spring: listener: simple: retry: enabled: true max-attempts: 10 initial-interval: 3000ms max-interval: 86400000ms multiplier: 2
特别注意: 如果在消息接收端的 application.yml 配置文件中没有添加 RabbitMQ 重试机制的相关配置,当接收端收到消息后程序抛出异常,那么发送端将得不到消息确认(ACK),此时发送端将会循环的发送消息,最终导致内存溢出。
RocketMQ
基础
RocketMQ 组成结构图如下 :
交互过程 如下:
Brokder 定时发送自身状态 到 NameServer。
Producer 请求 NameServer 获取 Broker 的地址。
Producer 将消息发送到 Broker 中的消息队列。
Consumer 订阅 Broker 中的消息队列,通过拉取消息,或由 Broker 将消息推送至 Consumer。
具体介绍 :
Producer Cluster 消息生产者群
Consumer Cluster 消息费群
负责消费消息,一般是后台系统负责异步消费。
两种消费模式:
Push Consumer,服务端向消费者端推送消息
Pull Consumer,消费者端向服务定时拉取消息
NameServer 名称服务器
Broker 消息服务器
安装
Windows
下载
下载地址:https://archive.apache.org/dist/rocketmq/
下载后解压到一个没有空格和中文的目录。并配置安装目录为环境变量。
启动
Broker 默认磁盘空间利用率达到 85% 就不再接收,这里在开发环境可以提高磁盘空间利用率报警阀值为 98%。
1 2 3 4 5 6 7 8 9 10 11 12 # 调整默认的内存大小参数(按机器的大小配置) # "JAVA_OPT=%JAVA_OPT% ‐server ‐Xms512m ‐Xmx512m ‐Xmn512m ‐XX:MetaspaceSize=128m ‐ XX:MaxMetaspaceSize=320m" # 运行bin/mqnamesrv.cmd cd bin/ start mqnamesrv.cmd # 启动broker -n:指定NameServer的地址,运行bin/mqbroker.cmd ‐n 127.0.0.1:9876 cd bin/ start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true #
测试
1 2 3 4 5 6 7 8 # 发送消息 cd bin/ set NAMESRV_ADDR=127.0.0.1:9876 tools.cmd org.apache.rocketmq.example.quickstart.Producer # 接收消息 set NAMESRV_ADDR=127.0.0.1:9876 tools.cmd org.apache.rocketmq.example.quickstart.Consumer
安装管理端
RocketMQ 提供了 UI 管理工具,名为 rocketmq-console,项目地址:https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # 下载源文件,并切换到console分支 # 修改配置 # server.port=9877 rocketmq.config.namesrvAddr=127.0.0.1:9876 rocketmq.config.dataPath=/tmp/rocketmq-console/data # 打包 # mvn clean package ‐Dmaven.test.skip=true # 启动 cd /rocketmq-console/target java -jar rocketmq-console-ng-1.0.0.jar --server.port=9877 --rocketmq.config.namesrvAddr=127.0.0.1:9876 # 访问 http://127.0.0.1:9877
Linux
由于 RocketMQ 由 Java 编写,基本操作与 Windows 一致,上传包到服务器后解压运行即可(这里只记录部分 Linux 命令)。
1 2 3 4 5 6 7 8 9 10 # 编辑文件 vim runserver.cmd # 文件位置下运行命令(后台运行) nohup java -jar rocketmq-console-ng-1.0.0.jar # 控制台相关 tail -100 nohup.out tail -f nohup.out cp /dev/null nohup.out
基本使用
三种消息发送方式
同步消息(sync message)
producer 向 broker 发送消息,执行 API 时同步等待, 直到 broker 服务器返回发送结果 。
异步消息(async message)
producer 向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer 发送消 息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
单向消息(oneway message)
producer 向 broker 发送消息,执行 API 时直接返回,不等待 broker 服务器的结果 。
消息结构
RocketMQ 的消息包括基础属性和扩展属性两部分:
基础属性
topic: 主题相当于消息的一级分类,具有相同 topic 的消息将发送至该 topic 下的消息队列中,比方说一个电商 系统可以分为商品消息、订单消息、物流消息等,就可以在 broker 中创建商品主题、订单主题等,所有商品的消息发送至该主题下的消息队列中。
消息体:即消息的内容 ,可以的字符串、对象等类型(可系列化)。消息的最大长度是 4M。
消息 Flag:消息的一个标记,RocketMQ 不处理,留给业务系统使用。
扩展属性
tag:相当于消息的二级分类,用于消费消息时进行过滤,可为空 。
keys:Message 索引键,在运维中可以根据这些 key 快速检索到消息, 可为空 。
waitStoreMsgOK:消息 发送时是否等消息存储完成后再返回 。
Message 的基础属性主要包括消息所属主题 topic , 消息 Flag(RocketMQ 不做处理)、 扩展属性、消息体 。
生产者工程
添加依赖
1 2 3 4 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > </dependency >
添加配置
1 2 3 4 rocketmq: nameServer: 127.0 .0 .1 :9876 producer: group: PID_PAY_PRODUCER
发送同步消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Service @Slf4j public class PayChannelAgentServiceImpl implements PayChannelAgentService { @Resource private PayProducer payProducer; @Override public PaymentResponseDTO createPayOrderByAliWAP (AliConfigParam aliConfigParam, AlipayBean alipayBean) throws BusinessException, UnsupportedEncodingException { ... try { PaymentResponseDTO<AliConfigParam> notice = new PaymentResponseDTO <AliConfigParam>(); notice.setOutTradeNo(alipayBean.getOutTradeNo()); notice.setContent(aliConfigParam); notice.setMsg("ALIPAY_WAP" ); payProducer.payOrderNotice(notice); } catch (Exception e) { ... } } }
测试(控制台出现 end… 表示消息发送成功。进入管理端,查询消息。)
消费者工程
配置文件
1 2 rocketmq: nameServer: 127.0 .0 .1 :9876
消费消息(监听消息队列需要指定 topic 与 consumerGroup )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Slf4j @Service @RocketMQMessageListener(topic = "TP_PAYMENT_ORDER", consumerGroup = "CID_PAYMENT_CONSUMER") public class PayConsumer implements RocketMQListener <MessageExt> { @Resource private PayChannelAgentService payAgentService; @Override public void onMessage (MessageExt messageExt) { log.info("开始消费支付结果查询消息:{}" , messageExt); String body = new String (messageExt.getBody(), StandardCharsets.UTF_8); PaymentResponseDTO response = JSON.parseObject(body, PaymentResponseDTO.class); String outTradeNo = response.getOutTradeNo(); String msg = response.getMsg(); String param = String.valueOf(response.getContent()); AliConfigParam aliConfigParam = JSON.parseObject(param, AliConfigParam.class); PaymentResponseDTO result = new PaymentResponseDTO (); if ("ALIPAY_WAP" .equals(msg)) { result = payAgentService.queryPayOrderByAli(aliConfigParam, outTradeNo); ... } else if ("WX_JSAPI" .equals(msg)) { ... } if (TradeStatus.UNKNOWN.equals(result.getTradeState()) || TradeStatus.USERPAYING.equals(result.getTradeState())) { log.info("支付代理‐‐‐支付状态未知,等待重试" ); throw new RuntimeException ("支付状态未知,等待重试" ); } } }
消息发送过程
消息发送流程如下:
Producer 从 NameServer 中获取主题路由信息
Broker 将自己的状态上报给 NameServer,NameServer 中存储了每个 Broker 及主题、消息队列的信息。
Producer 根据 topic 从 NameServer 查询所有消息队列,查询到的结果例如:
1 2 3 4 5 6 [ { "brokerName" : "Broker‐1" , "queueId" : 0 } , { "brokerName" : "Broker‐1" , "queueId" : 1 } , { "brokerName" : "Broker‐2" , "queueId" : 0 } , { "brokerName" : "Broker‐2" , "queueId" : 1 } ]
Producer 按选择算法从以上队列中选择一个进行消息发送,如果发送消息失败则在下次选择的时候 会规避掉失败的 broker。
构建消息,发送消息发送消息前进行校验,比如消息的内容长度不能为 0、消息最大长度、消息必要的属性是否具备等(topic、消息体,生产组等)。如果该 topic 下还没有队列则自动创建,默认一个 topic 下自动创建 4 个写队列,4 个读队列 。
为什么要多个队列?
高可用:当某个队列不可用时其它队列顶上。
提高并发:发送消息是选择队列进行发送,提高发送消息的并发能力。 消息消费时每个消费者可以监听多个队列,提高消费消息的并发能力。
生产组有什么用?
在事务消息中 broker 需要回查 producer,同一个生产组的 producer 组成一个集群,提高并发能力。
监听队列,消费消息一个消费组可以包括多个消费者,一个消费组可以订阅多个主题。一个队列同时只允许一个消费者消费,一个消费者可以消费多个队列中的消息。消费组有两种消费模式:
集群模式:一个消费组内的消费者组成一个集群,主题下的一条消息只能被一个消费者消费。
广播模式:主题下的一条消息能被消费组下的所有消费者消费。消费者和 broker 之间通过推模式和拉模式接收消息,推模式即 broker 推送给消费者,拉模式是消费者主动从 broker 查询消息。
三种消息发送方式
同步消息
见基本使用的例子。
异步消息
producer 向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer 发送消 息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void sendASyncMsg (String topic, String msg) { rocketMQTemplate.asyncSend(topic,msg,new SendCallback () { @Override public void onSuccess (SendResult sendResult) { System.out.println(sendResult.getSendStatus()); } @Override public void onException (Throwable e) { System.out.println(e.getMessage()); } }); } @Test public void testSendASyncMsg () throws InterruptedException { this .producerSimple.sendASyncMsg("my‐topic" , "第一条异步步消息" ); System.out.println("end..." ); Thread.sleep(3000 ); }
单向消息
producer 向 broker 发送消息,执行 API 时直接返回,不等待 broker 服务器的结果 。
1 2 3 4 5 6 7 8 public void sendOneWayMsg (String topic, String msg) { this .rocketMQTemplate.sendOneWay(topic,msg); }
自定义消息格式
使用 String 传递数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void sendMsgByJson (String topic, OrderExt orderExt) { this .rocketMQTemplate.convertAndSend(topic, orderExt); System.out.printf("send msg : %s" , orderExt); } @Component @RocketMQMessageListener(topic = "my‐topic", consumerGroup = "demo‐consumer‐group") public class ConsumerSimple implements RocketMQListener <String> { @Override public void onMessage (String s) { System.out.println(s); } }
使用对象接收数据
1 2 3 4 5 6 7 8 9 10 @Component @RocketMQMessageListener(topic = "my‐topic‐obj", consumerGroup = "demo‐consumer‐group‐obj") public class ConsumerSimpleObj implements RocketMQListener <OrderExt> { @Override public void onMessage (OrderExt orderExt) { System.out.println(orderExt); } }
延迟消息
RocketMQ 的延迟消息实现非常简单,只需要发送消息前设置延迟的时间,延迟时间存在十八个等级 (1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用 setDelayTimeLevel() 设置与时间相对应的延迟级别即可。
同步消息延迟
1 2 3 4 5 6 7 8 9 10 11 12 public void sendMsgByJsonDelay (String topic, OrderExt orderExt) throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException { Message<OrderExt> message = MessageBuilder.withPayload(orderExt).build(); this .rocketMQTemplate.syncSend(topic, message, 1000 , 3 ); System.out.printf("send msg : %s" , orderExt); }
异步消息延迟
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void sendAsyncMsgByJsonDelay (String topic, OrderExt orderExt) throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException { String json = this .rocketMQTemplate.getObjectMapper().writeValueAsString(orderExt); org.apache.rocketmq.common.message.Message message = new org .apache.rocketmq.common.message.Message(topic, json.getBytes(Charset.forName("utf‐8" ))); message.setDelayTimeLevel(3 ); this .rocketMQTemplate.getProducer().send(message, new SendCallback () { @Override public void onSuccess (SendResult sendResult) { System.out.println(sendResult); } @Override public void onException (Throwable throwable) { System.out.println(throwable.getMessage()); } }); System.out.printf("send msg : %s" , orderExt); }
消费重试
当消息发送到 Broker 成功,在被消费者消费时如果消费者没有正常消费,此时消息会重试消费。消费重试存在两种场景:
消息没有被消费者接收,比如消费者与 broker 存在网络异常。此种情况消息会一直被消费重试。
当消息已经被消费者成功接收,但是在进行消息处理时出现异常,消费端无法向 Broker 返回成功,这种情况下 RocketMQ 会不断重试。borker 是怎么知道重试呢? 消费者在消费消息成功会向 broker 返回成功状态,否则会不断进行消费重试。
处理策略
消息会按照延迟消息的延迟时间等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)从第 3 级开始重试,每试一次如果还不成功则延迟等级加 1。重试了 16 次还未被成功消费将会投递到死信队列,到达死信队列的消息将不再被消费。
实际生产中不会让消息重试这么多次,通常在重试一定的次数后将消息写入数据库,由另外单独的程序或人工去处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 public class ConsumerSimple implements RocketMQListener <MessageExt> { @Override public void onMessage (MessageExt messageExt) { int reconsumeTimes = messageExt.getReconsumeTimes(); if (reconsumeTimes >= 2 ) { return ; } throw new RuntimeException (String.format("第%s次处理失败.." , reconsumeTimes)); } }
基础
Kafka 是一个分布式的基于开发/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。