gpt4 book ai didi

rust - Actix SyncArbiter 注册表

转载 作者:行者123 更新时间:2023-11-29 08:29:05 26 4
gpt4 key购买 nike

我正在尝试使用 SyncArbiter 实现一个包含 10 个 Redis 连接的池,供不同的参与者使用。假设我们有一个名为 Bob 的 actor,它必须使用 Redis actor 来完成它的任务。

虽然这可以通过以下方式实现:

// crate, use and mod statements have been omitted to lessen clutter

/// FILE main.rs
pub struct AppState {
pub redis: Addr<Redis>,
pub bob: Addr<Bob>
}

fn main() {
let system = actix::System::new("theatre");

server::new(move || {
let redis_addr = SyncArbiter::start(10, || Redis::new("redis://127.0.0.1").unwrap());
let bob_addr = SyncArbiter::start(10, || Bob::new());

let state = AppState {
redis: redis_addr,
bob: bob_addr
};

App::with_state(state).resource("/bob/eat", |r| {
r.method(http::Method::POST)
.with_async(controllers::bob::eat)
})
})
.bind("0.0.0.0:8080")
.unwrap()
.start();

println!("Server started.");

system.run();
}

/// FILE controllers/bob.rs
pub struct Food {
name: String,
kcal: u64
}

pub fn eat(
(req, state): (Json<Food>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
state
.bob
.send(Eat::new(req.into_inner()))
.from_err()
.and_then(|res| match res {
Ok(val) => {
println!("==== BODY ==== {:?}", val);
Ok(HttpResponse::Ok().into())
}
Err(_) => Ok(HttpResponse::InternalServerError().into()),
})
}

/// FILE actors/redis.rs
#[derive(Debug)]
pub struct Redis {
pub client: Client
}

pub struct RunCommand(Cmd);

impl RunCommand {
pub fn new(cmd: Cmd) -> Self {
RunCommand(cmd)
}
}

impl Message for RunCommand {
type Result = Result<RedisResult<String>, ()>;
}

impl Actor for Redis {
type Context = SyncContext<Self>;
}

impl Handler<RunCommand> for Redis {
type Result = Result<RedisResult<String>, ()>;

fn handle(&mut self, msg: RunCommand, _context: &mut Self::Context) -> Self::Result {
println!("Redis received command!");
Ok(Ok("OK".to_string()))
}
}

impl Redis {
pub fn new(url: &str) -> Result<Self, RedisError> {
let client = match Client::open(url) {
Ok(client) => client,
Err(error) => return Err(error)
};

let redis = Redis {
client: client,
};

Ok(redis)
}
}

/// FILE actors/bob.rs
pub struct Bob;

pub struct Eat(Food);

impl Message for Eat {
type Result = Result<Bob, ()>;
}

impl Actor for Eat {
type Context = SyncContext<Self>;
}

impl Handler<Eat> for Bob {
type Result = Result<(), ()>;

fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
println!("Bob received {:?}", &msg);

// How to get a Redis actor and pass data to it here?

Ok(msg.datapoint)
}
}

impl Bob {
pub fn new() -> () {
Bob {}
}
}

从上面 Bob 的 handle 实现来看,Bob 如何获得 Redis actor 的地址还不清楚。或者将任何消息发送到运行在 SyncArbiter 中的任何 Actor

使用常规的 ArbiterRegistry 也可以达到同样的效果,但据我所知,Actix 不允许多个相同的参与者(例如我们无法使用常规 Arbiter 启动 10 个 Redis actor。

将我的问题形式化:

  • 是否有SyncArbiter actors 的Registry
  • 我可以在常规 Arbiter 中启动多个相同类型的参与者吗?
  • 是否有更好/更规范的方式来实现连接池

编辑

版本:

  • actix 0.7.9
  • actix_web 0.7.19
  • future = "0.1.26"
  • 使用rust 1.33.0

最佳答案

我自己找到了答案。

开箱即用,无法从注册表中检索具有 SyncContextActor

鉴于我上面的例子。 Actor Bob 要向 Redis Actor 发送任何类型的消息,它需要知道 Redis Actor 的地址。 Bob 可以显式获取 Redis 的地址 - 包含在发送给它的消息中或从某种共享状态中读取。

我想要一个类似于 Erlang 的系统,所以我决定不通过消息传递 actor 的地址,因为它看起来太费力,容易出错,而且在我看来,它违背了拥有基于 actor 的并发模型的目的(因为没有一个 actor 可以向任何其他 Actor 发送消息)。

因此,我研究了共享状态的想法,并决定实现我自己的 SyncRegistry,它类似于 Actix 标准 Registry - 这正是我所做的想要但不适合具有 SyncContext 的 Actor。

这是我编写的天真的解决方案:https://gist.github.com/monorkin/c463f34764ab23af2fd0fb0c19716177

使用以下设置:

fn main() {
let system = actix::System::new("theatre");

let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
SyncRegistry::set(addr);
let addr = SyncArbiter::start(10, || Bob::new());
SyncRegistry::set(addr);


server::new(move || {
let state = AppState {};

App::with_state(state).resource("/foo", |r| {
r.method(http::Method::POST)
.with_async(controllers::foo::create)
})
})
.bind("0.0.0.0:8080")
.unwrap()
.start();

println!("Server started.");

system.run();
}

Actor Bob 可以通过以下方式从程序中的任何位置获取 Redis 的地址:

impl Handler<Eat> for Bob {
type Result = Result<(), ()>;

fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
let redis = match SyncRegistry::<Redis>::get() {
Some(redis) => redis,
_ => return Err(())
};

let cmd = redis::cmd("XADD")
.arg("things_to_eat")
.arg("*")
.arg("data")
.arg(&msg.0)
.to_owned();

redis.clone().lock().unwrap().send(RunCommand::new(cmd)).wait().unwrap();
}
}

关于rust - Actix SyncArbiter 注册表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55559540/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com