- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
之前参与一个机票价格计算的项目,为他们设计了基本的处理流程,但是由于整个计算流程相当复杂,而且变化非常频繁,导致日常的修改、维护和升级也变得越来越麻烦,当我后来再接手的时候已经看不懂计算逻辑了。为了解决这个问题,我借鉴了“工作流”的思路,试图将整个计算过程设计成一个工作流。但是我又不想引入一个独立的工作流引擎,于是写了一个名为Pipelines的框架。顾名思义,Pipelines通过构建Pipeline的方式完成所需的处理流程,整个处理逻辑被分解并实现在若干Pipe中,这些Pipe按照指定的顺序将完成的Pipeline构建出来。Pipeline本质上就是一个简单的顺序工作流,它仅仅按序执行注册的Pipe。这个简单的Pipelines框架被放在这里,这里我不会介绍它的设计实现,只是简单地介绍它的用法,有兴趣的可以查看 源代码 .
1、构建并执行管道 2、Pipeline的“内部中断” 3、Pipeline的“外部中断” 4、处理层次化数据结构 5、利用扩展方法使Pipeline构建更简洁 。
Pipelines旨在提供一个用于处理数据的顺序工作流或者管道(以下简称Pipeline),该Pipeline在一个强类型的上下文中被执行,管道可以利用此上下文得到需要处理的数据,并将处理的结果(含中间结果)存储在上下文中。接下来我们来演示如何利用Pipelines框架处理人口统计数据的实例。如下所示的两个类型分别表示人口统计数据和处理上下文,后者继承基类ContextBase.
public class PopulationData { public object Statistics { get ; set ; } = default !; } public sealed class PopulationContext : ContextBase { public PopulationContext(PopulationData data)=> Data = data; public PopulationData Data { get ; } }
Pipeline由一系列Pipe对象按照注册的顺序组合而成。通过继承基类PipeBase<PopulationContext>,我们定义了三个Pipe类来完成针对人口统计数据的三项基本处理任务.
public sealed class FooPopulationPipe : PipeBase<PopulationContext> { public override string Description => " Global PopulationProcessor Foo "; protected override void Invoke(PopulationContext context) =>Console.WriteLine($" {nameof(FooPopulationPipe)} is invoked. "); } public sealed class BarPopulationPipe : PipeBase<PopulationContext> { public override string Description => " Global PopulationProcessor Bar "; protected override void Invoke(PopulationContext context) => Console.WriteLine($" {nameof(BarPopulationPipe)} is invoked. "); } public sealed class BazPopulationPipe : PipeBase<PopulationContext> { public override string Description => " Global PopulationProcessor Baz "; protected override void Invoke(PopulationContext context) => Console.WriteLine($" {nameof(BazPopulationPipe)} is invoked. "); }
我设计Pipelines的初衷是 让每个参与者(包含非技术人员)在代码的频繁迭代过程中,可以清晰地了解当前的处理流程 ,所以我会将当前应用构建的所有Pipeline的处理流程导出来。基于这个目的,每个Pipe类型都需要利用其Description属性提供一段描述当前处理逻辑的文本。Pipe具体的处理逻辑实现在重写的Invoke方法中。如果涉及异步处理,需要继承更上层的基类Pipe<TContext>(PipeBase<TContext>的基类)并重写异步的InvokeAsync方法.
Pipeline的构建实现在如下所示的BuildPipelines方法中,我们利用该方法提供的IPipelineProvider对象注册了一个命名为“PopulationProcessor”的Pipeline。具体来说,我们调用的是它的AddPipeline<TContext>方法,该方法提供的第一个参数为Pipeline的注册名称,另一个参数是一个类型为Action<IPipelineBuilder<TContext>>的委托,它利用提供的IPipelineBuilder<TContext>对象完成了上面定义的三个Pipe的注册.
using App; using Artech.Pipelines; var builder = WebApplication.CreateBuilder(args); builder.Services.AddPipelines(BuildPipelines); var app = builder.Build(); app.MapGet(" /test ", async (IPipelineProvider provider, HttpResponse response) => { Console.WriteLine(" Execute PopulationProcessor pipeline "); var context = new PopulationContext( new PopulationData()); var pipeline = provider.GetPipeline<PopulationContext>(" PopulationProcessor "); await pipeline.ProcessAsync(context); return Results.Ok(); }); app.Run(); static void BuildPipelines(IPipelineProvider pipelineProvider) { pipelineProvider.AddPipeline<PopulationContext>( name: " PopulationProcessor ", setup: builder => builder .Use<PopulationContext, FooPopulationPipe>() .Use<PopulationContext, BarPopulationPipe>() .Use<PopulationContext, BazPopulationPipe>()); }
Pipelines框架涉及的服务通过IServiceCollection接口的AddPipelines方法进行注册,BuildPipelines方法转换成委托作为该方法的参数。我们注册了一个指向“/test” 的路由终结点来演示针对管道的执行。如代码片段所示,我们利用注入的IPipelineProvider对象根据注册名称得到具体的Pipeline对象,并创建出相应的PopulationContext上下文作为参数来执行此Pipeline对象。程序执行后,请求路径”/pipelines”可以得到一个Pipeline的列表,点击具体的链接,对应Pipeline体现的流程就会呈现出来.
如果请求路径“/test”来执行构建的管道,管道执行的轨迹将会体现在控制台的输出结果上.
构成Pipeline的每个Pipe都可以根据处理逻辑的需要立即中断管道的执行。在如下这个重写的BarPopulationPipe类型的Invoke方法中,如果生成的随机数为偶数,它会调用上下文对象的Abort方法立即终止Pipeline的执行.
public sealed class BarPopulationPipe : PipeBase<PopulationContext> { private readonly Random _random = new (); public override string Description => " Global PopulationProcessor Bar "; protected override void Invoke(PopulationContext context) { Console.WriteLine($" {nameof(BarPopulationPipe)} is invoked. "); if (_random.Next() % 2 == 0) { context.Abort(); } } }
这样的化,当我们构建的Pipeline在执行过程中,有一半的几率BazPopulationPipe将不会执行,如下所示的输出结果体现了这一点.
对于继承自Pipe<TContext>的Pipe类型,其实现的InvokeAsync方法可以采用如下的方式中止当前Pipeline的执行,因为参数next返回的委托用于调用后续Pipe。如果不执行此委托,就意味着针对Pipeline的执行到此为止.
public sealed class BarPopulationPipe : Pipe<PopulationContext> { private readonly Random _random = new (); public override string Description => " Global PopulationProcessor Bar "; public override ValueTask InvokeAsync(PopulationContext context, Func<PopulationContext, ValueTask> next) { Console.WriteLine($" {nameof(BarPopulationPipe)} is invoked. "); if (_random.Next() % 2 != 0) { return next(context); } return ValueTask.CompletedTask; } }
在调用Pipeline时,我们可以利用执行上下文提供的CancellationToken中止Pipeline的执行。我们按照如下的方式再次改写了BarPopulationPipe的执行逻辑,如下面的代码片段所示,我们不再调用Abort方法,而是选择 延迟2秒 执行后续操作.
public sealed class BarPopulationPipe : Pipe<PopulationContext> { private readonly Random _random = new (); public override string Description => " Global PopulationProcessor Bar "; public override async ValueTask InvokeAsync(PopulationContext context, Func<PopulationContext, ValueTask> next) { Console.WriteLine($" {nameof(BarPopulationPipe)} is invoked. "); if (_random.Next() % 2 != 0) { await Task.Delay(2000); } await next(context); } }
我们按照如下的方式重写了PopulationContext的CancellationToken属性。我们为构造函数添加了两个参数,一个代表当前HttpContext上下文,另一个表示设置的超时时限。CancellationToken根据这两个参数创建而成,意味着管道不仅具有默认的超时时间,也可以通过HTTP调用方中止执行.
public sealed class PopulationContext: ContextBase { public PopulationContext(PopulationData data, HttpContext httpContext, TimeSpan timeout) { Data = data; CancellationToken = CancellationTokenSource.CreateLinkedTokenSource(httpContext.RequestAborted, new CancellationTokenSource(timeout).Token).Token ; } public PopulationData Data { get ; } public override CancellationToken CancellationToken { get ; } }
在注册的终结点处理器中,我们在执行Pipeline之前,将作为参数传入的PopulationContext上下文的超时时间设置为1秒.
var builder = WebApplication.CreateBuilder(args); builder.Services.AddPipelines(BuildPipelines); var app = builder.Build(); app.MapGet(" /test ", async (HttpContext httpContext,IPipelineProvider provider, HttpResponse response) => { Console.WriteLine(" Execute PopulationProcessor pipeline "); var context = new PopulationContext( new PopulationData(), httpContext, TimeSpan.FromSeconds(1)); var pipeline = provider.GetPipeline<PopulationContext>(" PopulationProcessor "); await pipeline.ProcessAsync(context); return Results.Ok(); }); app.Run();
根据BarPopulationPipe的执行逻辑,Pipeline的执行具有一半的几率会超时,一旦超时就会立即抛出一个OperationCancellationToken异常.
Pipelines设计的主要目的是用来处理层次化的数据结构,这涉及到 子Pipeline 的应用。目前我们处理的人口数据体现为一个简单的数据类型,现在我们让它变得更复杂一些。假设我们需要处理国家、省份和城市三个等级的人口数据,其中StatePopulationData代表整个国家的人口数据,它的Provinces属性承载了每个省份的数据。ProvincePopulationData代表具体某个省份的人口数据,其Cities属性承载了每个城市的人口数据.
public class PopulationData { public object Statistics { get ; set ; } = default !; } public class StatePopulationData { public IDictionary< string , ProvincePopulationData> Provinces { get ; set ; } = default !; } public class ProvincePopulationData { public IDictionary< string , PopulationData> Cities { get ; set ; } = default !; }
现在我们需要构建一个Pipeline来处理通过StatePopulationData类型表示的整个国家的人口数据,具体的处理流程为:
为此我们需要定义9个Pipe类型,以及3个执行上下文。如下所示的是三个执行上下文类型的具体定义:
public sealed class StatePopulationContext: ContextBase { public StatePopulationData PopulationData { get ; } public StatePopulationContext(StatePopulationData populationData) => PopulationData = populationData; } public sealed class ProvincePopulationContext : SubContextBase<StatePopulationContext, KeyValuePair< string , ProvincePopulationData>> { public string Province { get ; private set ; } = default !; public IDictionary< string , PopulationData> Cities { get ; private set ; } = default !; public override void Initialize(StatePopulationContext parent, KeyValuePair< string , ProvincePopulationData> item) { Province = item.Key; Cities = item.Value.Cities; base .Initialize(parent, item); } } public sealed class CityPopulationContext: SubContextBase<ProvincePopulationContext, KeyValuePair< string , PopulationData>> { public string City { get ; private set ; } = default !; public PopulationData PopulationData { get ; private set ; } = default !; public override void Initialize(ProvincePopulationContext parent, KeyValuePair< string , PopulationData> item) { City = item.Key; PopulationData = item.Value; base .Initialize(parent, item); } }
9个对应的Pipe类型定义如下。每个类型利用重写的Description提供一个简单的描述,重写的Invoke方法输出当前怎样的数据(那个省/市的人口数据).
public sealed class FooStatePipe : PipeBase<StatePopulationContext> { public override string Description => " State Population Processor Foo "; protected override void Invoke(StatePopulationContext context)=>Console.WriteLine(" Foo: Process state population "); } public sealed class BarStatePipe : PipeBase<StatePopulationContext> { public override string Description => " State Population Processor Bar "; protected override void Invoke(StatePopulationContext context) => Console.WriteLine(" Bar: Process state population "); } public sealed class BazStatePipe : PipeBase<StatePopulationContext> { public override string Description => " State Population Processor Baz "; protected override void Invoke(StatePopulationContext context) => Console.WriteLine(" Baz: Process state population "); } public sealed class FooProvincePipe : PipeBase<ProvincePopulationContext> { public override string Description => " Province Population Processor Foo "; protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($" \tFoo: Process population of the province {context.Province} "); } public sealed class BarProvincePipe : PipeBase<ProvincePopulationContext> { public override string Description => " Province Population Processor Bar "; protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($" \tBar: Process population of the province {context.Province} "); } public sealed class BazProvincePipe : PipeBase<ProvincePopulationContext> { public override string Description => " Province Population Processor Baz "; protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($" \tBaz: Process population of the province {context.Province} "); } public sealed class FooCityPipe : PipeBase<CityPopulationContext> { public override string Description => " City Population Processor Foo "; protected override void Invoke(CityPopulationContext context) => Console.WriteLine($" \t\tFoo: Process population of the city {context.City} "); } public sealed class BarCityPipe : PipeBase<CityPopulationContext> { public override string Description => " City Population Processor Bar "; protected override void Invoke(CityPopulationContext context) => Console.WriteLine($" \t\tBar: Process population of the city {context.City} "); } public sealed class BazCityPipe : PipeBase<CityPopulationContext> { public override string Description => " City Population Processor Baz "; protected override void Invoke(CityPopulationContext context) => Console.WriteLine($" \t\tBaz: Process population of the city {context.City} "); }
static void BuildPipelines(IPipelineProvider pipelineProvider) { pipelineProvider.AddPipeline<StatePopulationContext>(name: " PopulationProcessor ", setup: builder => builder .Use<StatePopulationContext, FooStatePipe>() .Use<StatePopulationContext, BarStatePipe>() .ForEach<StatePopulationContext, ProvincePopulationContext, KeyValuePair< string , ProvincePopulationData>>( description: " For each province ", collectionAccessor: context => context.PopulationData.Provinces, subPipelineSetup: provinceBuilder => provinceBuilder .Use<ProvincePopulationContext, FooProvincePipe>() .Use<ProvincePopulationContext, BarProvincePipe>() .ForEach<ProvincePopulationContext, CityPopulationContext, KeyValuePair< string , PopulationData>>( description: " For each city ", collectionAccessor: context => context.Cities, subPipelineSetup: cityBuilder => cityBuilder .Use<CityPopulationContext, FooCityPipe>() .Use<CityPopulationContext, BarCityPipe>() .Use<CityPopulationContext, BazCityPipe>()) .Use<ProvincePopulationContext, BazProvincePipe>()) .Use<StatePopulationContext, BazStatePipe>()); }
终结点处理程序在执行新的Pipeline时,会按照如下的形式将StatePopulationContext上下文构建出来。处理人口数据涉及三个省份(江苏、山东和浙江),每个省份包含三个城市的人口数据.
var builder = WebApplication.CreateBuilder(args); builder.Services.AddPipelines(BuildPipelines); var app = builder.Build(); app.MapGet(" /test ", async (HttpContext httpContext, IPipelineProvider provider, HttpResponse response) => { Console.WriteLine(" Execute PopulationProcessor pipeline "); var data = new StatePopulationData { Provinces = new Dictionary< string , ProvincePopulationData>() }; data.Provinces.Add(" Jiangsu ", new ProvincePopulationData { Cities = new Dictionary< string , PopulationData> { {" Suzhou ", new PopulationData() }, {" Wuxi ", new PopulationData() }, {" Changezhou ", new PopulationData() }, } }); data.Provinces.Add(" Shandong ", new ProvincePopulationData { Cities = new Dictionary< string , PopulationData> { {" Qingdao ", new PopulationData() }, {" Jinan ", new PopulationData() }, {" Yantai ", new PopulationData() }, } }); data.Provinces.Add(" Zhejiang ", new ProvincePopulationData { Cities = new Dictionary< string , PopulationData> { {" Hangzhou ", new PopulationData() }, {" Ningbo ", new PopulationData() }, {" Wenzhou ", new PopulationData() }, } }); var context = new StatePopulationContext(data); var pipeline = provider.GetPipeline<StatePopulationContext>(" PopulationProcessor "); await pipeline.ProcessAsync(context); return Results.Ok(); }); app.Run();
应用启动后,我们依然可以从Pipeline导出页面看到整个Pipeline的处理流程.
当我们请求“/test”,Pipeline针对国家人口数据的执行流程体现在控制台输出上.
Pipeline的构建过程体现了完整的处理流程,所以我们应该构建代码尽可能地简洁,最理想的状态就是让非技术人员也能看懂。Pipelines提供的用于注册Pipe的API均为泛型方法,并且会涉及两到三个必须显式指定的泛型参数,使用起来还不是很方便。不过这个问题可以通过自定义扩展方法来解决.
public static class Extensions { public static IPipelineBuilder<StatePopulationContext> UseStatePipe<TPipe>( this IPipelineBuilder<StatePopulationContext> builder) where TPipe : Pipe<StatePopulationContext> => builder.Use<StatePopulationContext, TPipe>(); public static IPipelineBuilder<ProvincePopulationContext> UseProvincePipe<TPipe>( this IPipelineBuilder<ProvincePopulationContext> builder) where TPipe : Pipe<ProvincePopulationContext> => builder.Use<ProvincePopulationContext, TPipe>(); public static IPipelineBuilder<CityPopulationContext> UseCityPipe<TPipe>( this IPipelineBuilder<CityPopulationContext> builder) where TPipe : Pipe<CityPopulationContext> => builder.Use<CityPopulationContext, TPipe>(); public static IPipelineBuilder<StatePopulationContext> ForEachProvince( this IPipelineBuilder<StatePopulationContext> builder, Action<IPipelineBuilder<ProvincePopulationContext>> setup) => builder.ForEach(" For each province ", it => it.PopulationData.Provinces, (_, _) => true , setup); public static IPipelineBuilder<ProvincePopulationContext> ForEachCity( this IPipelineBuilder<ProvincePopulationContext> builder, Action<IPipelineBuilder<CityPopulationContext>> setup) => builder.ForEach(" For each city ", it => it.Cities, (_, _) => true , setup); }
如上面的代码片段所示,我们针对三个数据层次(国家、省份、城市)定义了注册对应Pipe的扩展方法UseStatePipe、UseProvincePipe和UseCityPipe。还分别定义了ForEachProvince和ForEachCity这两个扩展方法来注册构建处理省份/城市人口数据的子Pipeline。有了这5个扩展方法,构建整个Pipeline的代码就可以变得非常简单而清晰,即使不写任何的注释,相信每个人(包括非开发人员)都能读懂涉及的处理流程.
static void BuildPipelines(IPipelineProvider pipelineProvider) { pipelineProvider.AddPipeline<StatePopulationContext>(name: " PopulationProcessor ", setup: builder => builder .UseStatePipe<FooStatePipe>() .UseStatePipe<BarStatePipe>() .ForEachProvince(provinceBuilder => provinceBuilder .UseProvincePipe<FooProvincePipe>() .UseProvincePipe<BarProvincePipe>() .ForEachCity(cityBuilder => cityBuilder .UseCityPipe<FooCityPipe>() .UseCityPipe<BarCityPipe>() .UseCityPipe<BazCityPipe>()) .UseProvincePipe<BazProvincePipe>()) .UseStatePipe<BazStatePipe>()); }
最后此篇关于以管道的方式来完成复杂的流程处理的文章就讲到这里了,如果你想了解更多关于以管道的方式来完成复杂的流程处理的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
对于 Metal ,如果对主纹理进行 mipmap 处理,是否还需要对多采样纹理进行 mipmap 处理?我阅读了苹果文档,但没有得到任何相关信息。 最佳答案 Mipmapping 适用于您将从中
我正在使用的代码在后端 Groovy 代码中具有呈现 GSP(Groovy 服务器页面)的 Controller 。对于前端,我们使用 React-router v4 来处理路由。我遇到的问题是,通过
我们正在 build 一个巨大的网站。我们正在考虑是在服务器端(ASP .Net)还是在客户端进行 HTML 处理。 例如,我们有 HTML 文件,其作用类似于用于生成选项卡的模板。服务器端获取 HT
我正在尝试将图像加载到 void setup() 中的数组中,但是当我这样做时出现此错误:“类型不匹配,'processing .core.PImage' does not匹配“processing.
我正在尝试使用其私有(private)应用程序更新 Shopify 上的客户标签。我用 postman 尝试过,一切正常,但通过 AJAX,它带我成功回调而不是错误,但成功后我得到了身份验证链接,而不
如何更改我的 Processing appIconTest.exe 导出的默认图标在窗口中的应用程序? 默认一个: 最佳答案 经过一些研究,我能找到的最简单的解决方案是: 进入 ...\process
我在 Processing 中做了一个简单的小游戏,但需要一些帮助。我有一个 mp3,想将它添加到我的应用程序中,以便在后台循环运行。 这可能吗?非常感谢。 最佳答案 您可以使用声音库。处理已经自带
我有几个这样创建的按钮: 在 setup() PImage[] imgs1 = {loadImage("AREA1_1.png"),loadImage("AREA1_2.png"),loadImage
我正在尝试使用 Processing 创建一个多人游戏,但无法弄清楚如何将屏幕分成两个以显示玩家的不同情况? 就像在 c# 中一样,我们有Viewport leftViewport,rightView
我一直在尝试使用 Moore 邻域在处理过程中创建元胞自动机,到目前为止非常成功。我已经设法使基本系统正常工作,现在我希望通过添加不同的功能来使用它。现在,我检查细胞是否存活。如果是,我使用 fill
有没有办法用 JavaScript 代码检查资源使用情况?我可以检查脚本的 RAM 使用情况和 CPU 使用情况吗? 由于做某事有多种方法,我可能会使用不同的方法编写代码,并将其保存为两个不同的文件,
我想弄清楚如何处理这样的列表: [ [[4,6,7], [1,2,4,6]] , [[10,4,2,4], [1]] ] 这是一个整数列表的列表 我希望我的函数将此列表作为输入并返回列表中没有重复的整
有没有办法在不需要时处理 MethodChannel/EventChannel ?我问是因为我想为对象创建多个方法/事件 channel 。 例子: class Call { ... fields
我有一个关于在 Python3 中处理 ConnectionResetError 的问题。这通常发生在我使用 urllib.request.Request 函数时。我想知道如果我们遇到这样的错误是否可
我一直在努力解决这个问题几个小时,但无济于事。代码很简单,一个弹跳球(粒子)。将粒子的速度初始化为 (0, 0) 将使其保持上下弹跳。将粒子的初始化速度更改为 (0, 0.01) 或任何十进制浮点数都
我把自己弄得一团糟。 我想在我的系统中添加 python3.6 所以我决定在我的 Ubuntu 19.10 中卸载现有的。但是现在每次我想安装一些东西我都会得到这样的错误: dpkg: error w
我正在努力解决 Rpart 包中的 NA 功能。我得到了以下数据框(下面的代码) Outcome VarA VarB 1 1 1 0 2 1 1 1
我将 Java 与 JSF 一起使用,这是 Glassfish 3 容器。 在我的 Web 应用程序中,我试图实现一个文件(图像)管理系统。 我有一个 config.properties我从中读取上传
所以我一直在Processing工作几个星期以来,虽然我没有编程经验,但我已经转向更复杂的项目。我正在编写一个进化模拟器,它会产生具有随机属性的生物。 最终,我将添加复制,但现在这些生物只是在屏幕上漂
有人知道 Delphi 2009 对“with”的处理有什么不同吗? 我昨天解决了一个问题,只是将“with”解构为完整引用,如“with Datamodule、Dataset、MainForm”。
我是一名优秀的程序员,十分优秀!