数据不会无缘无故丧失,也不会莫明其妙增加

一、概述

1、曾几何时,知了在一家小公司做项目标时候,都是一个办事打全国,所以涉及到数据一致性的问题,都是间接用当地事务处置。

散布式事务处理计划  第1张

2、跟着时间的推移,用户量增大了,发现一个Java办事扛不住了,于是手艺大佬决定关于系统停止晋级。按照系统的营业关于单体的一个办事停止拆分,然后关于开发人员也停止划分,一个开发人员只开发和维护一个或几个办事中的问题,各人各司其职,分工合做。

散布式事务处理计划  第2张

3、当然办事拆分不是一蹴而就的,那是一个耗时耗力的庞大工程,大大都系统都是停止多轮拆分,然后渐渐构成一个不变的系统。遵守一个核心思惟:先按总体营业停止一轮拆分,后面再按照拆分后的办事模块,停止一个详尽的拆分。 4、跟着办事拆分之后,用户量是抗住了,但是发现数据都在差别的办事中存取,那就引出了一个新的问题:跨办事器,若何包管数据的一致性?当然,跨办事的散布式系统中不单单那个问题,还有其他的一些列问题,如:办事可用性、办事容错性、办事间挪用的收集问题等等,那里只讨论数据一致性问题。 5、说到数据一致性,大致分为三种:强一致性、弱一致性、最末一致性。

强一致性:数据一旦写入,在任一时刻都能读取到最新的值。弱一致性:当写入一个数据的时候,其他处所去读那些数据,可能查到的数据不是最新的最末一致性:它是弱一致性的一个变种,不逃求系统肆意时刻数据要到达一致,但是在必然时间后,数据最末要到达一致。

从那三种一致性的模子上来说,我们能够看到,弱一致性和最末一致性一般来说是异步冗余的,而强一致性是同步冗余的,异步处置带来了更好的性能,但也需要处置数据的抵偿。同步意味着简单,但也一定会降低系统的性能。

二、理论

上述说的数据一致性问题,其实也就是在说散布式事务的问题,如今有一些处理计划,相信各人多几少都看到过,那里带各人回忆下。

2.1、二阶段提交

2PC是一种强一致性设想计划,通过引入一个事务协调器来协调各个当地事务(也称为事务参与者)的提交和回滚。 2PC次要分为2个阶段:

1、第一阶段:事务协调器会向每个事务参与者倡议一个开启事务的号令,每个事务参与者施行筹办操做,然后再向事务协调器回复能否筹办完成。但是不会提交当地事务,但是那个阶段资本是需要被锁住的。

2、第二阶段:事务协调器收到每个事务参与者的回复后,统计每个参与者的回复,若是每个参与者都回复“能够提交”,那么事务协调器会发送提交号令,参与者正式提交当地事务,释放所有资本,完毕全局事务。但是有一个参与者回复“回绝提交”,那么事务协调器发送回滚号令,所有参与者都回滚当地事务,待全数回滚完成,释放资本,打消全局事务。 事务提交换程

散布式事务处理计划  第3张

事务回滚流程

散布式事务处理计划  第4张

当然2PC存在的问题那里也提一下,一个是同步阻塞,那个会消耗性能。另一个是协调器毛病问题,一旦协调器发作毛病,那么所有的参与者处于资本锁定形态,那么所有参与者城市被阻塞。

2.2、三阶段提交

3PC次要是在2PC的根底上做了改良,次要为领会决2PC的阻塞问题。它次要是将2PC的第一阶段分为2个步调,先筹办,再锁定资本,而且引入了超时机造(那也意味着会形成数据纷歧致)。

3PC的三个阶段包罗:CanCommit、PReCommit 和 DoCommit 详细细节就不展开赘述了,就一个核心概念:在CanCommit的时候其实不锁定资本,除非所有参与者都同意了,才起头锁资本。

2.3、TCC柔性事务

比拟较前面的2PC和3PC,TCC和那哥俩的素质区别就是它是营业层面的散布式事务,而2PC和3PC是数据库层面的。

TCC是三个单词的缩写:Try、Confirm、Cancel,也分为那三个流程。

Try:测验考试,即测验考试预留资本,锁定资本

Confirm:确认,即施行预留的资本,若是施行失败会重试

Cancel:打消,撤销预留的资本,若是施行失败会重试

散布式事务处理计划  第5张

从上图可知,TCC关于营业的侵入是很大的,并且紧紧的耦合在一路。TCC比拟较2PC和3PC,试用范畴更广,可实现跨库,跨差别系统去实现散布式事务。缺点是要在营业代码中去开发大量的逻辑实现那三个步调,需要和代码耦合在一路,进步开发成本。

事务日记:在TCC形式中,事务倡议者和事务参与者城市去记录事务日记(事务形态、信息等)。那个事务日记是整个散布式事务呈现不测情况(宕机、重启、收集中断等),实现提交和回滚的关键。

幂等性:在TCC第二阶段,confirm或者cancel的时候,那两个操做都需要包管幂等性。一旦因为收集等原因招致施行失败,就会倡议不竭重试。

