gpt4 book ai didi

c# - 带有消息队列的任务处理状态

转载 作者:行者123 更新时间:2023-11-30 15:04:49 25 4
gpt4 key购买 nike

我正在开发一个产品数据导入系统,该系统从外部来源下载产品数据,将其转换为适当的模式,并存储结果——本质上是一个 ETL 系统。系统处理的核心消息类型是“ImportProductCommand”,它指定要导入的产品和来源。然而,导入命令很少单独发送。一个典型的业务需求是从给定来源导入一整套产品。目前,这表示为“ImportProductsCommand”消息,可以指定要导入的多个产品。命令处理程序使用此消息,将其转换为单独的“ImportProductCommand”消息并将它们发送到队列进行处理。单个导入请求的消费者发布“ProductImportedEvent”或“ProductImportFailedEvent”。当收到“ImportProductsCommand”消息时,服务会为消息分配一个 GUID token ,将消息放入队列中,然后返回 token 。然后将 token 用作关联 ID,以便可以将单个导入请求与批量导入请求相关联。有了这个基础设施,就可以确定与给定 token 相关的事件数量,从而确定进口产品或进口失败的数量。缺少的是指示批量导入已完成的显式事件。单个导入请求的处理程序并未明确意识到它是批量导入请求的一部分。这当然可以通过了解要进口多少产品以及计算与特定关联 ID 关联的进口事件的数量来推断。当前的实现利用消息队列系统来处理进程重启和失败,但对批量导入请求不太明确。总的来说,系统需要回答的查询是:

  • 给定的批量导入完成了吗?
  • 对于给定的批量导入,还剩下多少单独的导入?
  • 完成了多少单独的导入?
  • 有多少是错误的?

有哪些最佳实践或建议方法可以支持这些查询并仍然利用消息队列系统实现弹性?目前,将它们联系在一起的是上面提到的 token ,但是没有明确的记录来表示批量导入请求实体,如果有,那么单个导入请求处理器将需要知道这样一个实体来更新相应的状态。

所有这些都是使用 C#、NServiceBus 实现的,并作为 IIS WCF 应用程序托管。

最佳答案

这可以实现为 NServiceBus Saga . ImportProductsCommand 应该由 Saga(ImportProductsSaga) 处理,Saga 数据在发送 ImportProductCommand 时可以包含要导入的产品的数量。 ImportProductsSaga 应该处理 ProductImportedEventProductImportFailedEvent。在 ImportProductsSaga 中处理的每个事件中,递增 ProductsImportedProdctsFailedToImport。还要检查 (ProductsImported + ProdctsFailedToImport) 的总和是否等于 ProdctsToBeImported,如果是,则完成 saga。

ImportProductsSaga 数据需要跟踪 ImportProductCommand 的发送次数和收到的回复,您可以计算待处理的回复等。Saga 数据如下所示:

   public class ImportProductsSataData{ 
public Guid Id {get; set}
public int ProdctsToBeImported {get; set}
public int ProdctsImported {get; set}
public int ProdctsFailedToImport {get; set}
}

关于c# - 带有消息队列的任务处理状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9727328/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com