
.net - TPL Dataflow linking to multiple consumers not working

Reposted · Author: 行者123 · Updated: 2023-12-04 17:11:12

I have a BufferBlock to which I post messages:

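using System.IO;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public class DelimitedFileBlock : ISourceBlock<string>
{
    private readonly BufferBlock<string> _source =
        new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 10000 });

    // Read the file, one row at a time.
    private void ReadFile(FileInfo file)
    {
        using (StreamReader reader = file.OpenText())
        {
            string row;
            while ((row = reader.ReadLine()) != null)
            {
                // If consumers are slow, then sleep for a while and retry.
                while (!_source.Post(row))
                {
                    Thread.Sleep(5000);
                }
            }
        }
    }

    // The ISourceBlock<string> members are not shown in the question.
}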

It's a 5 GB file with 24 million rows.

I now have a target block that uses an ActionBlock:
public class SolaceTargetBlock : ITargetBlock<string>
{
    private ActionBlock<IBasicDataContract> _publishToSolaceBlock;

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue,
                                              ISourceBlock<string> source, bool consumeToAccept)
    {
        // Post to another block to publish.
        bool success = _publishToSolaceBlock.Post(messageValue);
        // ... (rest of the method not shown in the question)
    }
}

Now, in a console application, I set up:
SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam",
    new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam",
    new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam",
    new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });

DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);

I've kept the bounded capacity at 1 just for testing.

Then I link these three consumers to my source using LinkTo:
delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);

After 10,003 rows this reaches the Thread.Sleep(5000) statement, and from then on the Post in the while loop always returns false.

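I expected that, because I used LinkTo, the solaceTargetBlocks would pick up the next message whenever they finished one, but LinkTo is not draining the BufferBlock. So how can I load-balance between multiple consumers? Do I have to receive the messages myself and write simple load-balancing logic to distribute them among the consumers?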

Best answer

From the documentation for the Post method on the DataflowBlock class (emphasis mine):

This method will return once the target block has decided to accept or decline the item,



This means that the target can choose to decline the message (which is the behavior you're seeing).

Further, it states:

For target blocks that support postponing offered messages, or for blocks that may do more processing in their Post implementation, consider using SendAsync, which will return immediately and will enable the target to postpone the posted message and later consume it after SendAsync returns.



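This means that you might get better results (depending on the target block), because your message can be postponed and still processed later, instead of being declined outright.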
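The difference between the two calls can be seen with a small, self-contained sketch (the class and method names are made up for illustration, and the capacity of 2 is arbitrary): Post on a full BufferBlock<T> returns false right away, whereas SendAsync hands back a task that completes only once the buffer has room for the item.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostVersusSendAsync
{
    public static (bool postResult, bool completedEarly, bool sendResult) Run()
    {
        // A buffer that can hold only two items (illustrative capacity).
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 2 });

        buffer.Post(1);
        buffer.Post(2);

        // The buffer is full, so Post declines the item and returns false immediately.
        bool postResult = buffer.Post(3);

        // SendAsync lets the buffer postpone the item instead of declining it:
        // the returned task stays incomplete until there is room.
        Task<bool> pending = buffer.SendAsync(3);
        bool completedEarly = pending.IsCompleted;

        // Taking one item out frees a slot, so the postponed item gets consumed.
        buffer.Receive();
        bool sendResult = pending.Result; // true once the buffer has accepted the item

        return (postResult, completedEarly, sendResult);
    }
}
```

Here Run() should report (false, false, true): the Post is declined, the SendAsync is still pending while the buffer is full, and it succeeds once an item has been received.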

I'd imagine that the BoundedCapacity settings on both the BufferBlock<T> and the three ActionBlock<TInput> instances have something to do with what you're seeing:
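  • The maximum buffer on the BufferBlock<T> is 10,000; once you've put 10,000 items into the queue, Post declines the rest (see the first quote above), because the buffer has no room for them. SendAsync wouldn't fix this either: it lets the buffer postpone the item, but the item is only accepted once the buffer has room again, which doesn't happen while the consumers aren't draining it.
  • The maximum buffer on each ActionBlock<TInput> instance is 1, and you have three of them.
  • 10,000 + (1 * 3) = 10,000 + 3 = 10,003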
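The arithmetic can be reproduced on a smaller scale. This sketch assumes the consumers never finish their work (simulated with a ManualResetEventSlim that stays closed) and uses a buffer capacity of 10 with three consumers that each have BoundedCapacity = 1, mirroring the question's options. It first hands one item to each consumer and waits (via SpinWait.SpinUntil) until the buffer is empty, so the result doesn't depend on timing, and then posts until Post returns false. CapacityDemo and CountAccepted are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class CapacityDemo
{
    // Counts how many Post calls succeed before the pipeline is saturated.
    public static int CountAccepted(int bufferCapacity, int consumerCount)
    {
        var gate = new ManualResetEventSlim(false); // keeps every consumer busy
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = bufferCapacity });

        for (int i = 0; i < consumerCount; i++)
        {
            var consumer = new ActionBlock<int>(
                _ => gate.Wait(),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
            buffer.LinkTo(consumer);
        }

        int accepted = 0;

        // Give each consumer one item, then wait until the buffer has handed them all out.
        for (int i = 0; i < consumerCount; i++)
        {
            if (buffer.Post(accepted)) accepted++;
        }
        SpinWait.SpinUntil(() => buffer.Count == 0, TimeSpan.FromSeconds(5));

        // The consumers are now full, so only the buffer's own capacity is left.
        while (buffer.Post(accepted)) accepted++;

        gate.Set(); // release the consumers
        return accepted;
    }
}
```

CountAccepted(10, 3) should return 13, the small-scale counterpart of 10,000 + 3.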

To fix this, there are a few things you need to do.

First, you need to set a more sensible value for the MaxDegreeOfParallelism property on the ExecutionDataflowBlockOptions you pass when creating the ActionBlock<TInput> instances.

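By default, MaxDegreeOfParallelism for an ActionBlock<TInput> is set to 1; this guarantees that calls are serialized, so you don't have to worry about thread safety. If your action relies on that guarantee (i.e., it isn't thread-safe), keep this setting.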

If the ActionBlock<TInput>'s action is thread-safe, there's no reason to throttle it, and you should set MaxDegreeOfParallelism to DataflowBlockOptions.Unbounded.
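One way to see what MaxDegreeOfParallelism does is to track how many invocations of the action overlap. The sketch below (ParallelismDemo and its 20 ms sleep are illustrative assumptions) records the peak number of concurrent invocations; with the default of 1 the peak is always 1, and it never exceeds the configured value. How high an Unbounded block actually goes depends on the thread pool.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class ParallelismDemo
{
    // Runs `items` slow actions through an ActionBlock and reports the highest
    // number of actions observed running at the same time.
    public static int MaxObservedConcurrency(int maxDegreeOfParallelism, int items = 20)
    {
        int running = 0, maxRunning = 0;

        var block = new ActionBlock<int>(_ =>
        {
            int now = Interlocked.Increment(ref running);

            // Record the peak with a compare-and-swap loop.
            int peak;
            while (now > (peak = Volatile.Read(ref maxRunning)) &&
                   Interlocked.CompareExchange(ref maxRunning, now, peak) != peak)
            {
            }

            Thread.Sleep(20); // simulate work
            Interlocked.Decrement(ref running);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        for (int i = 0; i < items; i++) block.Post(i);
        block.Complete();
        block.Completion.Wait();
        return maxRunning;
    }
}
```

Passing DataflowBlockOptions.Unbounded instead of a number lets the block run as many invocations at once as the scheduler allows.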

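If you're accessing some kind of shared resource inside the ActionBlock<TInput> that can only be accessed concurrently on a limited basis, you're probably doing something wrong.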

If you do have such a shared resource, you should probably route access to it through another block and set MaxDegreeOfParallelism on that block instead.
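A minimal sketch of that idea, assuming the shared resource is a plain List<int> (which is not thread-safe): the thread-safe computation runs in a TransformBlock with unbounded parallelism, and only a downstream ActionBlock with MaxDegreeOfParallelism = 1 touches the list. SharedResourceDemo and the squaring step are placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class SharedResourceDemo
{
    public static List<int> Run(int items)
    {
        // A shared resource that must not be accessed concurrently.
        var results = new List<int>();

        // The thread-safe work runs with unbounded parallelism...
        var compute = new TransformBlock<int, int>(
            n => n * n,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        // ...while only this block touches the shared list, one item at a time
        // (1 is the default; it is spelled out here for clarity).
        var store = new ActionBlock<int>(
            n => results.Add(n),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        compute.LinkTo(store, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= items; i++) compute.Post(i);
        compute.Complete();
        store.Completion.Wait();
        return results;
    }
}
```

With Run(100), the list ends up holding all 100 squares, whose sum is 338,350.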

Second, if you care about throughput and can accept dropped items, then you should set the BoundedCapacity property.
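The dropping behavior is easy to observe in isolation. In this sketch (DropDemo is a hypothetical name, and the consumer is kept busy by a gate so nothing completes), an ActionBlock<int> with BoundedCapacity = 5 accepts five posted items and declines the rest; the bounded count includes the item currently being processed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class DropDemo
{
    public static (int accepted, int dropped) Run(int capacity, int items)
    {
        var gate = new ManualResetEventSlim(false); // holds the consumer busy

        var consumer = new ActionBlock<int>(
            _ => gate.Wait(),
            new ExecutionDataflowBlockOptions { BoundedCapacity = capacity });

        int accepted = 0, dropped = 0;
        for (int i = 0; i < items; i++)
        {
            // Post never waits: when the block is at capacity the item is declined,
            // and unless the caller retries, it is simply dropped.
            if (consumer.Post(i)) accepted++; else dropped++;
        }

        gate.Set(); // let the accepted items finish
        consumer.Complete();
        consumer.Completion.Wait();
        return (accepted, dropped);
    }
}
```

DropDemo.Run(5, 20) should return (5, 15).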

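Also note that you say "if consumers are slow, then sleep for a while"; if you wire up your blocks correctly, there's no reason to do this. Let the data flow through and place restrictions only where you need them. Your producer shouldn't be responsible for throttling the consumers; let the consumers be responsible for their own throttling.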
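As an alternative to the Post/Thread.Sleep loop, the producer can simply await SendAsync and let a bounded consumer set the pace. This is a sketch under assumed numbers (a capacity of 100, four workers, and a 1 ms sleep standing in for the Solace publish); the rows are placeholder strings rather than lines from the real file, and BackpressureDemo is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackpressureDemo
{
    public static async Task<int> RunAsync(int rows)
    {
        int processed = 0;

        // The consumer sets its own limits; the producer knows nothing about them.
        var consumer = new ActionBlock<string>(
            row => { Thread.Sleep(1); Interlocked.Increment(ref processed); },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100, MaxDegreeOfParallelism = 4 });

        for (int i = 0; i < rows; i++)
        {
            // Instead of Post + Thread.Sleep, wait asynchronously until the consumer
            // has room. SendAsync yields false only if the target declines the item
            // (for example because it has been completed or faulted).
            if (!await consumer.SendAsync($"row {i}"))
                break;
        }

        consumer.Complete();
        await consumer.Completion;
        return processed;
    }
}
```

No item is lost and the producer never sleeps: whenever the consumer's 100 slots are full, the awaited SendAsync simply doesn't complete until a slot frees up.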

Finally, it doesn't look like your code needs to implement the dataflow block interfaces itself. You could structure it like this:
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The source; the rows you read will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The action for the action blocks.
Action<string> action =
    s => { /* Do something with the string here. */ };

// Create the action blocks, assuming that the action is
// thread-safe, so there's no need to have it process one item at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock2 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock3 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything, letting completion flow from the source to the targets.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
delimitedFileBlock.LinkTo(solaceActionBlock1, linkOptions);
delimitedFileBlock.LinkTo(solaceActionBlock2, linkOptions);
delimitedFileBlock.LinkTo(solaceActionBlock3, linkOptions);

// Now read the file and post each row to the BufferBlock<T>
// (csvFileInfo is the FileInfo from the question).
foreach (string row in File.ReadLines(csvFileInfo.FullName))
{
    delimitedFileBlock.Post(row);
}

// No more rows: complete the source and wait for the consumers to finish.
delimitedFileBlock.Complete();
Task.WaitAll(solaceActionBlock1.Completion,
             solaceActionBlock2.Completion,
             solaceActionBlock3.Completion);

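Also note that having three ActionBlock<TInput> instances is unnecessary unless you need to filter the output to different actions (which you aren't doing here), so the above really reduces to the following (assuming your action is thread-safe, in which case you'll be raising MaxDegreeOfParallelism to Unbounded anyway):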
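using System;
using System.IO;
using System.Threading.Tasks.Dataflow;

// The source; the rows you read will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The action for the action block.
Action<string> action =
    s => { /* Do something with the string here. */ };

// Create the action block, assuming that the action is
// thread-safe, so there's no need to have it process one item at a time
// or to bound the capacity.
var solaceActionBlock = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything, letting completion flow from the source to the target.
delimitedFileBlock.LinkTo(solaceActionBlock,
    new DataflowLinkOptions { PropagateCompletion = true });

// Now read the file and post each row to the BufferBlock<T>
// (csvFileInfo is the FileInfo from the question).
foreach (string row in File.ReadLines(csvFileInfo.FullName))
{
    delimitedFileBlock.Post(row);
}

// No more rows: complete the source and wait for the consumer to finish.
delimitedFileBlock.Complete();
solaceActionBlock.Completion.Wait();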
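To see why the extra unbounded blocks add nothing, here is a small sketch that counts how many items each of several unbounded ActionBlock<int> instances receives (LinkOrderDemo is a hypothetical name). The source offers each item to its targets in link order, and an unbounded target never declines, so the first linked block should end up with everything.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class LinkOrderDemo
{
    public static int[] CountPerConsumer(int consumers, int items)
    {
        var counts = new int[consumers];
        var source = new BufferBlock<int>();
        var blocks = new ActionBlock<int>[consumers];
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        for (int c = 0; c < consumers; c++)
        {
            int index = c; // stable copy for the lambda
            blocks[c] = new ActionBlock<int>(
                _ => { Interlocked.Increment(ref counts[index]); },
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
            source.LinkTo(blocks[c], linkOptions);
        }

        for (int i = 0; i < items; i++) source.Post(i);

        // Completion propagates from the source to every linked block.
        source.Complete();
        foreach (var block in blocks) block.Completion.Wait();
        return counts;
    }
}
```

With three consumers and 1,000 items, the counts should be 1,000, 0 and 0. If you do want several blocks to share the load, each one typically needs a BoundedCapacity so that it postpones offers while busy and the source moves on to the next link.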

Regarding ".net - TPL Dataflow linking to multiple consumers not working", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/12509039/