防悬挂:因为收集的不成靠性,有异常情况的时候,try恳求可能比cancel恳求更晚抵达。cancel可能会施行空回滚,但是try恳求被施行的时候也不会预留资本。

2.4、Seata

关于seata那里就不多提了,用的最多的是AT形式,上回知了逐渐阐发过,设置装备摆设完后只需要在事务倡议的办法上添加@GlobalTransactional注解就能够开启全局事务,关于营业无侵入,低耦合。感兴趣的话请参考之前讨论Seata的内容。

三、应用场景

知了之前在一家公司碰到过如许的营业场景;用户通过页面投保,提交一笔订单过来,那个订单通过上游办事,处置保单相关的营业逻辑,最初流入下流办事,处置业绩、人员晋升、分润处置等等营业。关于那个场景,两边处置的营业逻辑不在统一个办事中,接入的是差别的数据库。涉及到数据一致性问题,需要用到散布式事务。

关于上面介绍的几种计划,只是讨论了理论和思绪,下面我来总结下那个营业场景中运用的一种实现计划。接纳了当地动静表+MQ异步动静的计划实现了事务最末一致性,也契合其时的营业场景,相对强一致性,实现的性能较高。下面是该计划的思绪图

散布式事务处理计划  第6张

实在营业处置的形态可能会有多种,因而需要明白哪种形态需要按时使命抵偿假设某条单据不断无法处置完毕,按时使命也不克不及无限造下发,所以当地动静表需要增加轮次的概念,重试几次后告警,人工介入处置因为MQ和按时使命的存在,不免会呈现反复恳求,因而下流要做好幂等防重,不然会呈现反复数据,招致数据纷歧致

关于落地实现,话不多说,间接上代码。先定义两张表tb_order和tb_notice_message,别离存订单信息和当地事务信息

CREATE TABLE `tb_order` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id', `user_id` int(11) NOT NULL COMMENT '下单人id', `order_no` varchar(255) CHARACTER SET latin1 NOT NULL COMMENT '订单编号', `insurance_amount` decimal(16,2) NOT NULL COMMENT '保额', `order_amount` decimal(16,2) DEFAULT NULL COMMENT '保费', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `is_delete` tinyint(4) DEFAULT '0' COMMENT '删除标识:0-不删除;1-删除', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;CREATE TABLE `tb_notice_message` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id', `tyPE` tinyint(4) NOT NULL COMMENT '营业类型:1-下单', `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '形态:1-待处置,2-已处置,3-预警', `data` varchar(255) NOT NULL COMMENT '信息', `retry_count` tinyint(4) DEFAULT '0' COMMENT '重试次数', `create_time` datetime NOT NULL COMMENT '创建时间', `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `is_delete` tinyint(4) NOT NULL DEFAULT '0' COMMENT '删除标识:0-不删除;1-删除', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;

处置订单service,那里能够用到我们之前说过的粉饰器形式,去粉饰那个service。把保留当地事务,发送mq动静,交给粉饰器类去做,而service只需要关心营业逻辑即可,也契合开闭原则。

