Tips for Handling Distributed Transactions with Seata in Spring Cloud Alibaba

Introduction to Seata

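In a traditional monolithic project, the @Transactional annotation is enough to get basic ACID transactions.
This only works under two preconditions:
1) The database supports transactions (for example, MySQL with the InnoDB engine)
2) All business operations run against the same database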

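With a microservice architecture, data is split across databases and tables and each service owns its own database, so a traditional local transaction no longer covers the whole operation. How, then, do we keep data consistent across multiple services?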

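This is the problem distributed transactions solve, and Seata is a high-performance, easy-to-use distributed transaction solution built for microservice architectures.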

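Seata has three basic components:
1) TC (Transaction Coordinator): maintains the state of global and branch transactions and drives the global commit or rollback
2) TM (Transaction Manager): defines the scope of a global transaction; it begins the global transaction and commits or rolls it back
3) RM (Resource Manager): manages the resources that branch transactions work on, registers branch transactions with the TC, reports their status, and drives branch commit or rollback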

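    Put simply, one distributed transaction is one global transaction (GlobalTransaction); a global transaction is made up of branch transactions, and each branch transaction is a local transaction.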

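    The Seata lifecycle

    1) The TM asks the TC to begin a new global transaction; the TC generates an XID that identifies it
    2) The XID is propagated along the microservice call chain
    3) Each RM registers its local transaction with the TC as a branch of the global transaction identified by that XID
    4) The TM asks the TC to commit or roll back the global transaction for that XID
    5) The TC drives all branch transactions under that XID to commit or roll back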
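
The relationship between a global transaction and its branch transactions can be pictured with a small in-memory model. This is only a sketch under stated assumptions: the names `GlobalTxSketch`, `Coordinator`, `Branch`, and `runGlobal` are invented for illustration and are not Seata's API, and the real TC, TM, and RM talk to each other over the network rather than through method calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Toy model of a global transaction made of branch (local) transactions. Not Seata's API. */
final class GlobalTxSketch {

    /** A branch is a local transaction that can be committed or rolled back. */
    interface Branch {
        void commit();
        void rollback();
    }

    /** Plays the TC role: tracks the branches registered under one XID. */
    static final class Coordinator {
        final String xid = UUID.randomUUID().toString();
        private final List<Branch> branches = new ArrayList<>();

        void register(Branch branch) {
            branches.add(branch);
        }

        void commitAll() {
            branches.forEach(Branch::commit);
        }

        void rollbackAll() {
            // Undo in reverse registration order.
            for (int i = branches.size() - 1; i >= 0; i--) {
                branches.get(i).rollback();
            }
        }
    }

    /** Plays the TM role: runs the business code, then decides commit or rollback. */
    static boolean runGlobal(Coordinator tc, Runnable business) {
        try {
            business.run();
            tc.commitAll();
            return true;
        } catch (RuntimeException e) {
            tc.rollbackAll();
            return false;
        }
    }

    /** Hypothetical branch that only records what happened to it. */
    static Branch branch(String name, List<String> log) {
        return new Branch() {
            @Override public void commit() { log.add(name + ":commit"); }
            @Override public void rollback() { log.add(name + ":rollback"); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Coordinator tc = new Coordinator();
        boolean committed = runGlobal(tc, () -> {
            tc.register(branch("order", log));
            tc.register(branch("stock", log));
            throw new IllegalStateException("simulated business failure");
        });
        System.out.println("xid=" + tc.xid + " committed=" + committed + " log=" + log);
    }
}
```

The point of the model is that the decision is made once, for the whole XID, and then applied to every registered branch.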

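      Installing and configuring Seata

      Install Nacos; this example uses Nacos as the registry center
      https://github.com/alibaba/nacos/releases
      Download Nacos; this article uses the Windows build of version 1.4.0
      Open a command line in the bin directory and start Nacos in standalone mode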

      startup -m standalone

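      Install and configure Seata

      http://seata.io/zh-cn/blog/download.html
      Download Seata; the Windows build of version 1.4.0 is used here
      After unpacking, go to the conf directory and configure two files, file.conf and registry.conf


      file.conf mainly holds the database (store) settings


      registry.conf holds the registry center settings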


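      The conf directory also needs a script, nacos-config.sh, which pushes the initial configuration to Nacos
      Seata 1.4.0 does not ship with it, so create it yourself with the following content: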

      #!/usr/bin/env bash
      # Copyright 1999-2019 Seata.io Group.
      #
      # Licensed under the Apache License, Version 2.0 (the "License");
      # you may not use this file except in compliance with the License.
      # You may obtain a copy of the License at
      #
      #      http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      
      while getopts ":h:p:g:t:u:w:" opt
      do
        case $opt in
        h)
          host=$OPTARG
          ;;
        p)
          port=$OPTARG
          ;;
        g)
          group=$OPTARG
          ;;
        t)
          tenant=$OPTARG
          ;;
        u)
          username=$OPTARG
          ;;
        w)
          password=$OPTARG
          ;;
        ?)
          echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
          exit 1
          ;;
        esac
      done
      
      urlencode() {
        for ((i=0; i < ${#1}; i++))
        do
          char="${1:$i:1}"
          case $char in
          [a-zA-Z0-9.~_-]) printf '%s' "$char" ;;
          *) printf '%%%02X' "'$char" ;;
          esac
        done
      }
      
      if [[ -z ${host} ]]; then
          host=localhost
      fi
      if [[ -z ${port} ]]; then
          port=8848
      fi
      if [[ -z ${group} ]]; then
          group="SEATA_GROUP"
      fi
      if [[ -z ${tenant} ]]; then
          tenant=""
      fi
      if [[ -z ${username} ]]; then
          username=""
      fi
      if [[ -z ${password} ]]; then
          password=""
      fi
      
      nacosAddr=$host:$port
      contentType="content-type:application/json;charset=UTF-8"
      
      echo "set nacosAddr=$nacosAddr"
      echo "set group=$group"
      
      failCount=0
      tempLog=$(mktemp -u)
      function addConfig() {
        curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$(urlencode $1)&group=$group&content=$(urlencode $2)&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
        if [[ -z $(cat "${tempLog}") ]]; then
          echo " Please check the cluster status. "
          exit 1
        fi
        if [[ $(cat "${tempLog}") =~ "true" ]]; then
          echo "Set $1=$2 successfully "
        else
          echo "Set $1=$2 failure "
          (( failCount++ ))
        fi
      }
      
      count=0
      for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
        (( count++ ))
        key=${line%%=*}
        value=${line#*=}
        addConfig "${key}" "${value}"
      done
      
      echo "========================================================================="
      echo " Complete initialization parameters,  total-count:$count ,  failure-count:$failCount "
      echo "========================================================================="
      
      if [[ ${failCount} -eq 0 ]]; then
        echo " Init nacos config finished, please start seata-server. "
      else
        echo " init nacos config fail. "
      fi
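
The loop near the end of the script strips whitespace from each line of config.txt and then splits it at the first `=` only (`${line%%=*}` for the key, `${line#*=}` for the value). That detail matters because some values, such as the JDBC URL in `store.db.url`, contain further `=` characters. A minimal Java sketch of the same split, assuming a hypothetical helper class `ConfigLineSketch` that is not part of Seata; unlike the shell script, it returns null for a line without `=`:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

/** Illustrative helper (not part of Seata): splits a config.txt line like the script does. */
final class ConfigLineSketch {

    /** Removes all whitespace, then splits at the FIRST '='. Returns null if there is no '='. */
    static Map.Entry<String, String> parse(String rawLine) {
        String line = rawLine.replaceAll("\\s", ""); // mirrors: sed s/[[:space:]]//g
        int eq = line.indexOf('=');
        if (eq < 0) {
            return null;
        }
        String key = line.substring(0, eq);    // mirrors: ${line%%=*}
        String value = line.substring(eq + 1); // mirrors: ${line#*=}
        return new SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> entry = parse(
            "store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true");
        System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
}
```

Splitting on every `=` instead would truncate the URL at `useUnicode`, which is why both the script and this sketch stop at the first separator.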

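      The Seata root directory (the parent of conf) also needs a config.txt configuration file, which is likewise not included by default


      Only the MySQL settings (the store.db.* entries) need to be adapted to your environment. Note that they only take effect when store.mode is db; with store.mode=file, as in the listing, the Seata server keeps its session data in local files.


      Complete file: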

      transport.type=TCP
      transport.server=NIO
      transport.heartbeat=true
      transport.enableClientBatchSendRequest=true
      transport.threadFactory.bossThreadPrefix=NettyBoss
      transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
      transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
      transport.threadFactory.shareBossWorker=false
      transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
      transport.threadFactory.clientSelectorThreadSize=1
      transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
      transport.threadFactory.bossThreadSize=1
      transport.threadFactory.workerThreadSize=default
      transport.shutdown.wait=3
      service.vgroupMapping.my_test_tx_group=default
      service.default.grouplist=127.0.0.1:8091
      service.enableDegrade=false
      service.disableGlobalTransaction=false
      client.rm.asyncCommitBufferLimit=10000
      client.rm.lock.retryInterval=10
      client.rm.lock.retryTimes=30
      client.rm.lock.retryPolicyBranchRollbackOnConflict=true
      client.rm.reportRetryCount=5
      client.rm.tableMetaCheckEnable=false
      client.rm.tableMetaCheckerInterval=60000
      client.rm.sqlParserType=druid
      client.rm.reportSuccessEnable=false
      client.rm.sagaBranchRegisterEnable=false
      client.rm.tccActionInterceptorOrder=-2147482648
      client.tm.commitRetryCount=5
      client.tm.rollbackRetryCount=5
      client.tm.defaultGlobalTransactionTimeout=60000
      client.tm.degradeCheck=false
      client.tm.degradeCheckAllowTimes=10
      client.tm.degradeCheckPeriod=2000
      client.tm.interceptorOrder=-2147482648
      store.mode=file
      store.lock.mode=file
      store.session.mode=file
      store.publicKey=xx
      store.file.dir=file_store/data
      store.file.maxBranchSessionSize=16384
      store.file.maxGlobalSessionSize=512
      store.file.fileWriteBufferCacheSize=16384
      store.file.flushDiskMode=async
      store.file.sessionReloadReadSize=100
      store.db.datasource=druid
      store.db.dbType=mysql
      store.db.driverClassName=com.mysql.jdbc.Driver
      store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
      store.db.user=root
      store.db.password=123456
      store.db.minConn=5
      store.db.maxConn=30
      store.db.globalTable=global_table
      store.db.branchTable=branch_table
      store.db.queryLimit=100
      store.db.lockTable=lock_table
      store.db.maxWait=5000
      store.redis.mode=single
      store.redis.single.host=127.0.0.1
      store.redis.single.port=6379
      store.redis.sentinel.masterName=xx
      store.redis.sentinel.sentinelHosts=xx
      store.redis.maxConn=10
      store.redis.minConn=1
      store.redis.maxTotal=100
      store.redis.database=0
      store.redis.password=xx
      store.redis.queryLimit=100
      server.recovery.committingRetryPeriod=1000
      server.recovery.asynCommittingRetryPeriod=1000
      server.recovery.rollbackingRetryPeriod=1000
      server.recovery.timeoutRetryPeriod=1000
      server.maxCommitRetryTimeout=-1
      server.maxRollbackRetryTimeout=-1
      server.rollbackRetryTimeoutUnlockEnable=false
      server.distributedLockExpireTime=10000
      client.undo.dataValidation=true
      client.undo.logSerialization=jackson
      client.undo.onlyCareUpdateColumns=true
      server.undo.logSaveDays=7
      server.undo.logDeletePeriod=86400000
      client.undo.logTable=undo_log
      client.undo.compress.enable=true
      client.undo.compress.type=zip
      client.undo.compress.threshold=64k
      log.exceptionRate=100
      transport.serialization=seata
      transport.compressor=none
      metrics.enabled=false
      metrics.registryType=compact
      metrics.exporterList=prometheus
      metrics.exporterPrometheusPort=9898

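      In the conf directory, open a Git Bash command line and run

      sh nacos-config.sh -h 127.0.0.1


      This pushes the initial Seata configuration to Nacos. When every entry has been set successfully, the script ends with "Init nacos config finished, please start seata-server."
      The Seata configuration entries are then visible in Nacos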


      Next, create three tables in the seata database

      drop table if exists `global_table`;
      create table `global_table` (
        `xid` varchar(128)  not null,
        `transaction_id` bigint,
        `status` tinyint not null,
        `application_id` varchar(32),
        `transaction_service_group` varchar(32),
        `transaction_name` varchar(128),
        `timeout` int,
        `begin_time` bigint,
        `application_data` varchar(2000),
        `gmt_create` datetime,
        `gmt_modified` datetime,
        primary key (`xid`),
        key `idx_gmt_modified_status` (`gmt_modified`, `status`),
        key `idx_transaction_id` (`transaction_id`)
      );
      
      
      drop table if exists `branch_table`;
      create table `branch_table` (
        `branch_id` bigint not null,
        `xid` varchar(128) not null,
        `transaction_id` bigint ,
        `resource_group_id` varchar(32),
        `resource_id` varchar(256) ,
        `lock_key` varchar(128) ,
        `branch_type` varchar(8) ,
        `status` tinyint,
        `client_id` varchar(64),
        `application_data` varchar(2000),
        `gmt_create` datetime,
        `gmt_modified` datetime,
        primary key (`branch_id`),
        key `idx_xid` (`xid`)
      );
      
      
      drop table if exists `lock_table`;
      create table `lock_table` (
        `row_key` varchar(128) not null,
        `xid` varchar(96),
        `transaction_id` bigint ,
        `branch_id` bigint,
        `resource_id` varchar(256) ,
        `table_name` varchar(32) ,
        `pk` varchar(36) ,
        `gmt_create` datetime ,
        `gmt_modified` datetime,
        primary key(`row_key`)
      );

      In each business database used by the project, create the undo_log table, which stores the undo (rollback) log

      CREATE TABLE `undo_log` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT,
        `branch_id` bigint(20) NOT NULL,
        `xid` varchar(100) NOT NULL,
        `context` varchar(128) NOT NULL,
        `rollback_info` longblob NOT NULL,
        `log_status` int(11) NOT NULL,
        `log_created` datetime NOT NULL,
        `log_modified` datetime NOT NULL,
        `ext` varchar(100) DEFAULT NULL,
        PRIMARY KEY (`id`),
        UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
      ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
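
The idea behind the undo log can be illustrated without a database: before a branch changes a row, the previous value (the "before image") is recorded, and a rollback writes those values back in reverse order. The following is a rough in-memory sketch of that idea only. `UndoLogSketch` and its members are hypothetical names, and Seata's real undo_log rows hold serialized images in `rollback_info` rather than plain Java objects.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** In-memory illustration of rollback via before-images. Not Seata's implementation. */
final class UndoLogSketch {

    /** One recorded before-image: which row, and what it held before the update. */
    private static final class BeforeImage {
        final long skuId;
        final int stock;

        BeforeImage(long skuId, int stock) {
            this.skuId = skuId;
            this.stock = stock;
        }
    }

    /** Stand-in for a business table such as tb_stock: sku_id -> stock. */
    final Map<Long, Integer> stockTable = new HashMap<>();

    /** Stand-in for undo_log, newest entry first. */
    private final Deque<BeforeImage> undoLog = new ArrayDeque<>();

    /** Records the old value, then applies the update. */
    void updateStock(long skuId, int newStock) {
        Integer before = stockTable.get(skuId);
        if (before == null) {
            throw new IllegalArgumentException("unknown sku " + skuId);
        }
        undoLog.push(new BeforeImage(skuId, before));
        stockTable.put(skuId, newStock);
    }

    /** Global rollback: restore before-images, newest first. */
    void rollback() {
        while (!undoLog.isEmpty()) {
            BeforeImage image = undoLog.pop();
            stockTable.put(image.skuId, image.stock);
        }
    }

    /** Global commit: the undo entries are no longer needed. */
    void commit() {
        undoLog.clear();
    }

    public static void main(String[] args) {
        UndoLogSketch db = new UndoLogSketch();
        db.stockTable.put(1L, 10);
        db.updateStock(1L, 7);
        db.rollback();
        System.out.println(db.stockTable);
    }
}
```

This is also why every business database that takes part in a global transaction needs its own undo_log table: the rollback data lives next to the rows it describes.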

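      Finally, open a command line in the bin directory and run seata-server.bat to start the Seata server

      Using Seata in the project

      The Spring Cloud project has two services, an order service and a stock service. The basic flow is: the order service saves an order and then calls the stock service to reduce the stock.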

      Order detail table

      DROP TABLE IF EXISTS `tb_order_detail`;
      CREATE TABLE `tb_order_detail` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'order detail id',
        `order_id` bigint(20) NOT NULL COMMENT 'order id',
        `sku_id` bigint(20) NOT NULL COMMENT 'SKU product id',
        `num` int(11) NOT NULL COMMENT 'purchase quantity',
        `title` varchar(256) NOT NULL COMMENT 'product title',
        `own_spec` varchar(1024) DEFAULT '' COMMENT 'key-value set of dynamic product attributes',
        `price` bigint(20) NOT NULL COMMENT 'price, unit: fen',
        `image` varchar(128) DEFAULT '' COMMENT 'product image',
        PRIMARY KEY (`id`),
        KEY `key_order_id` (`order_id`) USING BTREE
      ) ENGINE=InnoDB AUTO_INCREMENT=131 DEFAULT CHARSET=utf8 COMMENT='order detail table';

      Stock table

      DROP TABLE IF EXISTS `tb_stock`;
      CREATE TABLE `tb_stock` (
        `sku_id` bigint(20) NOT NULL COMMENT 'SKU id of the product this stock belongs to',
        `seckill_stock` int(9) DEFAULT '0' COMMENT 'stock available for flash sale',
        `seckill_total` int(9) DEFAULT '0' COMMENT 'total flash-sale quantity',
        `stock` int(9) NOT NULL COMMENT 'stock quantity',
        PRIMARY KEY (`sku_id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='stock table: stock, flash-sale stock, and related information';

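      The parent project pins the versions of Spring Boot, Spring Cloud, and Spring Cloud Alibaba

      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.3.10.RELEASE</version>
          <relativePath/>
      </parent>

      <dependencyManagement>
          <dependencies>
              <dependency>
                  <groupId>org.springframework.cloud</groupId>
                  <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                  <version>0.9.0.RELEASE</version>
                  <type>pom</type>
                  <scope>import</scope>
              </dependency>
              <dependency>
                  <groupId>com.alibaba.cloud</groupId>
                  <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                  <version>2.2.1.RELEASE</version>
                  <type>pom</type>
                  <scope>import</scope>
              </dependency>
              <dependency>
                  <groupId>org.springframework.cloud</groupId>
                  <artifactId>spring-cloud-dependencies</artifactId>
                  <version>Hoxton.SR8</version>
                  <type>pom</type>
                  <scope>import</scope>
              </dependency>
          </dependencies>
      </dependencyManagement>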

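      The sub-project dependencies add the Nacos and Seata clients

      <dependency>
         <groupId>mysql</groupId>
         <artifactId>mysql-connector-java</artifactId>
         <scope>runtime</scope>
      </dependency>

      <dependency>
         <groupId>com.baomidou</groupId>
         <artifactId>mybatis-plus-boot-starter</artifactId>
         <version>3.3.2</version>
      </dependency>

      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
      </dependency>

      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
      </dependency>

      <dependency>
         <groupId>com.alibaba.cloud</groupId>
         <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
         <exclusions>
             <exclusion>
                 <groupId>io.seata</groupId>
                 <artifactId>seata-spring-boot-starter</artifactId>
             </exclusion>
         </exclusions>
      </dependency>
      <dependency>
         <groupId>io.seata</groupId>
         <artifactId>seata-spring-boot-starter</artifactId>
         <version>1.2.0</version>
      </dependency>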

      Configuration file of the sub-project


      Complete configuration

      server:
        port: 8001
      spring:
        application:
          name: stock-service
        cloud:
          nacos:
            discovery:
              server-addr: localhost:8848
          alibaba:
            seata:
              enabled: true
              enable-auto-data-source-proxy: true
              tx-service-group: my_test_tx_group
              registry:
                type: nacos
                nacos:
                  application: seata-server
                  server-addr: 127.0.0.1:8848
                  username: nacos
                  password: nacos
              config:
                type: nacos
                nacos:
                  server-addr: 127.0.0.1:8848
                  group: SEATA_GROUP
                  username: nacos
                  password: nacos
              service:
                vgroup-mapping:
                  my_test_tx_group: default
                disable-global-transaction: false
              client:
                rm:
                  report-success-enable: false
        datasource:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://localhost:3306/eshop?serverTimezone=UTC&useUnicode=true&useSSL=false&characterEncoding=utf8
          username: root
          password: 123456

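      The stock service defines a method that reduces stock

      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.http.ResponseEntity;
      import org.springframework.web.bind.annotation.PutMapping;
      import org.springframework.web.bind.annotation.RequestParam;
      import org.springframework.web.bind.annotation.RestController;

      @RestController
      public class StockController {
      
          @Autowired
          private IStockService stockService;
      
          @PutMapping("/stock")
          public ResponseEntity<Stock> reduceSkuStock(@RequestParam("skuId")Long skuId,
                                                      @RequestParam("number")Integer number){
              Stock stock = stockService.getById(skuId);
              // Reject the request when there is not enough stock left
              if(stock.getStock() < number){
                  throw new RuntimeException("Insufficient stock, SkuId: " + skuId);
              }
              stock.setStock(stock.getStock() - number);
              stockService.updateById(stock);
              return ResponseEntity.ok(stock);
          }
      }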

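      After inserting the order, the order service calls the stock service through Feign to reduce the stock

      import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
      import io.seata.spring.annotation.GlobalTransactional;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Service;

      @Service
      public class OrderDetailServiceImpl extends ServiceImpl implements IOrderDetailService {
      
          // Feign client for the stock service
          @Autowired
          private StockFeignClient stockFeignClient;
      
      //    @Transactional  (local transaction only; replaced by the global annotation below)
          @GlobalTransactional(rollbackFor = {Exception.class})
          @Override
          public void makeOrder(OrderDetail orderDetail) {
              this.save(orderDetail); // save the order
              // reduce the stock through the stock service
              stockFeignClient.reduceSkuStock(orderDetail.getSkuId(),orderDetail.getNum());
              int x = 11 / 0; // simulate a failure after both services have written, so both must roll back
          }
      }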

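      Inserting the order and reducing the stock happen in two different services, so a traditional @Transactional can no longer guarantee atomicity across them
      Here we use the @GlobalTransactional annotation provided by Seata; with rollbackFor = Exception.class, any exception rolls back the whole business operation.
      Test case: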

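      import java.util.UUID;

      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.test.context.SpringBootTest;
      import org.springframework.test.context.junit4.SpringRunner;

      @RunWith(SpringRunner.class)
      @SpringBootTest
      public class OrderServiceApplicationTests {
      
          @Autowired
          private IOrderDetailService orderDetailService;
      
          @Test
          public void testOrder() {
              // Build a sample order detail and place the order inside a global transaction
              OrderDetail orderDetail = new OrderDetail();
              orderDetail.setNum(100);
              orderDetail.setOrderId(9999L);
              orderDetail.setPrice(9999L);
              orderDetail.setSkuId(27359021728L);
              orderDetail.setTitle(UUID.randomUUID().toString());
              orderDetailService.makeOrder(orderDetail);
          }
      
      }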

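      After running the test, the logs show a global transaction being started, and once the exception occurs both services roll back successfully.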

      That covers the details of using Seata for distributed transactions in Spring Cloud Alibaba.

      Published 2021-06-20 09:29:26