gpt4 book ai didi

以管道的方式来完成复杂的流程处理

转载 作者:我是一只小鸟 更新时间:2023-06-30 14:34:19 24 4
gpt4 key购买 nike

之前参与一个机票价格计算的项目,为他们设计了基本的处理流程,但是由于整个计算流程相当复杂,而且变化非常频繁,导致日常的修改、维护和升级也变得越来越麻烦,当我后来再接手的时候已经看不懂计算逻辑了。为了解决这个问题,我借鉴了“工作流”的思路,试图将整个计算过程设计成一个工作流。但是我又不想引入一个独立的工作流引擎,于是写了一个名为Pipelines的框架。顾名思义,Pipelines通过构建Pipeline的方式完成所需的处理流程,整个处理逻辑被分解并实现在若干Pipe中,这些Pipe按照指定的顺序将完成的Pipeline构建出来。Pipeline本质上就是一个简单的顺序工作流,它仅仅按序执行注册的Pipe。这个简单的Pipelines框架被放在这里,这里我不会介绍它的设计实现,只是简单地介绍它的用法,有兴趣的可以查看 源代码 .

1、构建并执行管道 2、Pipeline的“内部中断” 3、Pipeline的“外部中断” 4、处理层次化数据结构 5、利用扩展方法使Pipeline构建更简洁 。

1、构建并执行管道

Pipelines旨在提供一个用于处理数据的顺序工作流或者管道(以下简称Pipeline),该Pipeline在一个强类型的上下文中被执行,管道可以利用此上下文得到需要处理的数据,并将处理的结果(含中间结果)存储在上下文中。接下来我们来演示如何利用Pipelines框架处理人口统计数据的实例。如下所示的两个类型分别表示人口统计数据和处理上下文,后者继承基类ContextBase.

                          
                            public
                          
                          
                            class
                          
                           PopulationData
{
    
                          
                            public
                          
                          
                            object
                          
                           Statistics { 
                          
                            get
                          
                          ; 
                          
                            set
                          
                          ; } = 
                          
                            default
                          
                          !;
}

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           PopulationContext : ContextBase
{
    
                          
                            public
                          
                           PopulationContext(PopulationData data)=> Data = data;
    
                          
                            public
                          
                           PopulationData Data { 
                          
                            get
                          
                          ; }
}

                        

Pipeline由一系列Pipe对象按照注册的顺序组合而成。通过继承基类PipeBase<PopulationContext>,我们定义了三个Pipe类来完成针对人口统计数据的三项基本处理任务.

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           FooPopulationPipe : PipeBase<PopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            Global PopulationProcessor Foo
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(PopulationContext context) =>Console.WriteLine($"
                          
                            {nameof(FooPopulationPipe)} is invoked.
                          
                          ");
}

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BarPopulationPipe : PipeBase<PopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            Global PopulationProcessor Bar
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(PopulationContext context) => Console.WriteLine($"
                          
                            {nameof(BarPopulationPipe)} is invoked.
                          
                          ");
}

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BazPopulationPipe : PipeBase<PopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            Global PopulationProcessor Baz
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(PopulationContext context) => Console.WriteLine($"
                          
                            {nameof(BazPopulationPipe)} is invoked.
                          
                          ");
}

                        

我设计Pipelines的初衷是 让每个参与者(包含非技术人员)在代码的频繁迭代过程中,可以清晰地了解当前的处理流程 ,所以我会将当前应用构建的所有Pipeline的处理流程导出来。基于这个目的,每个Pipe类型都需要利用其Description属性提供一段描述当前处理逻辑的文本。Pipe具体的处理逻辑实现在重写的Invoke方法中。如果涉及异步处理,需要继承更上层的基类Pipe<TContext>(PipeBase<TContext>的基类)并重写异步的InvokeAsync方法.