/** * @author 往事如风 * @version 1.0 * @date 2022/12/13 10:58 * @description */@Service@Slf4j@AllArgsConstructorpublic class OrderService implements BaseHandler<Object, Order> { private final OrderMapper orderMapper; /** * 订单处置办法:只处置订单联系关系逻辑 * @param o * @return */ @Override public Order handle(Object o) { // 订单信息 Order order = Order.builder() .orderNo("2345678") .createTime(LocalDateTime.now()) .userId(1) .insuranceAmount(new BigDecimal(2000000)) .orderAmount(new BigDecimal(5000)) .build(); orderMapper.insert(order); return order; }}

新增OrderService的粉饰类OrderServiceDecorate,负责对订单逻辑的扩展,那里是添加当地事务动静,以及发送MQ信息,扩展办法添加了Transactional注解,确保订单逻辑和当地事务动静的数据在统一个事务中停止,确保原子性。此中事务动静标识表记标帜处置中,待下流办事处置完营业逻辑,再更新处置完成。

/** * @author 往事如风 * @version 1.0 * @date 2022/12/14 18:48 * @description */@Slf4j@AllArgsConstructor@Decorate(scene = SceneConstants.ORDER, type = DecorateConstants.CREATE_ORDER)public class OrderServiceDecorate extends AbstractHandler { private final NoticeMessageMapper noticeMessageMapper; private final RabbitTemplate rabbitTemplate; /** * 粉饰办法:对订单处置逻辑停止扩展 * @param o * @return */ @Override @Transactional public Object handle(Object o) { // 挪用service办法,实现保单逻辑 Order order = (Order) service.handle(o); // 扩展:1、保留事务动静,2、发送MQ动静 // 当地事务动静 String data = "{\"orderNo\":\"2345678\", \"userId\":1, \"insuranceAmount\":2000000, \"orderAmount\":5000}"; NoticeMessage noticeMessage = NoticeMessage.builder() .retryCount(0) .data(data) .status(1) .type(1) .createTime(LocalDateTime.now()) .build(); noticeMessageMapper.insert(noticeMessage); // 发送mq动静 log.info("发送mq动静...."); rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage)); return null; }}

关于那个粉饰者形式,之前有讲到过,能够看下之前发布的内容。

下流办事监听动静,处置完本身的营业逻辑后(如:业绩、分润、晋升等),需要发送MQ,上游办事监听动静,更新当地事务形态为已处置。那需要留意的是下流办事需要做幂等处置,避免异常情况下,上游办事数据的重试。

/** * @author 往事如风 * @version 1.0 * @date 2022/12/13 18:07 * @description */@Component@Slf4j@RabbitListener(queues = "trans.queue")public class FenRunListener { @Autowired private RabbitTemplate rabbitTemplate; @RabbitHandler public void orderHandler(String msg) { log.info("监听到订单动静:{}", msg); // 需要留意幂等,幂等逻辑 log.info("下流办事营业逻辑。。。。。"); JSONObject json = JSONUtil.parSEObj(msg); rabbitTemplate.convertAndSend("trans", "trans.update.order.queue.key", json.getInt("id")); }}

那里插个题外话,关于幂等的处置,我那里大致有两种思绪 1、好比按照订单号查一下记录能否存在,存在就间接返回胜利。 2、redis存一个独一的恳求号,处置完再删除,不存在恳求号的间接返回胜利,能够写个AOP去向理,与营业隔离。 言归正传,上游办事动静监听,下流发送MQ动静,更新当地事务动静为已处置,散布式事务流程完毕。

/** * @author 往事如风 * @version 1.0 * @date 2022/12/13 18:29 * @description */@Component@Slf4j@RabbitListener(queues = "trans.update.order.queue")public class OrderListener { @Autowired private NoticeMessageMapper noticeMessageMapper; @RabbitHandler public void updateOrder(Integer msgId) { log.info("监听动静,更新当地事务动静,动静id:{}", msgId); NoticeMessage msg = NoticeMessage.builder().status(2).id(msgId).updateTime(LocalDateTime.now()).build(); noticeMessageMapper.updateById(msg); }}

存在异常情况时,会通过按时使命,轮询的往MQ中发送动静,尽更大勤奋去让下流办事到达数据一致,当然重试也要设置上限;若到达上限以后还不断是失败,那不能不考虑是下流办事本身存在问题了(有可能就是代码逻辑存在问题)。

/** * @author 往事如风 * @version 1.0 * @date 2022/12/14 10:25 * @description */@Configuration@EnableScheduling@AllArgsConstructor@Slf4jpublic class RetryOrderJob { private final RabbitTemplate rabbitTemplate; private final NoticeMessageMapper noticeMessageMapper; /** * 更大主动重试次数 */ private final Integer MAX_RETRY_COUNT = 5; @Scheduled(cron = "0/20 * * * * ? ") public void retry() { log.info("按时使命,重试异常订单"); LambdaQueryWrapper<NoticeMessage> wrapper = Wrappers.lambdaQuery(NoticeMessage.class); wrapper.eq(NoticeMessage::getStatus, 1); List<NoticeMessage> noticeMessages = noticeMessageMapper.selectList(wrapper); for (NoticeMessage noticeMessage : noticeMessages) { // 从头发送mq动静 rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage)); // 重试次数+1 noticeMessage.setRetryCount(noticeMessage.getRetryCount() + 1); noticeMessageMapper.updateById(noticeMessage); // 判断重试次数,等于最长限造次数,间接更新为报警形态 if (MAX_RETRY_COUNT.equals(noticeMessage.getRetryCount())) { noticeMessage.setStatus(3); noticeMessageMapper.updateById(noticeMessage); // 发送告警,通知对应人员 // 告警逻辑(短信、邮件、企微群,等等).... } } }}

其实那里有个问题,一个上游办事对应多个下流办事的时候。那个时候往往不克不及存一条当地动静记录。

那里能够在动静表多加个字段next_server_count,暗示一个订单倡议方,需要挪用的下流办事数量。上游办事监听的时候,每次会与下流的回调都减去1,曲到数值是0的时候,再更新形态是已处置。但是要控造并发,那个字段是被多个下流办事共享的。还有一种处置计划是为每个下流办事,都记录一条事务动静,用type字段去区分,标识表记标帜类型。实现上游和下流关于事务动静的一对一关系。最初,到达更大重试次数以后,能够将动静参加到一个告警列表,那个告警列表能够展现在办理后台或其他监控系统中,展现一些需要的信息,去供公司内部人员去人工介入,处置那种异常的数据,使得数据到达最末一致性。四、总结

其实散布式事务没有一个完美的处置计划,只能说是尽量去满足营业需求,满够数据一致。若是法式不克不及处置了,最初由人工去兜底,做数据的抵偿计划。

五、参考源码编程文档:https://gitee.com/cicadasmile/butte-java-note应用仓库:https://gitee.com/cicadasmile/butte-flyer-parent散布式事务处理计划  第7张