gpt4 book ai didi

.net - gRPC 推送和扇出

转载 作者:行者123 更新时间:2023-12-02 02:19:18 24 4
gpt4 key购买 nike

是否有可能使用 gRPC 作为具有扇出功能的推送服务?在Google给出的示例中,服务器端(C#)有以下代码:

    public override async Task ListFeatures(Rectangle request, IServerStreamWriter<Feature> responseStream, ServerCallContext context)
{
var responses = features.FindAll( (feature) => feature.Exists() && request.Contains(feature.Location) );
foreach (var response in responses)
{
await responseStream.WriteAsync(response);
}
}

问题在于:

  1. 只有在客户端明确要求时才会生成和写入数据。
  2. 只有发出请求的客户端才会获得新数据。

我认为我需要的是:

  1. 保留请求(订阅)的每个客户端的所有 IServerStreamWriter。
  2. 当新数据可用时,通过外部事件触发写入。
  3. 写入所有streamWriter

编辑:根据卡尔的建议,我现在有以下建议:

原型(prototype):

service PubSub {
rpc Subscribe(Subscription) returns (stream Event) {}
rpc Unsubscribe(Subscription) returns (Unsubscription) {}
}

message Event
{
string Value = 1;
}
message Subscription
{
string Id = 1;
}
message Unsubscription
{
string Id = 1;
}

PubSubImpl:

public class PubSubImpl : PubSub.PubSubBase
{
private readonly BufferBlock<Event> _buffer = new BufferBlock<Event>();

private Dictionary<string, IServerStreamWriter<Event>> _subscriberWritersMap =
new Dictionary<string, IServerStreamWriter<Event>>();

public override async Task Subscribe(Subscription subscription, IServerStreamWriter<Event> responseStream, ServerCallContext context)
{
//Dict to hold a streamWriter for each subscriber.
_subscriberWritersMap[subscription.Id] = responseStream;

while (_subscriberWritersMap.ContainsKey(subscription.Id))
{
//Wait on BufferBlock from MS Dataflow package.
var @event = await _buffer.ReceiveAsync();
foreach (var serverStreamWriter in _subscriberWritersMap.Values)
{
await serverStreamWriter.WriteAsync(@event);
}
}
}

public override Task<Unsubscription> Unsubscribe(Subscription request, ServerCallContext context)
{
_subscriberWritersMap.Remove(request.Id);
return Task.FromResult(new Unsubscription() { Id = request.Id });
}

public void Publish(string input)
{
_buffer.Post(new Event() { Value = input });
}
}

现在可以像这样发送“Push”:

   while ((input = Console.ReadLine()) != "q")
{
pubsubImp.Publish(input);
}

在客户端我有:

public async Task Subscribe()
{
_subscription = new Subscription() { Id = Guid.NewGuid().ToString() };
using (var call = _pubSubClient.Subscribe(_subscription))
{
//Receive
var responseReaderTask = Task.Run(async () =>
{
while (await call.ResponseStream.MoveNext())
{
Console.WriteLine("Event received: " + call.ResponseStream.Current);
}
});

await responseReaderTask;
}
}

public void Unsubscribe()
{
_pubSubClient.Unsubscribe(_subscription);
}

Client-Main 的工作方式如下:

static void Main(string[] args)
{
var channel = new Channel("127.0.0.1:50052",
ChannelCredentials.Insecure);
var subscriber = new Subsriber(new PubSub.PubSubClient(channel));

Task.Run(async () =>
{
await subscriber.Subscribe();
}).GetAwaiter();

Console.WriteLine("Hit key to unsubscribe");
Console.ReadLine();

subscriber.Unsubscribe();

Console.WriteLine("Unsubscribed...");

Console.WriteLine("Hit key to exit...");
Console.ReadLine();

}

目前看来它可以工作。这是应该/可以这样做的吗?测试解决方案可以在以下位置找到: https://github.com/KingKnecht/gRPC-PubSub

最佳答案

我认为您不需要在服务器应用程序中跟踪客户端。相反,对于每个“推送”方法,创建一个 EventWaitHandle 并持续等待它。然后在另一个上下文中,向 EventWaitHandle 发出信号,以便等待的“push”方法可以 WriteAsync 发送给客户端。

例如

public class PubSubImpl : PubSub.PubSubBase
{
EventWaitHandle evwStatus = new EventWaitHandle(false, EventResetMode.ManualReset);
string status;

public override async Task Subscribe(Subscription subscription, IServerStreamWriter<Event> responseStream, ServerCallContext context)
{
// respond with the current status
await serverStreamWriter.WriteAsync(new Event { Value = status });
// wait until we're signaled with a different status
while(evwStatus.WaitOne())
{
await serverStreamWriter.WriteAsync(new Event { Value = status });
}
}

public void Publish(string input)
{
status = input;
// let the waiting threads respond
evwStatus.Set();
// halt the waiting threads
evwStatus.Reset();
}
}

关于.net - gRPC 推送和扇出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45107411/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com