Pipeline的构建实现在如下所示的BuildPipelines方法中,我们利用该方法提供的IPipelineProvider对象注册了一个命名为“PopulationProcessor”的Pipeline。具体来说,我们调用的是它的AddPipeline<TContext>方法,该方法提供的第一个参数为Pipeline的注册名称,另一个参数是一个类型为Action<IPipelineBuilder<TContext>>的委托,它利用提供的IPipelineBuilder<TContext>对象完成了上面定义的三个Pipe的注册.

                          
                            using
                          
                           App;

                          
                            using
                          
                           Artech.Pipelines;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddPipelines(BuildPipelines);
var app = builder.Build();
app.MapGet("
                          
                            /test
                          
                          ", async (IPipelineProvider provider, HttpResponse response) => {
    Console.WriteLine("
                          
                            Execute PopulationProcessor pipeline
                          
                          ");
    var context = 
                          
                            new
                          
                           PopulationContext(
                          
                            new
                          
                           PopulationData());
    var pipeline = provider.GetPipeline<PopulationContext>("
                          
                            PopulationProcessor
                          
                          ");
    await pipeline.ProcessAsync(context);
    
                          
                            return
                          
                           Results.Ok();
});
app.Run();


                          
                            static
                          
                          
                            void
                          
                           BuildPipelines(IPipelineProvider pipelineProvider)
{
    pipelineProvider.AddPipeline<PopulationContext>(
        name: "
                          
                            PopulationProcessor
                          
                          ",
        setup: builder => builder
            .Use<PopulationContext, FooPopulationPipe>()
            .Use<PopulationContext, BarPopulationPipe>()
            .Use<PopulationContext, BazPopulationPipe>());
}

                        

Pipelines框架涉及的服务通过IServiceCollection接口的AddPipelines方法进行注册,BuildPipelines方法转换成委托作为该方法的参数。我们注册了一个指向“/test” 的路由终结点来演示针对管道的执行。如代码片段所示,我们利用注入的IPipelineProvider对象根据注册名称得到具体的Pipeline对象,并创建出相应的PopulationContext上下文作为参数来执行此Pipeline对象。程序执行后,请求路径”/pipelines”可以得到一个Pipeline的列表,点击具体的链接,对应Pipeline体现的流程就会呈现出来.

clip_image002

如果请求路径“/test”来执行构建的管道,管道执行的轨迹将会体现在控制台的输出结果上.

clip_image003

2、Pipeline的“内部中断”

构成Pipeline的每个Pipe都可以根据处理逻辑的需要立即中断管道的执行。在如下这个重写的BarPopulationPipe类型的Invoke方法中,如果生成的随机数为偶数,它会调用上下文对象的Abort方法立即终止Pipeline的执行.

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BarPopulationPipe : PipeBase<PopulationContext>
{
    
                          
                            private
                          
                          
                            readonly
                          
                           Random _random = 
                          
                            new
                          
                          ();
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            Global PopulationProcessor Bar
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(PopulationContext context)
    {
        Console.WriteLine($"
                          
                            {nameof(BarPopulationPipe)} is invoked.
                          
                          ");
        
                          
                            if
                          
                           (_random.Next() % 2 == 0)
        {
            
                          
                            context.Abort();
                          
                          
        }
    }
}

                        

这样的化,当我们构建的Pipeline在执行过程中,有一半的几率BazPopulationPipe将不会执行,如下所示的输出结果体现了这一点.

clip_image004

对于继承自Pipe<TContext>的Pipe类型,其实现的InvokeAsync方法可以采用如下的方式中止当前Pipeline的执行,因为参数next返回的委托用于调用后续Pipe。如果不执行此委托,就意味着针对Pipeline的执行到此为止.

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BarPopulationPipe : Pipe<PopulationContext>
{
    
                          
                            private
                          
                          
                            readonly
                          
                           Random _random = 
                          
                            new
                          
                          ();
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            Global PopulationProcessor Bar
                          
                          ";
    
                          
                            public
                          
                          
                            override
                          
                           ValueTask InvokeAsync(PopulationContext context, Func<PopulationContext, ValueTask> next)
    {
        Console.WriteLine($"
                          
                            {nameof(BarPopulationPipe)} is invoked.
                          
                          ");
        
                          
                            if
                          
                           (_random.Next() % 2 != 0)
        {
           
                          
                             return next(context);
                          
                          
        }
        
                          
                            return
                          
                           ValueTask.CompletedTask;
    }
}

                        

3、Pipeline的“外部中断”

在调用Pipeline时,我们可以利用执行上下文提供的CancellationToken中止Pipeline的执行。我们按照如下的方式再次改写了BarPopulationPipe的执行逻辑,如下面的代码片段所示,我们不再调用Abort方法,而是选择 延迟2秒 执行后续操作.

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BarPopulationPipe : Pipe<PopulationContext>
{
    
                          
                            private
                          
                          
                            readonly
                          
                           Random _random = 
                          
                            new
                          
                          ();
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            Global PopulationProcessor Bar
                          
                          ";
    
                          
                            public
                          
                          
                            override
                          
                           async ValueTask InvokeAsync(PopulationContext context, Func<PopulationContext, ValueTask> next)
    {
        Console.WriteLine($"
                          
                            {nameof(BarPopulationPipe)} is invoked.
                          
                          ");
        
                          
                            if
                          
                           (_random.Next() % 2 != 0)
        {
            
                          
                            await Task.Delay(2000);
                          
                          
        }
        await next(context);
    }
}

                        

我们按照如下的方式重写了PopulationContext的CancellationToken属性。我们为构造函数添加了两个参数,一个代表当前HttpContext上下文,另一个表示设置的超时时限。CancellationToken根据这两个参数创建而成,意味着管道不仅具有默认的超时时间,也可以通过HTTP调用方中止执行.

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           PopulationContext: ContextBase
{
    
                          
                            public
                          
                           PopulationContext(PopulationData data, HttpContext httpContext, TimeSpan timeout)
    {
        Data = data;
        
                          
                            CancellationToken = CancellationTokenSource.CreateLinkedTokenSource(httpContext.RequestAborted, new CancellationTokenSource(timeout).Token).Token
                          
                          ;
    }
    
                          
                            public
                          
                           PopulationData Data { 
                          
                            get
                          
                          ; }
    
                          
                            public
                          
                          
                            override
                          
                           CancellationToken CancellationToken { 
                          
                            get
                          
                          ; }
}

                        

在注册的终结点处理器中,我们在执行Pipeline之前,将作为参数传入的PopulationContext上下文的超时时间设置为1秒.

                          var builder = WebApplication.CreateBuilder(args);
builder.Services.AddPipelines(BuildPipelines);
var app = builder.Build();
app.MapGet("
                          
                            /test
                          
                          ", async (HttpContext httpContext,IPipelineProvider provider, HttpResponse response) => {
    Console.WriteLine("
                          
                            Execute PopulationProcessor pipeline
                          
                          ");
    var context = 
                          
                            new
                          
                           PopulationContext(
                          
                            new
                          
                           PopulationData(), httpContext, TimeSpan.FromSeconds(1));

    var pipeline = provider.GetPipeline<PopulationContext>("
                          
                            PopulationProcessor
                          
                          ");
    await pipeline.ProcessAsync(context);
    
                          
                            return
                          
                           Results.Ok();
});
app.Run();

                        

根据BarPopulationPipe的执行逻辑,Pipeline的执行具有一半的几率会超时,一旦超时就会立即抛出一个OperationCancellationToken异常.

clip_image005

4、处理层次化数据结构

Pipelines设计的主要目的是用来处理层次化的数据结构,这涉及到 子Pipeline 的应用。目前我们处理的人口数据体现为一个简单的数据类型,现在我们让它变得更复杂一些。假设我们需要处理国家、省份和城市三个等级的人口数据,其中StatePopulationData代表整个国家的人口数据,它的Provinces属性承载了每个省份的数据。ProvincePopulationData代表具体某个省份的人口数据,其Cities属性承载了每个城市的人口数据.

                          
                            public
                          
                          
                            class
                          
                           PopulationData
{
    
                          
                            public
                          
                          
                            object
                          
                           Statistics { 
                          
                            get
                          
                          ; 
                          
                            set
                          
                          ; } = 
                          
                            default
                          
                          !;
}


                          
                            public
                          
                          
                            class
                          
                           StatePopulationData
{
    
                          
                            public
                          
                           IDictionary<
                          
                            string
                          
                          , ProvincePopulationData> Provinces { 
                          
                            get
                          
                          ; 
                          
                            set
                          
                          ; } = 
                          
                            default
                          
                          !;
}


                          
                            public
                          
                          
                            class
                          
                           ProvincePopulationData
{
    
                          
                            public
                          
                           IDictionary<
                          
                            string
                          
                          , PopulationData> Cities { 
                          
                            get
                          
                          ; 
                          
                            set
                          
                          ; } = 
                          
                            default
                          
                          !;
}

                        

现在我们需要构建一个Pipeline来处理通过StatePopulationData类型表示的整个国家的人口数据,具体的处理流程为:

  • 利用FooStatePipe处理国家人口数据
  • 利用BarStatePipe处理国家人口数据
  • 构建子Pipeline处理每个省份人口数据,子Pipeline处理逻辑:
    • 利用FooProvincePipe处理省份人口数据
    • 利用BarProvincePipe处理省份人口数据、
    • 构建子Pipeline处理每个城市人口数据,子Pipeline处理逻辑
      • 利用FooCityPipe处理城市人口数据
      • 利用BarCityPipe处理城市人口数据
      • 利用BazCityPipe处理城市人口数据
    • 利用BazProvincePipe处理省份人口数据
  • 利用BazStatePipe处理国家人口数据

为此我们需要定义9个Pipe类型,以及3个执行上下文。如下所示的是三个执行上下文类型的具体定义:

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           StatePopulationContext: ContextBase
{
    
                          
                            public
                          
                           StatePopulationData PopulationData { 
                          
                            get
                          
                          ; }
    
                          
                            public
                          
                           StatePopulationContext(StatePopulationData populationData) => PopulationData = populationData;
}


                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           ProvincePopulationContext : SubContextBase<StatePopulationContext, KeyValuePair<
                          
                            string
                          
                          , ProvincePopulationData>>
{
    
                          
                            public
                          
                          
                            string
                          
                           Province { 
                          
                            get
                          
                          ; 
                          
                            private
                          
                          
                            set
                          
                          ; } = 
                          
                            default
                          
                          !;
    
                          
                            public
                          
                           IDictionary<
                          
                            string
                          
                          , PopulationData> Cities { 
                          
                            get
                          
                          ; 
                          
                            private
                          
                          
                            set
                          
                          ; } = 
                          
                            default
                          
                          !;
    
                          
                            public
                          
                          
                            override
                          
                          
                            void
                          
                           Initialize(StatePopulationContext parent, KeyValuePair<
                          
                            string
                          
                          , ProvincePopulationData> item)
    {
        Province = item.Key;
        Cities = item.Value.Cities;
        
                          
                            base
                          
                          .Initialize(parent, item);
    }
}


                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           CityPopulationContext: SubContextBase<ProvincePopulationContext, KeyValuePair<
                          
                            string
                          
                          , PopulationData>>
{
    
                          
                            public
                          
                          
                            string
                          
                           City { 
                          
                            get
                          
                          ; 
                          
                            private
                          
                          
                            set
                          
                          ; } = 
                          
                            default
                          
                          !;
    
                          
                            public
                          
                           PopulationData PopulationData { 
                          
                            get
                          
                          ; 
                          
                            private
                          
                          
                            set
                          
                          ; } = 
                          
                            default
                          
                          !;
    
                          
                            public
                          
                          
                            override
                          
                          
                            void
                          
                           Initialize(ProvincePopulationContext parent, KeyValuePair<
                          
                            string
                          
                          , PopulationData> item)
    {
        City = item.Key;
        PopulationData = item.Value;
        
                          
                            base
                          
                          .Initialize(parent, item);
}
}

                        

9个对应的Pipe类型定义如下。每个类型利用重写的Description提供一个简单的描述,重写的Invoke方法输出当前怎样的数据(那个省/市的人口数据).

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           FooStatePipe : PipeBase<StatePopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            State Population Processor Foo
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(StatePopulationContext context)=>Console.WriteLine("
                          
                            Foo: Process state population
                          
                          ");
}

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BarStatePipe : PipeBase<StatePopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            State Population Processor Bar
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(StatePopulationContext context) => Console.WriteLine("
                          
                            Bar: Process state population
                          
                          ");
}

                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BazStatePipe : PipeBase<StatePopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            State Population Processor Baz
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(StatePopulationContext context) => Console.WriteLine("
                          
                            Baz: Process state population
                          
                          ");
}


                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           FooProvincePipe : PipeBase<ProvincePopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            Province Population Processor Foo
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(ProvincePopulationContext context) => Console.WriteLine($"
                          
                            \tFoo: Process population of the province {context.Province}
                          
                          ");
}


                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BarProvincePipe : PipeBase<ProvincePopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            Province Population Processor Bar
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(ProvincePopulationContext context) => Console.WriteLine($"
                          
                            \tBar: Process population of the province {context.Province}
                          
                          ");

}


                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BazProvincePipe : PipeBase<ProvincePopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            Province Population Processor Baz
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(ProvincePopulationContext context) => Console.WriteLine($"
                          
                            \tBaz: Process population of the province {context.Province}
                          
                          ");
}


                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           FooCityPipe : PipeBase<CityPopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            City Population Processor Foo
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(CityPopulationContext context) => Console.WriteLine($"
                          
                            \t\tFoo: Process population of the city {context.City}
                          
                          ");
}


                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BarCityPipe : PipeBase<CityPopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            City Population Processor Bar
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(CityPopulationContext context) => Console.WriteLine($"
                          
                            \t\tBar: Process population of the city {context.City}
                          
                          ");

}


                          
                            public
                          
                          
                            sealed
                          
                          
                            class
                          
                           BazCityPipe : PipeBase<CityPopulationContext>
{
    
                          
                            public
                          
                          
                            override
                          
                          
                            string
                          
                           Description => "
                          
                            City Population Processor Baz
                          
                          ";
    
                          
                            protected
                          
                          
                            override
                          
                          
                            void
                          
                           Invoke(CityPopulationContext context) => Console.WriteLine($"
                          
                            \t\tBaz: Process population of the city {context.City}
                          
                          ");
}

                        
用于构建这个Pipeline的BuildPipelines方法根据构建的Pipeline结构进行了如下的改写:子Pipeline通过IPipelineBuilder<TContext>接口的ForEach<TContext, TSubContext, TItem>扩展方法构建,三个泛型参数类型分别表示当前执行上下文类型、子上下文类型和子Pipeline处理数据。它具有三个参数,description提供到处文本,collectionAccessor利用一个委托获取一个集合对象(构建的子Pipeline用于处理它的每一个元素),subPipelineSetup提供的委托完整最终子Pipeline的构建。虽然看起来复杂,但是其结构还是很清晰的,即使是非技术人员也能明白这个Pipeline体现的处理流程。
                          
                            static
                          
                          
                            void
                          
                           BuildPipelines(IPipelineProvider pipelineProvider)
{
    pipelineProvider.AddPipeline<StatePopulationContext>(name: "
                          
                            PopulationProcessor
                          
                          ", setup: builder => builder
      .Use<StatePopulationContext, FooStatePipe>()
      .Use<StatePopulationContext, BarStatePipe>()
      .ForEach<StatePopulationContext, ProvincePopulationContext, KeyValuePair<
                          
                            string
                          
                          , ProvincePopulationData>>(
            description: "
                          
                            For each province
                          
                          ",
            collectionAccessor: context => context.PopulationData.Provinces,
            subPipelineSetup: provinceBuilder => provinceBuilder
                .Use<ProvincePopulationContext, FooProvincePipe>()
                .Use<ProvincePopulationContext, BarProvincePipe>()
                .ForEach<ProvincePopulationContext, CityPopulationContext, KeyValuePair<
                          
                            string
                          
                          , PopulationData>>(
                    description: "
                          
                            For each city
                          
                          ",
                    collectionAccessor: context => context.Cities,
                    subPipelineSetup: cityBuilder => cityBuilder
                        .Use<CityPopulationContext, FooCityPipe>()
                        .Use<CityPopulationContext, BarCityPipe>()
                        .Use<CityPopulationContext, BazCityPipe>())
                .Use<ProvincePopulationContext, BazProvincePipe>())
      .Use<StatePopulationContext, BazStatePipe>());
}

                        

终结点处理程序在执行新的Pipeline时,会按照如下的形式将StatePopulationContext上下文构建出来。处理人口数据涉及三个省份(江苏、山东和浙江),每个省份包含三个城市的人口数据.

                          var builder = WebApplication.CreateBuilder(args);
builder.Services.AddPipelines(BuildPipelines);
var app = builder.Build();
app.MapGet("
                          
                            /test
                          
                          ", async (HttpContext httpContext, IPipelineProvider provider, HttpResponse response) => {
    Console.WriteLine("
                          
                            Execute PopulationProcessor pipeline
                          
                          ");
    var data = 
                          
                            new
                          
                           StatePopulationData
    {
        Provinces = 
                          
                            new
                          
                           Dictionary<
                          
                            string
                          
                          , ProvincePopulationData>()
    };
    data.Provinces.Add("
                          
                            Jiangsu
                          
                          ", 
                          
                            new
                          
                           ProvincePopulationData
    {
        Cities = 
                          
                            new
                          
                           Dictionary<
                          
                            string
                          
                          , PopulationData>
        {
            {"
                          
                            Suzhou
                          
                          ", 
                          
                            new
                          
                           PopulationData() },
            {"
                          
                            Wuxi
                          
                          ", 
                          
                            new
                          
                           PopulationData() },
            {"
                          
                            Changezhou
                          
                          ", 
                          
                            new
                          
                           PopulationData() },
        }
    });

    data.Provinces.Add("
                          
                            Shandong
                          
                          ", 
                          
                            new
                          
                           ProvincePopulationData
    {
        Cities = 
                          
                            new
                          
                           Dictionary<
                          
                            string
                          
                          , PopulationData>
        {
            {"
                          
                            Qingdao
                          
                          ", 
                          
                            new
                          
                           PopulationData() },
            {"
                          
                            Jinan
                          
                          ", 
                          
                            new
                          
                           PopulationData() },
            {"
                          
                            Yantai
                          
                          ", 
                          
                            new
                          
                           PopulationData() },
        }
    });

    data.Provinces.Add("
                          
                            Zhejiang
                          
                          ", 
                          
                            new
                          
                           ProvincePopulationData
    {
        Cities = 
                          
                            new
                          
                           Dictionary<
                          
                            string
                          
                          , PopulationData>
        {
            {"
                          
                            Hangzhou
                          
                          ", 
                          
                            new
                          
                           PopulationData() },
            {"
                          
                            Ningbo
                          
                          ", 
                          
                            new
                          
                           PopulationData() },
            {"
                          
                            Wenzhou
                          
                          ", 
                          
                            new
                          
                           PopulationData() },
        }
    });

    var context = 
                          
                            new
                          
                           StatePopulationContext(data);

    var pipeline = provider.GetPipeline<StatePopulationContext>("
                          
                            PopulationProcessor
                          
                          ");
    await pipeline.ProcessAsync(context);
    
                          
                            return
                          
                           Results.Ok();
});
app.Run();

                        

应用启动后,我们依然可以从Pipeline导出页面看到整个Pipeline的处理流程.

clip_image006

当我们请求“/test”,Pipeline针对国家人口数据的执行流程体现在控制台输出上.

clip_image008

5、利用扩展方法使Pipeline构建更简洁

Pipeline的构建过程体现了完整的处理流程,所以我们应该构建代码尽可能地简洁,最理想的状态就是让非技术人员也能看懂。Pipelines提供的用于注册Pipe的API均为泛型方法,并且会涉及两到三个必须显式指定的泛型参数,使用起来还不是很方便。不过这个问题可以通过自定义扩展方法来解决.

                          
                            public
                          
                          
                            static
                          
                          
                            class
                          
                           Extensions
{
    
                          
                            public
                          
                          
                            static
                          
                           IPipelineBuilder<StatePopulationContext> UseStatePipe<TPipe>(
                          
                            this
                          
                           IPipelineBuilder<StatePopulationContext> builder)
        where TPipe : Pipe<StatePopulationContext>
        => builder.Use<StatePopulationContext, TPipe>();
    
                          
                            public
                          
                          
                            static
                          
                           IPipelineBuilder<ProvincePopulationContext> UseProvincePipe<TPipe>(
                          
                            this
                          
                           IPipelineBuilder<ProvincePopulationContext> builder)
        where TPipe : Pipe<ProvincePopulationContext>
        => builder.Use<ProvincePopulationContext, TPipe>();
    
                          
                            public
                          
                          
                            static
                          
                           IPipelineBuilder<CityPopulationContext> UseCityPipe<TPipe>(
                          
                            this
                          
                           IPipelineBuilder<CityPopulationContext> builder)
        where TPipe : Pipe<CityPopulationContext>
        => builder.Use<CityPopulationContext, TPipe>();

    
                          
                            public
                          
                          
                            static
                          
                           IPipelineBuilder<StatePopulationContext> ForEachProvince(
                          
                            this
                          
                           IPipelineBuilder<StatePopulationContext> builder, Action<IPipelineBuilder<ProvincePopulationContext>> setup)
        => builder.ForEach("
                          
                            For each province
                          
                          ", it => it.PopulationData.Provinces, (_, _) => 
                          
                            true
                          
                          , setup);
    
                          
                            public
                          
                          
                            static
                          
                           IPipelineBuilder<ProvincePopulationContext> ForEachCity(
                          
                            this
                          
                           IPipelineBuilder<ProvincePopulationContext> builder, Action<IPipelineBuilder<CityPopulationContext>> setup)
        => builder.ForEach("
                          
                            For each city
                          
                          ", it => it.Cities, (_, _) => 
                          
                            true
                          
                          , setup);
}

                        

如上面的代码片段所示,我们针对三个数据层次(国家、省份、城市)定义了注册对应Pipe的扩展方法UseStatePipe、UseProvincePipe和UseCityPipe。还分别定义了ForEachProvince和ForEachCity这两个扩展方法来注册构建处理省份/城市人口数据的子Pipeline。有了这5个扩展方法,构建整个Pipeline的代码就可以变得非常简单而清晰,即使不写任何的注释,相信每个人(包括非开发人员)都能读懂涉及的处理流程.

                          
                            static
                          
                          
                            void
                          
                           BuildPipelines(IPipelineProvider pipelineProvider)
{
    pipelineProvider.AddPipeline<StatePopulationContext>(name: "
                          
                            PopulationProcessor
                          
                          ", setup: builder => builder
      .UseStatePipe<FooStatePipe>()
      .UseStatePipe<BarStatePipe>()
      .ForEachProvince(provinceBuilder => provinceBuilder
          .UseProvincePipe<FooProvincePipe>()
          .UseProvincePipe<BarProvincePipe>()
          .ForEachCity(cityBuilder => cityBuilder
              .UseCityPipe<FooCityPipe>()
              .UseCityPipe<BarCityPipe>()
              .UseCityPipe<BazCityPipe>())
          .UseProvincePipe<BazProvincePipe>())
      .UseStatePipe<BazStatePipe>());
}

                        

最后此篇关于以管道的方式来完成复杂的流程处理的文章就讲到这里了,如果你想了解更多关于以管道的方式来完成复杂的流程处理的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com