gpt4 book ai didi

c# - DataflowBlock ITargetSource.AsObservable() 不触发 OnNext()

转载 作者:行者123 更新时间:2023-12-04 15:19:34 27 4
gpt4 key购买 nike

我正在尝试使用数据流 block ,我需要监视通过的项目以进行单元测试。

为了做到这一点,我使用了 AsObservable() ISourceBlock<T> 上的方法我的TransformBlock<Tinput, T> ,这样我就可以在执行后检查管道的每个 block 是否都生成了预期值。

流水​​线

{
...
var observer = new MyObserver<string>();
_block = new TransformManyBlock<string, string>(MyHandler, options);
_block.LinkTo(_nextBlock);
_block.AsObservable().Subscribe(observer);
_block.Post("Test");
...
}

我的观察者

public class MyObserver<T> : IObserver<T>
{
public List<Exception> Errors = new List<Exception>();
public bool IsComplete = false;
public List<T> Values = new List<T>();

public void OnCompleted()
{
IsComplete = true;
}

public void OnNext(T value)
{
Values.Add(value);
}

public void OnError(Exception e)
{
Errors.Add(e);
}
}

所以基本上我将我的观察者订阅到转换 block ,并且我希望通过的每个值都在我的观察者“值”列表中注册。

但是,虽然 IsComplete设置为真,OnError()成功注册异常,OnNext()方法永远不会被调用,除非它是管道的最后一个 block ......我想不通为什么,因为链接到这个 sourceBlock 的“nextblock”成功接收到数据,证明一些数据正在退出这个 block 。

据我了解,AsObservable应该报告退出 block 的每个值,而不仅仅是其他链接 block 未使用的值...

我做错了什么?

最佳答案

在您有机会阅读它们之前,_nextBlock 正在消费您的消息。

如果您注释掉这一行 _block.LinkTo(_nextBlock); 它可能会起作用。

AsObservable 的唯一目的就是让 blockRX 中被消费。它不会更改 block 的内部工作以广播消息到多个目标。您需要为该 BroadcastBlock

准备一个特殊的 block

我建议广播到另一个 block 并使用它来订阅

BroadcastBlock’s mission in life is to enable all targets linked fromthe block to get a copy of every element published

var options = new DataflowLinkOptions {PropagateCompletion = true};


var broadcastBlock = new BroadcastBlock<string>(x => x);
var bufferBlock = new BufferBlock<string>();
var actionBlock = new ActionBlock<string>(s => Console.WriteLine("Action " + s));

broadcastBlock.LinkTo(bufferBlock, options);
broadcastBlock.LinkTo(actionBlock, options);

bufferBlock.AsObservable().Subscribe(s => Console.WriteLine("peek " + s));

for (var i = 0; i < 5; i++)
await broadcastBlock.SendAsync(i.ToString());

broadcastBlock.Complete();
await actionBlock.Completion;

输出

peek 0
Action 0
Action 1
Action 2
Action 3
Action 4
peek 1
peek 2
peek 3
peek 4

关于c# - DataflowBlock ITargetSource.AsObservable() 不触发 OnNext(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63630681/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com