gpt4 book ai didi

c# - 使用 TPL Dataflow 封装以操作 block 结尾的管道

转载 作者:太空狗 更新时间:2023-10-29 21:33:06 26 4
gpt4 key购买 nike

TPL Dataflow 提供了一个非常有用的功能:

public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
ITargetBlock<TInput> target,
ISourceBlock<TOutput> source)

使您能够将多个 block 封装到一个转换 block 中。它返回一个

IPropagatorBlock<TInput, TOutput>

代表管道的开始和结束 block 。

但是,如果我的管道中的最后一个 block 是 ActionBlock,我不能使用它,因为 ActionBlock 不是 SourceBlock,函数的返回类型将是 ITargetBlock,而不是 IPropagatorBlock。

本质上,我要找的是类似这个函数的东西:

public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
ITargetBlock<TStart> startBlock,
ActionBlock<TEnd> endBlock)

这样写是明智的,还是我遗漏了一些简单的东西?我不太确定如何 编写它 - 特别是连接完成。我需要创建自己的自定义 block 类型吗?

编辑:

好的,所以在阅读了@Panagiotis Kanavos 的回复并做了一些修补之后,我想出了这个。这是基于 EncapsulatingPropagator 类,这是现有 DataflowBlock.Encapsulate 方法所使用的:

internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
private readonly ITargetBlock<TStart> startBlock;

private readonly ActionBlock<TEnd> endBlock;

public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
{
this.startBlock = startBlock;
this.endBlock = endBlock;
}

public Task Completion
{
get { return this.endBlock.Completion; }
}

public void Complete()
{
this.startBlock.Complete();
}

void IDataflowBlock.Fault(Exception exception)
{
if (exception == null)
{
throw new ArgumentNullException("exception");
}

this.startBlock.Fault(exception);
}

public DataflowMessageStatus OfferMessage(
DataflowMessageHeader messageHeader,
TStart messageValue,
ISourceBlock<TStart> source,
bool consumeToAccept)
{
return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
}

最佳答案

Encapsulate 不用于抽象现有管道,它用于创建一个传播器 block ,该 block 需要使用现有 block 和链接无法实现的自定义行为。

例如,Sliding Window示例缓冲发布到其输入 block 的所有传入消息,并在滑动窗口到期时将一批所有检索到的消息输出到其输出 block 。

方法的名称会造成很多混淆,但当您理解它们的用途时,它们确实有意义:

  • target 参数是目标(输入)端点,前面 block 将连接到该端点以发送消息。在这种情况下,处理传入消息并决定是否发布到输出(源) block 的 ActionBlock 是有意义的。
  • source 参数是源(输出)端点,成功 步骤将连接到该端点以接收消息。使用 ActionBlock 作为源没有意义,因为它没有任何输出。

接受 ActionBlock 方法作为 sourceEncapsulate 变体没有用,因为您可以简单地从任何前面的步骤链接到操作 block 。

编辑

如果你想模块化一个管道,即将它分解成可重用的、更易于管理的,你可以创建一个构造的类,你可以使用一个普通的旧类。在该类中,您像往常一样构建管道片段,链接 block (确保传播完成),然后将第一步和最后一步的完成任务公开为公共(public)属性,例如:

class MyFragment
{
public TransformationBlock<SomeMessage,SomeOther> Input {get;}

public Task Completion {get;}

ActionBlock<SomeOther> _finalBlock;

public MyFragment()
{
Input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
_finalBlock=new ActionBlock<SomeOther>(MyMethod);
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
Input.LinkTo(_finalBlock,linkOptions);
}

private SomeOther MyFunction(SomeMessage msg)
{
...
}

private void MyMethod(SomeOther msg)
{
...
}
}

要将片段连接到管道,您只需要从管道 block 链接到暴露的Input block 。要等待完成,只需等待公开的 Completion 任务即可。

如果你愿意,你可以停在这里,或者你可以实现ITargetBlock使片段看起来像一个目标 block 。您只需将所有方法委托(delegate)给 Input block ,将 Completion 属性委托(delegate)给 final block 。

例如:

class MyFragment:ITargetBlock<SomeMessage> 
{
....

public Task Completion {get;}

public void Complete()
{
Input.Complete()
};

public void Fault(Exception exc)
{
Input.Fault(exc);
}

DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
TInput messageValue,ISourceBlock<TInput> source,bool consumeToAccept)
{
return Input.OfferMessage(messageHeader,messageValue,source,consumeToAccept);
}
}

编辑 2

使用@bornfromanegg 的类 1 可以将构建片段的行为与公开输入和完成的样板分开:

public ITargetBlock<SomeMessage> BuildMyFragment()
{
var input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
var step2=new TransformationBlock<SomeOther,SomeFinal>(MyFunction2);
var finalBlock=new ActionBlock<SomeFinal>(MyMethod);

var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}

input.LinkTo(step2,linkOptions);
step2.LinkTo(finalBlock,linkOptions);

return new EncapsulatingTarget(input,finalBlock);
}

关于c# - 使用 TPL Dataflow 封装以操作 block 结尾的管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32459880/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com