消息队列
定义
来看看维基百科怎么说的,顺带学学英语这波不亏:
In computer science, message queues and mailboxes are software-engineering components typically used for inter-process communication (IPC), or for inter-thread communication within the same process. They use a queue for messaging – the passing of control or of content. Group communication systems provide similar kinds of functionality.
翻译一下:在计算机科学领域,消息队列和邮箱都是软件工程组件,通常用于进程间或同一进程内的线程通信。它们通过队列来传递消息、传递控制信息或内容,群组通信系统提供类似的功能。简单的概括下上面的定义:消息队列就是一个使用队列来通信的组件。我们日常所说的消息队列指的是消息中间件。
消息队列的应用场景
从本质上来说是因为随着互联网的快速发展,业务不断扩张,促使技术架构需要不断的演进。从以前的单体架构到现在的微服务架构,成百上千的服务之间相互调用和依赖。从互联网初期一个服务器上有100个在线用户已经很了不得,到现在坐拥10亿日活的微信。我们需要有一个「东西」来解耦服务之间的关系、控制资源合理合时地使用,以及缓冲流量洪峰等等。消息队列就应运而生了。它常用来实现:异步处理、服务解耦、流量控制。
保证消息不丢失
先来看看这张图:
一共有三个阶段:生产消息、存储消息和消费消息。我们从这三个阶段来分别看看如何确保消息不会丢失。
生产消息
生产者发送消息至Broker
,需要处理Broker
的响应,不论是同步还是异步发送消息,同步和异步回调都需要做好try-catch
,妥善的处理响应:
- 如果
Broker
返回写入失败等错误消息,需要重试发送。 - 当多次发送失败需要作报警,日志记录等。
这样就能保证在生产消息阶段消息不会丢失。
存储消息
存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电这消息就没了,而生产者以为已经发送成功了。如果Broker
是集群部署,有多副本机制,即消息不仅仅要写入当前Broker
,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。一台挂了还有一台在呢(假如怕两台都挂了..那就再多些)。那假如来个地震机房机子都挂了呢?大公司基本上都有异地多活。那要是这几个地都地震了呢?这时候还是先关心关心人吧。
消费消息
这里经常会有同学犯错,有些同学当消费者拿到消息之后直接存入内存队列中就直接返回给Broker
消费成功,这是不对的。你需要考虑拿到消息放在内存之后消费者就宕机了怎么办。所以我们应该在消费者真正执行完业务逻辑之后,再发送给Broker
消费成功,这才是真正的消费了。所以只要我们在消费业务逻辑处理完成之后再给Broker
响应,那么消费阶段消息就不会丢失。
重复消息的处理
假设我们发送消息,就管发,不管Broker
的响应,那么我们发往Broker
是不会重复的。但是一般情况我们是不允许这样的,这样消息就完全不可靠了,我们的基本需求是消息至少得发到Broker
上,那就得等Broker
的响应,那么就可能存在Broker
已经写入了,当时响应由于网络原因生产者没有收到,然后生产者又重发了一次,此时消息就重复了。再看消费者消费的时候,假设我们消费者拿到消息消费了,业务逻辑已经走完了,事务提交了,此时需要更新Consumer offset
了,然后这个消费者挂了,另一个消费者顶上,此时Consumer offset
还没更新,于是又拿到刚才那条消息,业务又被执行了一遍。于是消息又重复了。可以看到正常业务而言消息重复是不可避免的,因此我们只能从另一个角度来解决重复消息的问题。
采用幂等来解决。幂等需要分场景去考虑,如果是跟金钱相关的,就需要做强校验;不是很重要的场景做弱校验。
幂等处理重复消息
幂等是数学上的概念,我们就理解为同样的参数多次调用同一个接口和调用一次产生的结果是一致的。
例如这条 SQL
: update t1 set money = 150 where id = 1 and money = 100;
执行多少遍money
都是150,这就叫幂等。因此需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。上面我那条 SQL
,可以作如下处理:
- 做个前置条件判断,即
money = 100
情况,并且直接修改。 - 做个
version(版本号控制)
,对比消息中的版本号和数据库中的版本号。 - 通过数据库的约束例如唯一键,例如
insert into update on duplicate key...
。 - 记录关键的
key
,比如处理订单这种,记录订单ID,假如有重复的消息过来,先判断下这个ID是否已经被处理过了,如果没处理再进行下一步。 - 也可以用全局唯一ID等等。
基本上就这么几个套路,真正应用到实际中还是得看具体业务细节。
强校验
比如你监听到用户支付成功的消息,你监听到了去加GMV
是不是要调用加钱的接口,那加钱接口下面再调用一个加流水的接口,两个放在一个事务,成功一起成功失败一起失败。每次消息过来都要拿着 订单号+业务场景 这样的唯一标识(比是天猫双十一活动)去流水表查,看看有没有这条流水,有就直接return
不要走下面的流程了,没有就执行后面的逻辑。之所以用流水表,是因为涉及到金钱这样的活动,有啥问题后面也可以去流水表对账,还有就是帮助开发人员定位问题。
弱校验
这个简单,一些不重要的场景,比如给谁发短信啥的,我就把这个 id+场景唯一标识 作为Redis的key,放到缓存里面失效时间看你场景,一定时间内的这个消息就去Redis判断。用KV就算消息丢了可能这样的场景也没关系,反正丢条无关痛痒的通知短信。
保证消息的有序性
有序性分为:全局有序和部分有序。
全局有序
如果要保证消息的全局有序,首先只能由一个生产者往Topic
发送消息,并且一个Topic
内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!不过一般情况下我们都不需要全局有序,即使是同步 MySQL Binlog
也只需要保证单表消息有序即可。
部分有序
因此绝大部分的有序需求是部分有序,部分有序我们就可以将Topic
内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。
图中我画了多个生产者,一个生产者也可以,只要同类消息发往指定的队列即可。
消息堆积的处理
消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。因此我们需要先定位消费慢的原因,如果是bug
则处理 bug
,如果是因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic
的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic
中,一个队列只会分配给一个消费者。当然你消费者内部是单线程还是多线程消费那看具体场景。不过要注意上面提到的消息丢失的问题,如果你是将接受到的消息写入内存队列之后,然后就返回响应给Broker
,然后多线程向内存队列消费消息,假设此时消费者宕机了,内存队列里面还未消费的消息也就丢了。
分布式事务
事务
概念
一般是指要做的或所做的事情。在计算机术语中是指访问并可能更新数据库中各种数据项的一个程序执行单元(unit
)。事务通常由高级数据库操纵语言或编程语言(如SQL
,C++
或Java
)书写的用户程序的执行所引起,并用形如begin transaction
和end transaction
语句(或函数调用)来界定。事务由事务开始(begin transaction
)和事务结束(end transaction
)之间执行的全体操作组成。
特性
- 事务是恢复和并发控制的基本单位。
- 事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为
ACID
特性:- 原子性(atomicity):一个事务是一个不可分割的工作单位,事务中包括的操作要么都做,要么都不做。
- 一致性(consistency):事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。
- 隔离性(isolation):一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
- 持久性(durability):持久性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响。
总结了一下就是:事务就是一系列操作,要么全部成功,要么全部失败。
分布式事务
银行跨行转账业务是一个典型分布式事务场景,假设a需要跨行转账给b,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的acid
,只能够通过分布式事务来解决。
分布式事务就是指事务的发起者、资源及资源管理器和事务协调者分别位于分布式系统的不同节点之上。在上述转账的业务中,用户a-100操作和用户b+100操作不是位于同一个节点上。本质上来说,分布式事务就是为了保证在分布式场景下,数据操作的正确执行。
分布式事务在分布式环境下,为了满足可用性、性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 base 理论:
- 基本业务可用性(basic availability)
- 柔性状态(soft state)
- 最终一致性(eventual consistency)
同样的,分布式事务也部分遵循 acid
规范:
- 原子性:严格遵循
- 一致性:事务完成后的一致性严格遵循;事务中的一致性可适当放宽
- 隔离性:并行事务间不可影响;事务中间结果可见性允许安全放宽
- 持久性:严格遵循
我接触和了解到的分布式事务大概分为:
2PC
(两段式提交)/XA
3PC
(三段式提交)TCC
(Try、Confirm、Cancel)- Saga事务
- 本地消息表
- 事务消息(RocketMQ4.3+支持)
- 最大努力通知
2PC
(两段式提交)/XA
:
XA
是由x/open
组织提出的分布式事务的规范,XA
规范主要定义了(全局)事务管理器(tm
)和(局部)资源管理器(rm
)之间的接口。本地的数据库如mysql
在XA
中扮演的是rm
角色
XA
一共分为两阶段:
第一阶段(prepare):即所有的参与者rm
准备执行事务并锁住需要的资源。参与者ready
时,向tm
报告已准备就绪。
第二阶段 (commit/rollback):当事务管理者(tm
)确认所有参与者(rm
)都ready
后,向所有参与者发送commit
命令。
如果有任何一个参与者prepare
失败,那么tm
会通知所有完成prepare
的参与者进行回滚。
XA
事务的特点是:
- 简单易理解,开发较容易
- 对资源进行了长时间的锁定,并发度低
如果读者想要进一步研究XA
,go
语言以及php
、python
、java
、c#
、node
等都可参考dtm