- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
状态机作为一种程序开发范例,在实际的应用开发中有很多的应用场景,其中.NET 中的async/await 的核心底层实现就是基于状态机机制。状态机分为两种:有限状态机和无限状态机,本文介绍的就是有限状态机,有限状态机在任何时候都可以准确地处于有限状态中的一种,其可以根据一些输入从一个状态转换到另一个状态。一个有限状态机是由其状态列表、初始状态和触发每个转换的输入来定义的。如下图展示的就是一个闸机的状态机示意图:
从上图可以看出,状态机主要有以下核心概念:
在.NET中, dotnet-state-machine/stateless 和 MassTransit 都提供了开箱即用的状态机实现。本文将重点介绍 MassTransit 中的状态机在Saga 模式中的应用.
在MassTransit 中 MassTransitStateMachine 就是状态机的具体抽象,可以用其编排一系列事件来实现状态的流转,也可以用来实现Saga模式的分布式事务。并支持与EF Core和Dapper集成将状态持久化到关系型数据库,也支持将状态持久化到MongoDB、Redis等数据库。是以简单的下单流程:创建订单->扣减库存->支付订单举例而言,其示意图如下所示.
那具体如何使用 MassTransitStateMachine 来应用编排式Saga 模式呢,接下来就来创建解决方案来实现以上下单流程示例。依次创建以下项目,除共享类库项目外,均安装 MassTransit 和 MassTransit.RabbitMQ NuGet包.
项目 | 项目名 | 项目类型 |
---|---|---|
订单服务 | MassTransit.SmDemo.OrderService | ASP.NET Core Web API |
库存服务 | MassTransit.SmDemo.InventoryService | Worker Service |
支付服务 | MassTransit.SmDemo.PaymentService | Worker Service |
共享类库 | MassTransit.SmDemo.Shared | Class Library |
三个服务都添加扩展类 MassTransitServiceExtensions ,并在 Program.cs 类中调用 services.AddMassTransitWithRabbitMq(); 注册服务.
using System.Reflection;
using MassTransit.CourierDemo.Shared.Models;
namespace MassTransit.CourierDemo.InventoryService;
public static class MassTransitServiceExtensions
{
public static IServiceCollection AddMassTransitWithRabbitMq(this IServiceCollection services)
{
return services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
// By default, sagas are in-memory, but should be changed to a durable
// saga repository.
x.SetInMemorySagaRepositoryProvider();
var entryAssembly = Assembly.GetEntryAssembly();
x.AddConsumers(entryAssembly);
x.AddSagaStateMachines(entryAssembly);
x.AddSagas(entryAssembly);
x.AddActivities(entryAssembly);
x.UsingRabbitMq((context, busConfig) =>
{
busConfig.Host(
host: "localhost",
port: 5672,
virtualHost: "masstransit",
configure: hostConfig =>
{
hostConfig.Username("guest");
hostConfig.Password("guest");
});
busConfig.ConfigureEndpoints(context);
});
});
}
}
订单服务作为下单流程中的核心服务,主要职责包含接收创建订单请求和订单状态机的实现。先来定义 OrderController 如下:
namespace MassTransit.SmDemo.OrderService.Controllers;
[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
private readonly IBus _bus;
public OrderController(IBus bus)
{
_bus = bus;
}
[HttpPost]
public async Task<IActionResult> CreateOrder(CreateOrderDto createOrderDto)
{
await _bus.Publish<ICreateOrderCommand>(new
{
createOrderDto.CustomerId,
createOrderDto.ShoppingCartItems
});
return Ok();
}
}
紧接着,订阅 ICreateOrderCommand ,执行订单创建逻辑,订单创建完毕后会发布 ICreateOrderSucceed 事件.
public class CreateOrderConsumer : IConsumer<ICreateOrderCommand>
{
private readonly ILogger<CreateOrderConsumer> _logger;
public CreateOrderConsumer(ILogger<CreateOrderConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<ICreateOrderCommand> context)
{
var shoppingItems =
context.Message.ShoppingCartItems.Select(item => new ShoppingCartItem(item.SkuId, item.Price, item.Qty));
var order = new Order(context.Message.CustomerId).NewOrder(shoppingItems.ToArray());
await OrderRepository.Insert(order);
_logger.LogInformation($"Order {order.OrderId} created successfully");
await context.Publish<ICreateOrderSucceed>(new
{
order.OrderId,
order.OrderItems
});
}
}
最后来实现订单状态机,主要包含以下几步:
OrderState
以保存状态机实例状态数据:
using MassTransit.SmDemo.OrderService.Domains;
namespace MassTransit.SmDemo.OrderService;
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public Guid OrderId { get; set; }
public decimal Amount { get; set; }
public List<OrderItem> OrderItems { get; set; }
}
MassTransitStateMachine
并同时指定状态实例即可:
namespace MassTransit.SmDemo.OrderService;
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
}
return services.AddMassTransit(x =>
{
//...
x.AddSagaStateMachine<OrderStateMachine, OrderState>()
.InMemoryRepository();
}
State
类型定义,本例中为:
public State Created { get; private set; }
public State InventoryDeducted { get; private set; }
public State Paid { get; private set; }
public State Canceled { get; private set; }
Event<T>
类型定义,本例涉及有:
public Event<ICreateOrderSucceed> OrderCreated {get; private set;}
public Event<IDeduceInventorySucceed> DeduceInventorySucceed {get; private set;}
public Event<IDeduceInventoryFailed> DeduceInventoryFailed {get; private set;}
public Event<IPayOrderSucceed> PayOrderSucceed {get; private set;}
public Event<IPayOrderFailed> PayOrderFailed {get; private set;}
public Event<IReturnInventorySucceed> ReturnInventorySucceed { get; private set; }
public Event<ICancelOrderSucceed> OrderCanceled { get; private set; }
关联Id
。以下就是将事件消息中的传递的 OrderId
作为关联ID。
Event(() => OrderCreated, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => DeduceInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => DeduceInventoryFailed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PayOrderSucceed, x => x.CorrelateById(m => m.Message.OrderId));
(1) 初始状态->已创建:触发条件为 OrderCreated 事件,同时要发送 IDeduceInventoryCommand 推动库存服务执行库存扣减.
Initially(
When(OrderCreated)
.Then(context =>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.OrderItems = context.Message.OrderItems;
context.Saga.Amount = context.Message.OrderItems.Sum(x => x.Price * x.Qty);
})
.PublishAsync(context => context.Init<IDeduceInventoryCommand>(new
{
context.Saga.OrderId,
DeduceInventoryItems =
context.Saga.OrderItems.Select(x => new DeduceInventoryItem(x.SkuId, x.Qty)).ToList()
}))
.TransitionTo(Created));
(2) 已创建-> 库存已扣减:触发条件为 DeduceInventorySucceed 事件,同时要发送 IPayOrderCommand 推动支付服务执行订单支付.
During(Created,
When(DeduceInventorySucceed)
.Then(context =>
{
context.Publish<IPayOrderCommand>(new
{
context.Saga.OrderId,
context.Saga.Amount
});
}).TransitionTo(InventoryDeducted),
When(DeduceInventoryFailed).Then(context =>
{
context.Publish<ICancelOrderCommand>(new
{
context.Saga.OrderId
});
})
);
(3) 库存已扣减->已支付:触发条件为 PayOrderSucceed 事件,转移到已支付后,流程结束.
During(InventoryDeducted,
When(PayOrderFailed).Then(context =>
{
context.Publish<IReturnInventoryCommand>(new
{
context.Message.OrderId,
ReturnInventoryItems =
context.Saga.OrderItems.Select(x => new ReturnInventoryItem(x.SkuId, x.Qty)).ToList()
});
}),
When(PayOrderSucceed).TransitionTo(Paid).Then(context => context.SetCompleted()));
最终完整版的 OrderStateMachine 如下所示:
using MassTransit.SmDemo.OrderService.Events;
using MassTransit.SmDemo.Shared.Contracts;
namespace MassTransit.SmDemo.OrderService;
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Created { get; private set; }
public State InventoryDeducted { get; private set; }
public State Paid { get; private set; }
public State Canceled { get; private set; }
public Event<ICreateOrderSucceed> OrderCreated { get; private set; }
public Event<IDeduceInventorySucceed> DeduceInventorySucceed { get; private set; }
public Event<IDeduceInventoryFailed> DeduceInventoryFailed { get; private set; }
public Event<ICancelOrderSucceed> OrderCanceled { get; private set; }
public Event<IPayOrderSucceed> PayOrderSucceed { get; private set; }
public Event<IPayOrderFailed> PayOrderFailed { get; private set; }
public Event<IReturnInventorySucceed> ReturnInventorySucceed { get; private set; }
public Event<IOrderStateRequest> OrderStateRequested { get; private set; }
public OrderStateMachine()
{
Event(() => OrderCreated, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => DeduceInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => DeduceInventoryFailed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => ReturnInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PayOrderSucceed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PayOrderFailed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => OrderCanceled, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => OrderStateRequested, x =>
{
x.CorrelateById(m => m.Message.OrderId);
x.OnMissingInstance(m =>
{
return m.ExecuteAsync(x => x.RespondAsync<IOrderNotFoundOrCompleted>(new { x.Message.OrderId }));
});
});
InstanceState(x => x.CurrentState);
Initially(
When(OrderCreated)
.Then(context =>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.OrderItems = context.Message.OrderItems;
var amount = context.Message.OrderItems.Sum(x => x.Price * x.Qty);
context.Saga.Amount = amount;
})
.PublishAsync(context => context.Init<IDeduceInventoryCommand>(new
{
context.Saga.OrderId,
DeduceInventoryItems =
context.Saga.OrderItems.Select(x => new DeduceInventoryItem(x.SkuId, x.Qty)).ToList()
}))
.TransitionTo(Created));
During(Created,
When(DeduceInventorySucceed)
.Then(context =>
{
context.Publish<IPayOrderCommand>(new
{
context.Saga.OrderId,
context.Saga.Amount
});
}).TransitionTo(InventoryDeducted),
When(DeduceInventoryFailed).Then(context =>
{
context.Publish<ICancelOrderCommand>(new
{
context.Saga.OrderId
});
})
);
During(InventoryDeducted,
When(PayOrderFailed).Then(context =>
{
context.Publish<IReturnInventoryCommand>(new
{
context.Message.OrderId,
ReturnInventoryItems =
context.Saga.OrderItems.Select(x => new ReturnInventoryItem(x.SkuId, x.Qty)).ToList()
});
}),
When(PayOrderSucceed).TransitionTo(Paid).Then(context => context.SetCompleted()),
When(ReturnInventorySucceed)
.ThenAsync(context => context.Publish<ICancelOrderCommand>(new
{
context.Saga.OrderId
})).TransitionTo(Created));
DuringAny(When(OrderCanceled).TransitionTo(Canceled).ThenAsync(async context =>
{
await Task.Delay(TimeSpan.FromSeconds(10));
await context.SetCompleted();
}));
DuringAny(
When(OrderStateRequested)
.RespondAsync(x => x.Init<IOrderStateResponse>(new
{
x.Saga.OrderId,
State = x.Saga.CurrentState
}))
);
}
}
库存服务在整个下单流程的职责主要是库存的扣减和返还,其仅需要订阅 IDeduceInventoryCommand 和 IReturnInventoryCommand 两个命令并实现即可。代码如下所示:
using MassTransit.SmDemo.InventoryService.Repositories;
using MassTransit.SmDemo.Shared.Contracts;
namespace MassTransit.SmDemo.InventoryService.Consumers;
public class DeduceInventoryConsumer : IConsumer<IDeduceInventoryCommand>
{
private readonly ILogger<DeduceInventoryConsumer> _logger;
public DeduceInventoryConsumer(ILogger<DeduceInventoryConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<IDeduceInventoryCommand> context)
{
if (!CheckStock(context.Message.DeduceInventoryItems))
{
_logger.LogWarning($"Insufficient stock for order [{context.Message.OrderId}]!");
await context.Publish<IDeduceInventoryFailed>(
new { context.Message.OrderId, Reason = "insufficient stock" });
}
else
{
_logger.LogInformation($"Inventory has been deducted for order [{context.Message.OrderId}]!");
DeduceStocks(context.Message.DeduceInventoryItems);
await context.Publish<IDeduceInventorySucceed>(new { context.Message.OrderId });
}
}
private bool CheckStock(List<DeduceInventoryItem> deduceItems)
{
foreach (var stockItem in deduceItems)
{
if (InventoryRepository.GetStock(stockItem.SkuId) < stockItem.Qty) return false;
}
return true;
}
private void DeduceStocks(List<DeduceInventoryItem> deduceItems)
{
foreach (var stockItem in deduceItems)
{
InventoryRepository.TryDeduceStock(stockItem.SkuId, stockItem.Qty);
}
}
}
namespace MassTransit.SmDemo.InventoryService.Consumers;
public class ReturnInventoryConsumer : IConsumer<IReturnInventoryCommand>
{
private readonly ILogger<ReturnInventoryConsumer> _logger;
public ReturnInventoryConsumer(ILogger<ReturnInventoryConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<IReturnInventoryCommand> context)
{
foreach (var returnInventoryItem in context.Message.ReturnInventoryItems)
{
InventoryRepository.ReturnStock(returnInventoryItem.SkuId, returnInventoryItem.Qty);
}
_logger.LogInformation($"Inventory has been returned for order [{context.Message.OrderId}]!");
await context.Publish<IReturnInventorySucceed>(new { context.Message.OrderId });
}
}
对于下单流程的支付用例来说,要么成功要么失败,因此仅需要订阅 IPayOrderCommand 命令即可,具体 PayOrderConsumer 实现如下:
using MassTransit.SmDemo.Shared.Contracts;
namespace MassTransit.SmDemo.PaymentService.Consumers;
public class PayOrderConsumer : IConsumer<IPayOrderCommand>
{
private readonly ILogger<PayOrderConsumer> _logger;
public PayOrderConsumer(ILogger<PayOrderConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<IPayOrderCommand> context)
{
await Task.Delay(TimeSpan.FromSeconds(10));
if (context.Message.Amount % 2 == 0)
{_logger.LogInformation($"Order [{context.Message.OrderId}] paid successfully!");
await context.Publish<IPayOrderSucceed>(new { context.Message.OrderId });
}
else
{
_logger.LogWarning($"Order [{context.Message.OrderId}] payment failed!");
await context.Publish<IPayOrderFailed>(new
{
context.Message.OrderId,
Reason = "Insufficient account balance"
});
}
}
}
启动三个项目,并在Swagger中发起订单创建请求,如下图所示:
由于订单总额为奇数,因此支付会失败,最终控制台输出如下图所示:
打开RabbitMQ后台,可以看见MassTransit按照约定创建了以下队列用于服务间的消息传递:
其中 order-state 队列绑定到类型为 fanout 的同名 order-state Exchange,其绑定关系如下图所示,该Exchange负责从其他同名事件的Exchange转发事件.
通过以上示例的讲解,相信了解到MassTransit StateMachine的强大之处。StateMachine充当着事务编排器的角色,通过集中定义状态、转移条件和状态转移的执行顺序,实现高内聚的事务流转控制,也确保了其他伴生服务仅需关注自己的业务逻辑,而无需关心事务的流转,真正实现了关注点分离.
最后此篇关于MassTransit|基于StateMachine实现Saga编排式分布式事务的文章就讲到这里了,如果你想了解更多关于MassTransit|基于StateMachine实现Saga编排式分布式事务的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
有什么方法可以将来自 node_modules 的 sagas 与我为我的应用程序编写的其他 sagas 结合起来吗?如果 sagaMiddleware.run() 接受 sagas 数组,但 id
我熟悉redux-thunk,今天来redux-saga,我知道当我们要处理异步的时候,redux-saga需要先调用一个actions作为signal,然后sagas来处理,返回成功/失败操作,所以
如果我有两个 sagas 在同一 Action 上等待 yield take(),是否可以保证哪个 saga 将首先选择该 Action 并执行其剩余逻辑或者是随机的?我需要确保第一个传奇在第二个传奇
我一直在关注 AxonBank example为了理解 Axon 框架中 Saga 的实现,并有一些像这样的代码来开始和结束 saga @Saga public class MoneyTransfer
我有一个基本的传奇,如下所示: const mySaga = function* () { yield takeEvery("SOME_ACTION_REQUEST", function* (
关闭。这个问题是not reproducible or was caused by typos .它目前不接受答案。 这个问题是由于错别字或无法再重现的问题引起的。虽然类似的问题可能是on-topi
我正在开发一个使用 redux-saga 的项目管理来自 API 调用的状态更改。 我看过很多关于如何对传奇进行单元测试的例子,但很少有例子可以用来实际进行集成测试。我们正在使用 jest对于断言。
我有下一个代码: const sagaMiddleware = createSagaMiddleware(options); . . . const createdStore = createStor
当调度一个 Action 时,它到达 reducer 和传奇的顺序是否得到保证? 我可以信赖它 首先进入reducer 那么传奇呢? reducer : function reducer(state
我们的应用程序使用尝试 - 成功 - 失败的方法来处理来自服务器的响应。 我有一个生成器函数,需要像这样运行: function * getSingleSectorAttempt(action) {
当它在屏幕上可见时,我正在记录横幅展示次数。 当用户滚动时,同一条横幅可以在短时间内多次显示。 我想防止这种情况发生。 乍一看,throttle是防止它的完美方法。 但是,当您在一页中有多个横幅时,t
我在 containers/App/sagas.js 中有一个 sagas.js,它包含一个登录 saga,我试图从一个登录弹出窗口调用它,它也有自己的 sagas.js (components/Lo
我有一个 Saga,我需要在其中执行 3 个异步请求,然后在后续请求中使用这 3 个请求的响应。这里有一些伪代码来解释: function* useOtherActionsAndSagas(actio
所以,我是 react 样板的新手,似乎没有办法在不影响前一个传奇的功能的情况下包含另一个传奇(即它不起作用)。 我尝试将 sagas 声明为常量,然后将其传递到容器末尾的 compose 函数中,但
我有以下情况: export function* addCircle(circleApi, { payload }) { try { const response = yiel
以前也有人问过类似的问题,但答案对我没有任何帮助。 What are selectors in redux? How to get something from the state / store i
我有一个运行良好的传奇,我调用一个操作并包含一些数据,传奇被触发,它从操作中提取值,执行API调用并以几个yield put结束 - 太棒了。 我现在正在尝试测试该传奇并解决问题,我已经编写了一个简单
我使用 redux-saga 的主要原因之一是它进行异步函数调用的可测试性。我的困境是,当我使用不属于我的 redux 存储的有状态对象进行编程时,使用 sagas 进行编程变得非常尴尬。是否有使用非
我是传奇世界的新手。虽然我在 react-native 领域使用过 thunk,但此刻我很困惑。我正在努力让我的项目的骨架继续运行,我希望它很快就会变得非常大。考虑到这一点,我试图将逻辑分成多个文件。
我正在使用 saga eventChannel 来监听正在触发的事件(可能是实际应用程序中的 WebSocket),然后我正在更新我的 Redux Store。在组件中,我正在调用 API 操作。然后
我是一名优秀的程序员,十分优秀!