gpt4 book ai didi

c# - WCF 双工中的 TPL 数据流 block

转载 作者:太空宇宙 更新时间:2023-11-03 13:45:22 34 4
gpt4 key购买 nike

我是 SO 的新作者,请多多包涵。

我有一个带有双工服务契约(Contract)的 WCF 服务。该服务契约(Contract)有一个操作联系人,假设要进行长时间的数据处理。我不得不将并发数据处理的数量限制为最多 3 个。我的问题是,在数据处理之后,我需要返回到相同的服务实例上下文,因此我回调传递数据处理结果的发起端点。我需要指出,由于各种原因,我只能使用 TPL 数据流和 WCF 双工。

这是我到目前为止所写内容的演示

在控制台库中我模拟 WCF 调用

class Program
{
static void Main(string[] args)
{
// simulate service calls

Enumerable.Range(0, 5).ToList().ForEach(x =>
{
new System.Threading.Thread(new ThreadStart(async () =>
{
var service = new Service();
await service.Inc(x);
})).Start();
});
}
}

这应该是 WCF 服务

// service contract
public class Service
{
static TransformBlock<Message<int>, Message<int>> transformBlock;

static Service()
{
transformBlock = new TransformBlock<Message<int>, Message<int>>(x => Inc(x), new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 3
});
}

static Message<int> Inc(Message<int> input)
{
System.Threading.Thread.Sleep(100);

return new Message<int> { Token = input.Token, Data = input.Data + 1 };
}

// operation contract
public async Task Inc(int id)
{
var token = Guid.NewGuid().ToString();

transformBlock.Post(new Message<int> { Token = token, Data = id });

while (await transformBlock.OutputAvailableAsync())
{
Message<int> message;
if (transformBlock.TryReceive(m => m.Token == token, out message))
{
// do further processing using initiator service instance members
// something like Callback.IncResult(m.Data);
break;
}
}
}
}

public class Message<T>
{
public string Token { get; set; }

public T Data { get; set; }
}

操作契约(Contract)并不是异步所必需的,但我需要 OutputAvailableAsync 通知。

这是一个好的方法还是有更好的解决方案适合我的场景?

提前致谢。

最佳答案

首先,我认为您不应该像现在这样使用 token 。在进程之间进行通信时,唯一标识符很有用。但是,当您在单个进程中时,只需使用引用相等即可。

要真正回答您的问题,我认为(某种)繁忙的循环不是一个好主意。

一个更简单的异步节流解决方案是使用 SemaphoreSlim .像这样的东西:

static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(3);

// operation contract
public async Task Inc(int id)
{
await Semaphore.WaitAsync();

try
{
Thread.Sleep(100);
var result = id + 1;
// do further processing using initiator service instance members
// something like Callback.IncResult(result);
}
finally
{
Semaphore.Release();
}
}

如果你真的想(或必须?)使用数据流,你可以使用 TaskCompletionSource用于操作和 block 之间的同步。操作方法将等待 TaskCompletionSourceTask 并且 block 将在完成对该消息的计算时设置它:

private static readonly ActionBlock<Message<int>> Block =
new ActionBlock<Message<int>>(
x => Inc(x),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 3
});

static void Inc(Message<int> input)
{
Thread.Sleep(100);

input.TCS.SetResult(input.Data + 1);
}

// operation contract
public async Task Inc(int id)
{
var tcs = new TaskCompletionSource<int>();

Block.Post(new Message<int> { TCS = tcs, Data = id });

int result = await tcs.Task;
// do further processing using initiator service instance members
// something like Callback.IncResult(result);
}

关于c# - WCF 双工中的 TPL 数据流 block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15547607/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com