- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
分布式事务系列文章 分布式事务 | 使用DTM 的Saga 模式 分布式事务 | 使用 dotnetcore/CAP 的本地消息表模式 分布式事务 | 基于MassTransit的StateMachine实现Saga编排式分布式事务 分布式事务 | 基于MassTransit Courier实现Saga 编排式分布式事务 。
前面章节提及的 MassTransit 、 dotnetcore/CAP 都提供了分布式事务的处理能力,但也仅局限于Saga和本地消息表模式的实现。那有没有一个独立的分布式事务解决方案,涵盖多种分布式事务处理模式,如 Saga 、 TCC 、 XA 模式等。有,目前业界主要有两种开源方案,其一是阿里开源的 Seata ,另一个就是 DTM 。其中 Seata 仅支持Java、Go和Python语言,因此不在.NET 的选择范围。 DTM 则通过提供简单易用的HTTP和gRPC接口,屏蔽了语言的无关性,因此支持任何开发语言接入,目前提供了Go、Python、NodeJs、Ruby、Java和C#等语言的SDK。 DTM,全称Distributed Transaction Manager,是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题。它提供了Saga、TCC、 XA和二阶段消息模式以满足不同应用场景的需求,同时其首创的 子事务屏障 技术可以有效解决幂等、悬挂和空补偿等异常问题.
那DTM是如何处理分布式事务的呢?以一个经典的跨行转账业务为例来看下事务处理过程。对于跨行转账业务而言,很显然是跨库跨服务的应用场景,不能简单通过本地事务解决,可以使用Saga模式,以下是基于DTM提供的Saga事务模式成功转账的的时序图:
从以上时序图可以看出,DTM整个全局事务分为如下几步:
基于以上这个时序图的基础上,再来看下DTM的架构:
整个DTM架构中,一共有三个角色,分别承担了不同的职责:
总体而言,AP-应用程序充当全局事务编排器的角色通过DTM提供的开箱即用的SDK进行全局事务和子事务的注册。TM-事务管理器接收到注册的全局事务和子事务后,负责调用RM-资源管理器来执行对应的事务分支,TM-事务管理器根据事务分支的执行结果决定是否提及或回滚事务.
百闻不如一见,接下来就来实际上手体验下如何基于DTM来实际应用Saga进行分布式跨行转账事务的处理.
接下来就来创建一个示例项目:
dotnet new webapi -n DtmDemo.Webapi
创建示例项目。 Dtmcli
和 Pomelo.EntityFrameworkCore.MySql
。
{
"dtm": {
"DtmUrl": "http://localhost:36789",
"DtmTimeout": 10000,
"BranchTimeout": 10000,
"DBType": "mysql",
"BarrierTableName": "dtm_barrier.barrier",
}
}
BankAccount
实体类:
namespace DtmDemo.WebApi.Models
{
public class BankAccount
{
public int Id { get; set; }
public decimal Balance { get; set; }
}
}
DtmDemoWebApiContext
数据库上下文:
using Microsoft.EntityFrameworkCore;
namespace DtmDemo.WebApi.Data
{
public class DtmDemoWebApiContext : DbContext
{
public DtmDemoWebApiContext (DbContextOptions<DtmDemoWebApiContext> options)
: base(options)
{
}
public DbSet<DtmDemo.WebApi.Models.BankAccount> BankAccount { get; set; } = default!;
}
}
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using Dtmcli;
var builder = WebApplication.CreateBuilder(args);
var connectionStr = builder.Configuration.GetConnectionString("DtmDemoWebApiContext");
// 注册DbContext
builder.Services.AddDbContext<DtmDemoWebApiContext>(options =>
{
options.UseMySql(connectionStr, ServerVersion.AutoDetect(connectionStr));
});
// 注册DTM
builder.Services.AddDtmcli(builder.Configuration, "dtm");
dotnet ef migrations add 'Initial'
创建迁移。 BankAccountController
如下,其中 PostBankAccount
接口添加了 await _context.Database.MigrateAsync();
用于自动应用迁移。
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;
namespace DtmDemo.WebApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class BankAccountsController : ControllerBase
{
private readonly DtmDemoWebApiContext _context;
public BankAccountsController(DtmDemoWebApiContext context)
{
_context = context;
}
[HttpGet]
public async Task<ActionResult<IEnumerable<BankAccount>>> GetBankAccount()
{
return await _context.BankAccount.ToListAsync();
}
[HttpPost]
public async Task<ActionResult<BankAccount>> PostBankAccount(BankAccount bankAccount)
{
await _context.Database.MigrateAsync();
_context.BankAccount.Add(bankAccount);
await _context.SaveChangesAsync();
return Ok(bankAccount);
}
}
接下来定义 SagaDemoController 来使用DTM的Saga模式来模拟跨行转账分布式事务:
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using DtmDemo.WebApi.Data;
using DtmDemo.WebApi.Models;
using Dtmcli;
using DtmCommon;
namespace DtmDemo.WebApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class SagaDemoController : ControllerBase
{
private readonly DtmDemoWebApiContext _context;
private readonly IConfiguration _configuration;
private readonly IDtmClient _dtmClient;
private readonly IDtmTransFactory _transFactory;
private readonly IBranchBarrierFactory _barrierFactory;
private readonly ILogger<BankAccountsController> _logger;
public SagaDemoController(DtmDemoWebApiContext context, IConfiguration configuration, IDtmClient dtmClient, IDtmTransFactory transFactory, ILogger<BankAccountsController> logger, IBranchBarrierFactory barrierFactory)
{
this._context = context;
this._configuration = configuration;
this._dtmClient = dtmClient;
this._transFactory = transFactory;
this._logger = logger;
this._barrierFactory = barrierFactory;
}
}
对于跨行转账业务,使用DTM的Saga模式,首先要进行事务拆分,可以拆分为以下4个子事务,并分别实现:
[HttpPost("TransferOut")]
public async Task<IActionResult> TransferOut([FromBody] TransferRequest request)
{
var msg = $"用户{request.UserId}转出{request.Amount}元";
_logger.LogInformation($"转出子事务-启动:{msg}");
// 1. 创建子事务屏障
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
try
{
using (var conn = _context.Database.GetDbConnection())
{
// 2. 在子事务屏障内执行事务操作
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转出子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null || bankAccount.Balance < request.Amount)
throw new InvalidDataException("账户不存在或余额不足!");
bankAccount.Balance -= request.Amount;
await _context.SaveChangesAsync();
});
}
}
catch (InvalidDataException ex)
{
_logger.LogInformation($"转出子事务-失败:{ex.Message}");
// 3. 按照接口协议,返回409,以表示子事务失败
return new StatusCodeResult(StatusCodes.Status409Conflict);
}
_logger.LogInformation($"转出子事务-成功:{msg}");
return Ok();
}
以上代码中有几点需要额外注意:
_barrierFactory.CreateBranchBarrier(Request.Query)
,其中 Request.Query
中的参数由DTM 生成,类似: ?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga
,主要包含四个参数:
branchBarrier.Call(conn, async (tx) =>{}
**409**
状态码以告知DTM 子事务失败。 409
状态码。在外围捕获异常时切忌放大异常捕获,比如直接 catch(Exception)
,如此会捕获由于网络等其他原因导致的异常,而导致DTM 不再自动处理该异常,比如业务异常时的自动重试。 转出补偿,就是回滚转出操作,进行账户余额归还,实现如下:
[HttpPost("TransferOut_Compensate")]
public async Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request)
{
var msg = $"用户{request.UserId}回滚转出{request.Amount}元";
_logger.LogInformation($"转出补偿子事务-启动:{msg}");
// 1. 创建子事务屏障
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
using (var conn = _context.Database.GetDbConnection())
{
// 在子事务屏障内执行事务操作
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转出补偿子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null)
return; //对于补偿操作,可直接返回,中断后续操作
bankAccount.Balance += request.Amount;
await _context.SaveChangesAsync();
});
}
_logger.LogInformation($"转出补偿子事务-成功!");
// 2. 因补偿操作必须成功,所以必须返回200。
return Ok();
}
由于DTM设计为总是执行补偿,也就是说即使正向操作子事务失败时,DTM 仍旧会执行补偿逻辑。但子事务屏障会在执行时判断正向操作的执行状态,当子事务失败时,并不会执行补偿逻辑。 另外DTM的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功 。 因此在补偿子事务中,即使补偿子事务中出现业务失败时,也必须返回 **200** 。 因此当出现 bankAccount==null 时可以直接 return.
转入子事务和转出子事务的实现基本类似,都是开启子事务屏障后,在 branchBarrier.Call(conn, async tx => {} 中实现事务逻辑,并通过抛异常的方式并最终返回 409 状态码来显式告知DTM 子事务执行失败.
[HttpPost("TransferIn")]
public async Task<IActionResult> TransferIn([FromBody] TransferRequest request)
{
var msg = $"用户{request.UserId}转入{request.Amount}元";
_logger.LogInformation($"转入子事务-启动:{msg}");
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
try
{
using (var conn = _context.Database.GetDbConnection())
{
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转入子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null)
throw new InvalidDataException("账户不存在!");
bankAccount.Balance += request.Amount;
await _context.SaveChangesAsync();
});
}
}
catch (InvalidDataException ex)
{
_logger.LogInformation($"转入子事务-失败:{ex.Message}");
return new StatusCodeResult(StatusCodes.Status409Conflict);
}
_logger.LogInformation($"转入子事务-成功:{msg}");
return Ok();
}
转入补偿子事务和转出补偿子事务的实现也基本类似,都是开启子事务屏障后,在 branchBarrier.Call(conn, async tx => {} 中实现事务逻辑,并最终返回 200 状态码来告知DTM 补偿子事务执行成功.
[HttpPost("TransferIn_Compensate")]
public async Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request)
{
var msg = "用户{request.UserId}回滚转入{request.Amount}元";
_logger.LogInformation($"转入补偿子事务-启动:{msg}");
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
using (var conn = _context.Database.GetDbConnection())
{
await branchBarrier.Call(conn, async (tx) =>
{
_logger.LogInformation($"转入补偿子事务-执行:{msg}");
await _context.Database.UseTransactionAsync(tx);
var bankAccount = await _context.BankAccount.FindAsync(request.UserId);
if (bankAccount == null) return;
bankAccount.Balance -= request.Amount;
await _context.SaveChangesAsync();
});
}
_logger.LogInformation($"转入补偿子事务-成功!");
return Ok();
}
拆分完子事务,最后就可以进行Saga事务编排了,其代码如下所示:
[HttpPost("Transfer")]
public async Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount,
CancellationToken cancellationToken)
{
try
{
_logger.LogInformation($"转账事务-启动:用户{fromUserId}转账{amount}元到用户{toUserId}");
//1. 生成全局事务ID
var gid = await _dtmClient.GenGid(cancellationToken);
var bizUrl = _configuration.GetValue<string>("TransferBaseURL");
//2. 创建Saga
var saga = _transFactory.NewSaga(gid);
//3. 添加子事务
saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate",
new TransferRequest(fromUserId, amount))
.Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate",
new TransferRequest(toUserId, amount))
.EnableWaitResult(); // 4. 按需启用是否等待事务执行结果
//5. 提交Saga事务
await saga.Submit(cancellationToken);
}
catch (DtmException ex) // 6. 如果开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。
{
_logger.LogError($"转账事务-失败:用户{fromUserId}转账{amount}元到用户{toUserId}失败!");
return new BadRequestObjectResult($"转账失败:{ex.Message}");
}
_logger.LogError($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
return Ok($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!");
}
主要步骤如下:
var gid =await _dtmClient.GenGid(cancellationToken);
_transFactory.NewSaga(gid);
saga.Add(string action, string compensate, object postData);
包含正向和反向子事务。 EnableWaitResult()
开启事务结果等待。 saga.Submit(cancellationToken);
try...catch..
来捕获 DtmExcepiton
异常来获取事务执行异常信息。 既然DTM作为一个独立的服务存在,其负责通过 HTTP 或 gRPC 协议发起子事务的调用,因此首先需要启动一个DTM实例,又由于本项目依赖MySQL,因此我们采用Docker Compose的方式来启动项目。在Visual Studio中通过 右键项目->Add->Docker Support->Linux 即可添加 Dockerfile 如下所示:
FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /src
COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"]
RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj"
COPY . .
WORKDIR "/src/DtmDemo.WebApi"
RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"]
在Visual Studio中通过 右键项目->Add Container Orchestrator Support->Docker Compose 即可添加 docker-compose.yml ,由于整个项目依赖 mysql 和 DTM ,修改 docker-compose.yml 如下所示,其中定义了三个服务:db,dtm和dtmdemo.webapi.
version: '3.4'
services:
db:
image: 'mysql:5.7'
container_name: dtm-mysql
environment:
MYSQL_ROOT_PASSWORD: 123456 # 指定MySQL初始密码
volumes:
- ./docker/mysql/scripts:/docker-entrypoint-initdb.d # 挂载用于初始化数据库的脚本
ports:
- '3306:3306'
dtm:
depends_on: ["db"]
image: 'yedf/dtm:latest'
container_name: dtm-svc
environment:
IS_DOCKER: '1'
STORE_DRIVER: mysql # 指定使用MySQL持久化DTM事务数据
STORE_HOST: db # 指定MySQL服务名,这里是db
STORE_USER: root
STORE_PASSWORD: '123456'
STORE_PORT: 3306
STORE_DB: "dtm" # 指定DTM 数据库名
ports:
- '36789:36789' # DTM HTTP 端口
- '36790:36790' # DTM gRPC 端口
dtmdemo.webapi:
depends_on: ["dtm", "db"]
image: ${DOCKER_REGISTRY-}dtmdemowebapi
environment:
ASPNETCORE_ENVIRONMENT: docker # 设定启动环境为docker
container_name: dtm-webapi-demo
build:
context: .
dockerfile: DtmDemo.WebApi/Dockerfile
ports:
- '31293:80' # 映射Demo:80端口到本地31293端口
- '31294:443' # 映射Demo:443端口到本地31294端口
其中 dtmdemo.webapi 服务通过 ASPNETCORE_ENVIRONMENT: docker 指定启动环境为 docker ,因此需要在项目下添加 appsettings.docker.json 以配置应用参数:
{
"ConnectionStrings": {
"DtmDemoWebApiContext": "Server=db;port=3306;database=dtm_barrier;user id=root;password=123456;AllowLoadLocalInfile=true"
},
"TransferBaseURL": "http://dtmdemo.webapi/api/SagaDemo",
"dtm": {
"DtmUrl": "http://dtm:36789",
"DtmTimeout": 10000,
"BranchTimeout": 10000,
"DBType": "mysql",
"BarrierTableName": "dtm_barrier.barrier"
}
}
另外 db 服务中通过 volumes: ["./docker/mysql/scripts:/docker-entrypoint-initdb.d"] 来挂载初始化脚本,以创建DTM依赖的MySQL 存储数据库 dtm 和示例项目使用子事务屏障需要的 barrier 数据表。脚本如下:
CREATE DATABASE IF NOT EXISTS dtm
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table IF EXISTS dtm.trans_global;
CREATE TABLE if not EXISTS dtm.trans_global (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
`trans_type` varchar(45) not null COMMENT 'transaction type: saga | xa | tcc | msg',
`status` varchar(12) NOT NULL COMMENT 'tranaction status: prepared | submitted | aborting | finished | rollbacked',
`query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow',
`protocol` varchar(45) not null comment 'protocol: http | grpc | json-rpc',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`options` varchar(1024) DEFAULT 'options for transaction like: TimeoutToFail, RequestTimeout',
`custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction',
`next_cron_interval` int(11) default null comment 'next cron interval. for use of cron job',
`next_cron_time` datetime default null comment 'next time to process this trans. for use of cron job',
`owner` varchar(128) not null default '' comment 'who is locking this trans',
`ext_data` TEXT comment 'result for this trans. currently used in workflow pattern',
`result` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
`rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
PRIMARY KEY (`id`),
UNIQUE KEY `gid` (`gid`),
key `owner`(`owner`),
key `status_next_cron_time` (`status`, `next_cron_time`) comment 'cron job will use this index to query trans'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
drop table IF EXISTS dtm.trans_branch_op;
CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`gid` varchar(128) NOT NULL COMMENT 'global transaction id',
`url` varchar(1024) NOT NULL COMMENT 'the url of this op',
`data` TEXT COMMENT 'request body, depreceated',
`bin_data` BLOB COMMENT 'request body',
`branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID',
`op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',
`status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed',
`finish_time` datetime DEFAULT NULL,
`rollback_time` datetime DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
drop table IF EXISTS dtm.kv;
CREATE TABLE IF NOT EXISTS dtm.kv (
`id` bigint(22) NOT NULL AUTO_INCREMENT,
`cat` varchar(45) NOT NULL COMMENT 'the category of this data',
`k` varchar(128) NOT NULL,
`v` TEXT,
`version` bigint(22) default 1 COMMENT 'version of the value',
create_time datetime default NULL,
update_time datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE key `uniq_k`(`cat`, `k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
id bigint(22) PRIMARY KEY AUTO_INCREMENT,
trans_type varchar(45) default '',
gid varchar(128) default '',
branch_id varchar(128) default '',
op varchar(45) default '',
barrier_id varchar(45) default '',
reason varchar(45) default '' comment 'the branch type who insert this record',
create_time datetime DEFAULT now(),
update_time datetime DEFAULT now(),
key(create_time),
key(update_time),
UNIQUE key(gid, branch_id, op, barrier_id)
);
准备完毕,即可通过点击Visual Studio工具栏的 Docker Compose 的启动按钮,启动后可以在 Containers 窗口看到启动了 dtm-mysql 、 dtm-svc 和 dtm-webapi-demo 三个容器,并在浏览器中打开了 http://localhost:31293/swagger/index.html Swagger 网页。该种方式启动项目是支持断点调试项目,如下图所示:
通过 BankAccouts 控制器的 POST 接口,初始化用户1和用户2各100元。再通过 SagaDemo 控制器的 /api/Transfer 接口,进行Saga事务测试.
由于用户1和用户2已存在,且用户1余额足够, 因此该笔转账合法因此会成功,其执行路径为:转出(成功)->转入(成功)-> 事务完成,执行日志如下图所示:
由于用户3不存在,因此执行路径为:转出(失败)->转出补偿(成功)->事务完成。从下图的执行日志可以看出,转出子事务失败,还是会调用对应的转出补偿操作,但子事务屏障会过进行过滤,因此实际上并不会执行真正的转出补偿逻辑,其中红线框住的部分就是证明.
由于用户3不存在,因此执行路径为:转出(成功)->转入(失败)->转入补偿(成功)->转出补偿(成功)->事务完成。从下图的执行日志可以看出,转入子事务失败,还是会调用对应的转入补偿操作,但子事务屏障会过进行过滤,因此实际上并不会执行真正的转入补偿逻辑,其中红线框住的部分就是证明.
在以上的示例中,重复提及子事务屏障,那子事务屏障具体是什么,这里有必要重点说明下。以上面用户1转账10元到用户3为例,整个事务流转过程中,即转出(成功)->转入(失败)->转入补偿(成功)->转出补偿(成功)->事务完成。 在提交事务之后,首先是全局事务的落库,主要由DTM 服务负责,主要包括两张表: trans_global 和 trans_branch_op ,DTM 依此进行子事务分支的协调。其中 trans_global 会插入一条全局事务记录,用于记录全局事务的状态信息,如下图1所示。 trans_branch_op 表为 trans_global 的子表,记录四条子事务分支数据,如下图2所示:
具体的服务再接收到来自Dtm的子事务分支调用时,每次都会往子事务屏障表 barrier 中插入一条数据,如下图所示。业务服务就是依赖此表来完成子事务的控制.
而子事务屏障的核心就是子事务屏障表唯一键的设计,以 gid 、 branch_id 、 op 和 barrier_id 为唯一索引,利用唯一索引,“以改代查”来避免竞态条件。在跨行转账的 Saga 示例中,子事务分支的执行步骤如下所示:
inster ignore into barrier(trans_type, gid, branch_id, op, barrier_id, reason)
向子事务屏障表插入一条数据,有几种情况:
每个子事务分支通过以上步骤,即可实现下图的效果:
本文主要介绍了DTM的Saga模式的应用,基于DTM 首创的子事务屏障技术,使得开发者基于DTM 提供的SDK能够轻松开发出更可靠的分布式应用,彻底将开发人员从网络异常的处理中解放出来,再也不用担心空补偿、防悬挂、幂等等分布式问题。如果要进行分布式事务框架的选型,DTM 将是不二之选.
最后此篇关于分布式事务|使用DTM的Saga模式的文章就讲到这里了,如果你想了解更多关于分布式事务|使用DTM的Saga模式的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在使用 PostgREST 将数据库实体暴露给使用这些实体的 Springboot 应用。 我的数据库中有两个实体,分别是 Person 和 City。 我想同时保存 Person 实体和 Cit
1、事务的定义 Redis的事务提供了一种“将多个命令打包, 然后一次性、按顺序地执行”的机制。 redis事务的主要作用就是串联多个命令防止别的命令插队。 但是,事务并不具有传统
SQLite 事务(Transaction) 事务(Transaction)是一个对数据库执行工作单元。事务(Transaction)是以逻辑顺序完成的工作单位或序列,可以是由用户手动操作完成,也可
事务是顺序组操作。 它们作为单个单元运行,并且直到组中的所有操作都成功执行时才终止。 组中的单个故障会导致整个事务失败,并导致对数据库没有影响。 事务符合ACID(原子性,一致性,隔离和耐久性)
我希望将 SqlKata 用于一个项目。但是,项目标准的一部分是查询应该能够作为事务执行。有没有一种方法可以使用 MSSQL 事务执行一个查询或多个查询? 非常感谢。 最佳答案 SQLKata 使用
我只是以多线程方式测试 PetaPoco 事务... 我有一个简单的测试用例: -- 简单的值对象称之为 MediaDevice -- 插入一条记录,更新1000次 void TransactionT
我正在尝试从 Excel VBA 向 SQL 中插入一些数据。 SQL 命令是在 VBA 脚本的过程中构建的,包括使用一些 SQL 变量。 我试图了解事务在 VBA 中是如何工作的,以及它们是否可以处
情况如下: 一个大型生产客户端/服务器系统,其中一个中央数据库表具有某个列,该列的默认值是 NULL,但现在默认值是 0。但是在该更改之前创建的所有行当然仍然具有 null 值,这会在该系统中生成许多
数据库事务是一个熟悉的概念。 try { ... .. updateDB() .. ... commit(); } catch error { rollback(); }
我想了解使用传播支持进行 Spring 交易的用途。 java 文档提到如果具有 @Transactional(propagation = Propagation.SUPPORTS) 的方法从支持该事
我需要获取 hibernate 的事务 ID。对于每笔交易,此 ID 必须是唯一的。我尝试使用 session.getTransaction().hashCode(),但我相信这个值不是唯一的。 最佳
我从 firebase 收到以下消息:runTransactionBlock:启用持久性时检测到的使用情况。请注意,事务不会在应用重新启动后保留。 那么应用程序重新启动后到底会发生什么?由于主数据库的
我需要在 jdbc 中执行选择、更新、插入查询的序列。 这是我的代码: public String editRequest(){ connection = DatabaseUtil.getServi
Java 是否提供了一种智能“聚合”事务的方法?如果我有多个异构数据存储库,我想保持同步(即用于数据的 Postgres、用于图表的 Neo4j 以及用于索引的 Lucene),是否有一个范例仅允许
我对标题中的主题有几个问题。首先,假设我们使用 JDBC,并且有 2 个事务 T1 和 T2。在 T1 中,我们在一个特定的行上执行 select 语句。然后我们对该行执行更新。在事务 T2 中,我们
我有一个 Python CGI 处理支付交易。当用户提交表单时,CGI 被调用。提交后,CGI 需要一段时间才能执行信用卡交易。在此期间,用户可能会按下 ESC 或刷新按钮。这样做不会“杀死”CGI,
我有一个代码,类似这样 def many_objects_saving(list_of_objects): for some_object in list_of_objects:
我有一个包含 100,000 条记录的表。我正在考虑使用事务来更新数据。将有一个查询将一列更新为零,并且大约有 5000 个更新,每个更新将更新一条记录。 这些大型事务对内存有何影响?事务运行时选择数
有没有办法在一个命令中执行 SQL 事务?例如 mysql_query(" START TRANSACTION; INSERT INTO table1 ....etc; INSERT INTO tab
真心希望能帮到你! 我使用以下函数在 PHP/MySql 应用程序中发送消息: public function sendMail($sender_id, $recipient_id, $subject
我是一名优秀的程序员,十分优秀!