Реестр Actix SyncArbiter
Я пытаюсь реализовать пул из 10 Redis соединений с помощью SyncArbiter
для разных актеров, чтобы использовать. Скажем, у нас есть актер по имени Боб, который должен использовать актера Redis для выполнения своей задачи.
Пока это достижимо следующим образом:
// crate, use and mod statements have been omitted to lessen clutter
/// FILE main.rs
pub struct AppState {
pub redis: Addr<Redis>,
pub bob: Addr<Bob>
}
fn main() {
let system = actix::System::new("theatre");
server::new(move || {
let redis_addr = SyncArbiter::start(10, || Redis::new("redis://127.0.0.1").unwrap());
let bob_addr = SyncArbiter::start(10, || Bob::new());
let state = AppState {
redis: redis_addr,
bob: bob_addr
};
App::with_state(state).resource("/bob/eat", |r| {
r.method(http::Method::POST)
.with_async(controllers::bob::eat)
})
})
.bind("0.0.0.0:8080")
.unwrap()
.start();
println!("Server started.");
system.run();
}
/// FILE controllers/bob.rs
pub struct Food {
name: String,
kcal: u64
}
pub fn eat(
(req, state): (Json<Food>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
state
.bob
.send(Eat::new(req.into_inner()))
.from_err()
.and_then(|res| match res {
Ok(val) => {
println!("==== BODY ==== {:?}", val);
Ok(HttpResponse::Ok().into())
}
Err(_) => Ok(HttpResponse::InternalServerError().into()),
})
}
/// FILE actors/redis.rs
#[derive(Debug)]
pub struct Redis {
pub client: Client
}
pub struct RunCommand(Cmd);
impl RunCommand {
pub fn new(cmd: Cmd) -> Self {
RunCommand(cmd)
}
}
impl Message for RunCommand {
type Result = Result<RedisResult<String>, ()>;
}
impl Actor for Redis {
type Context = SyncContext<Self>;
}
impl Handler<RunCommand> for Redis {
type Result = Result<RedisResult<String>, ()>;
fn handle(&mut self, msg: RunCommand, _context: &mut Self::Context) -> Self::Result {
println!("Redis received command!");
Ok(Ok("OK".to_string()))
}
}
impl Redis {
pub fn new(url: &str) -> Result<Self, RedisError> {
let client = match Client::open(url) {
Ok(client) => client,
Err(error) => return Err(error)
};
let redis = Redis {
client: client,
};
Ok(redis)
}
}
/// FILE actors/bob.rs
pub struct Bob;
pub struct Eat(Food);
impl Message for Eat {
type Result = Result<Bob, ()>;
}
impl Actor for Eat {
type Context = SyncContext<Self>;
}
impl Handler<Eat> for Bob {
type Result = Result<(), ()>;
fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
println!("Bob received {:?}", &msg);
// How to get a Redis actor and pass data to it here?
Ok(msg.datapoint)
}
}
impl Bob {
pub fn new() -> () {
Bob {}
}
}
Из описанной выше реализации описателя в Бобе неясно, как Боб мог получить адрес актера Redis. Или отправьте любое сообщение любому Actor
работает в SyncArbiter
,
То же самое может быть достигнуто с помощью регулярного Arbiter
и Registry
но, насколько мне известно, Actix не позволяет использовать несколько одинаковых актеров (например, мы не можем запустить 10 актеров Redis с использованием обычного Arbiter
).
Чтобы формализовать мои вопросы:
- Есть ли
Registry
заSyncArbiter
актеры - Могу ли я запустить несколько актеров одного типа в обычном
Arbiter
- Есть ли лучший / более канонический способ реализации пула соединений
РЕДАКТИРОВАТЬ
Версии:
- Актикс 0.7.9
- actix_web 0.7.19
- фьючерсы = "0.1.26"
- ржавчина 1.33.0
1 ответ
Я нашел ответ сам.
Из коробки нет возможности для Actor
с SyncContext
быть извлеченным из реестра.
Учитывая мой приведенный выше пример. Для актера Bob
отправить любое сообщение Redis
актер должен знать адрес Redis
актер. Bob
можно получить адрес Redis
явно - содержится в сообщении, отправленном ему или прочитанном из какого-то общего состояния.
Я хотел создать систему, аналогичную системе Эрланга, поэтому я решил не передавать адреса актеров через сообщения, так как она казалась слишком трудоемкой, подверженной ошибкам, и, на мой взгляд, она противоречит цели иметь модель параллелизма на основе акторов (поскольку ни один актер не может отправлять сообщения другим актер).
Поэтому я исследовал идею общего государства и решил реализовать свой собственный SyncRegistry
это было бы аналогом стандарта Actix Registry
- который делает именно то, что я хочу, но не для актеров с SyncContext
,
Вот наивное решение, которое я кодировал: https://gist.github.com/monorkin/c463f34764ab23af2fd0fb0c19716177
Со следующей настройкой:
fn main() {
let system = actix::System::new("theatre");
let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
SyncRegistry::set(addr);
let addr = SyncArbiter::start(10, || Bob::new());
SyncRegistry::set(addr);
server::new(move || {
let state = AppState {};
App::with_state(state).resource("/foo", |r| {
r.method(http::Method::POST)
.with_async(controllers::foo::create)
})
})
.bind("0.0.0.0:8080")
.unwrap()
.start();
println!("Server started.");
system.run();
}
Актер Bob
можно получить адрес Redis
следующим образом из любой точки программы:
impl Handler<Eat> for Bob {
type Result = Result<(), ()>;
fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
let redis = match SyncRegistry::<Redis>::get() {
Some(redis) => redis,
_ => return Err(())
};
let cmd = redis::cmd("XADD")
.arg("things_to_eat")
.arg("*")
.arg("data")
.arg(&msg.0)
.to_owned();
redis.clone().lock().unwrap().send(RunCommand::new(cmd)).wait().unwrap();
}
}