gpt4 book ai didi

c# - 如何在 MassTransit 和 Automatonymous 中配置 EF Core 持久性?

转载 作者:行者123 更新时间:2023-12-04 08:03:54 24 4
gpt4 key购买 nike

我正在尝试使用 EF Core 作为持久性配置 Automatonymous worker 实现。我通过 api 发布事件并使用 RabbitMq 作为传输在托管服务中处理它。不幸的是,数据库不存储机器状态。我应用了迁移,我看到了表 OrderState ,但是我发布后里面没有数据OrderSubmmited事件。它得到了正确处理,因为我在控制台中看到了一个日志,但数据库表仍然是空的。我错过了什么?
这是我的代码:
事件:

public interface OrderSubmitted : CorrelatedBy<Guid>
{
}
消费者:
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
public Task Consume(ConsumeContext<OrderSubmitted> context)
{
Console.Out.WriteLine($"Order with id {context.Message.CorrelationId} has been submitted.");

return Task.CompletedTask;
}
}
我的传奇实例:
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
}
状态机:
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; }

public Event<OrderSubmitted> OrderSubmitted { get; set; }

public OrderStateMachine()
{
Event(() => OrderSubmitted);

InstanceState(x => x.CurrentState);

Initially(
When(OrderSubmitted)
.TransitionTo(Submitted));

DuringAny(
When(OrderSubmitted)
.TransitionTo(Submitted));
}
}
public class OrderStateMachineDefinition : SagaDefinition<OrderState>
{
public OrderStateMachineDefinition()
{
ConcurrentMessageLimit = 15;
}
}
数据库上下文:
public class OrderStateDbContext : SagaDbContext
{
public OrderStateDbContext(DbContextOptions options) : base(options)
{
}

protected override IEnumerable<ISagaClassMap> Configurations
{
get
{
yield return new OrderStateMap();
}
}
}
public class OrderStateMap : SagaClassMap<OrderState>
{
protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
{
entity.Property(x => x.CurrentState).HasMaxLength(64);
}
}
程序类:
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddDbContext<OrderStateDbContext>(builder =>
builder.UseSqlServer("Server=(localdb)\\mssqllocaldb;Database=OrderState;Trusted_Connection=True;", m =>
{
m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
}));

services.AddMassTransit(config =>
{
config.AddSagaRepository<OrderState>()
.EntityFrameworkRepository(r =>
{
r.ExistingDbContext<OrderStateDbContext>();
r.LockStatementProvider = new SqlServerLockStatementProvider();
});

config.AddConsumer<OrderSubmittedConsumer>();

config.UsingRabbitMq((ctx, cfg) => {
cfg.Host("amqp://guest:guest@localhost:5672");
cfg.ReceiveEndpoint("order-queue", c => {
c.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
// I'm assuming this is the place where something like c.StateMachineSaga() is missing, but I don't know how should this look like with EF Core
});
});
});

services.AddHostedService<Worker>();
});
}
worker 类(Class):
public class Worker : IHostedService
{
private readonly IBusControl _bus;

public Worker(IBusControl bus)
{
_bus = bus;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
await _bus.StartAsync(cancellationToken).ConfigureAwait(false);
}

public Task StopAsync(CancellationToken cancellationToken)
{
return _bus.StopAsync(cancellationToken);
}
}

最佳答案

在您的代码示例中,您没有添加 saga 或在接收端点上配置 saga。消费者是一个独立的东西,与传奇完全无关。你应该调用:AddSagaStateMachine<OrderStateMachine, OrderState, OrderStateMachineDefinition>()然后使用 ConfigureSaga<OrderState>或切换到 ConfigureEndpoints用于自动配置端点。

关于c# - 如何在 MassTransit 和 Automatonymous 中配置 EF Core 持久性?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66318070/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com