gpt4 book ai didi

c# - Akka.net - 如何等待子 Actor 在停止之前处理所有未决消息

转载 作者:行者123 更新时间:2023-12-05 01:55:18 25 4
gpt4 key购买 nike

我们有一个名为 A 的集群分片 actor,它有多个子 actor,这些子 actor 使用每个实体的子模型创建,如下所示。当我们将 100 条消息从 actor B 发送到 D,并且 actor D 需要 500 毫秒来处理每条消息时,与此同时,当我们使用 Context.Parent.Tell (new Passivate (PoisonPill.Instance) 将毒丸发送给 actor A 时));它会立即停止所有子 actor,包括 actor D,而不处理未决消息。

    A
|
B
/ \
C D

有没有办法等待actor D处理完所有的消息?

最佳答案

https://stackoverflow.com/a/70286526/377476是一个好的开始;您将需要一条自定义关机消息。当父 actor 终止时,它的子 actor 会通过 /system 消息自动终止,这些消息会取代其队列中任何未处理的 /user 消息。

所以你需要做的是确保他们所有的 /user 消息在父进程自行终止之前得到处理。有一种直接的方法可以使用 GracefulStop 扩展方法结合您的自定义停止消息来执行此操作:

public sealed class ActorA : ReceiveActor{
private IActorRef _actorB;

private readonly ILoggingAdapter _log = Context.GetLogger();

public ActorA(){
Receive<StartWork>(w => {
foreach(var i in Enumerable.Range(0, w.WorkCount)){
_actorB.Tell(i);
}
});

ReceiveAsync<MyStopMessage>(async _ => {
_log.Info("Begin shutdown");

// stop child actor B with the same custom message
await _actorB.GracefulStop(TimeSpan.FromSeconds(10), _);

// shut ourselves down after child is done
Context.Stop(Self);
});
}

protected override void PreStart(){
_actorB = Context.ActorOf(Props.Create(() => new ActorB()), "b");
}
}

public sealed class ActorB : ReceiveActor{
private IActorRef _actorC;
private IActorRef _actorD;

private readonly ILoggingAdapter _log = Context.GetLogger();

public ActorB(){
Receive<int>(i => {
_actorC.Tell(i);
_actorD.Tell(i);
});

ReceiveAsync<MyStopMessage>(async _ => {

_log.Info("Begin shutdown");

// stop both actors in parallel
var stopC = _actorC.GracefulStop(TimeSpan.FromSeconds(10));
var stopD = _actorD.GracefulStop(TimeSpan.FromSeconds(10));

// compose stop Tasks
var bothStopped = Task.WhenAll(stopC, stopD);
await bothStopped;

// shut ourselves down immediately
Context.Stop(Self);
});
}

protected override void PreStart(){
var workerProps = Props.Create(() => new WorkerActor());
_actorC = Context.ActorOf(workerProps, "c");
_actorD = Context.ActorOf(workerProps, "d");
}
}

public sealed class WorkerActor : ReceiveActor {
private readonly ILoggingAdapter _log = Context.GetLogger();

public WorkerActor(){
ReceiveAsync<int>(async i => {
await Task.Delay(10);
_log.Info("Received {0}", i);
});
}
}

我在这里创建了这个示例的可运行版本:https://dotnetfiddle.net/xiGyWM - 您会看到 MyStopMessage 是在示例开始后不久收到的, C 和 D 被赋予工作之后。在这种情况下,所有这些工作都在任何参与者终止之前完成。

关于c# - Akka.net - 如何等待子 Actor 在停止之前处理所有未决消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70285419/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com