diff --git a/rust/chg/src/lib.rs b/rust/chg/src/lib.rs --- a/rust/chg/src/lib.rs +++ b/rust/chg/src/lib.rs @@ -5,7 +5,7 @@ mod attachio; mod clientext; -//pub mod locator; +pub mod locator; pub mod message; pub mod procutil; mod runcommand; diff --git a/rust/chg/src/locator.rs b/rust/chg/src/locator.rs --- a/rust/chg/src/locator.rs +++ b/rust/chg/src/locator.rs @@ -5,7 +5,6 @@ //! Utility for locating command-server process. -use futures::future::{self, Either, Loop}; use log::debug; use std::env; use std::ffi::{OsStr, OsString}; @@ -14,14 +13,11 @@ use std::os::unix::ffi::{OsStrExt, OsStringExt}; use std::os::unix::fs::{DirBuilderExt, MetadataExt}; use std::path::{Path, PathBuf}; -use std::process; -use std::time::Duration; -use tokio::prelude::*; -use tokio::process::{Child, Command}; +use std::process::{self, Child, Command}; +use std::time::{Duration, Instant}; use tokio::time; -use tokio_hglib::UnixClient; -use crate::clientext::ChgClientExt; +use crate::clientext::ChgClient; use crate::message::{Instruction, ServerSpec}; use crate::procutil; @@ -82,21 +78,19 @@ /// Connects to the server. /// /// The server process will be spawned if not running. - pub fn connect(self) -> impl Future { - future::loop_fn((self, 0), |(loc, cnt)| { - if cnt < 10 { - let fut = loc - .try_connect() - .and_then(|(loc, client)| { - client - .validate(&loc.hg_early_args) - .map(|(client, instructions)| (loc, client, instructions)) - }) - .and_then(move |(loc, client, instructions)| { - loc.run_instructions(client, instructions, cnt) - }); - Either::A(fut) - } else { + pub async fn connect(&mut self) -> io::Result { + for _cnt in 0..10 { + let mut client = self.try_connect().await?; + let instructions = client.validate(&self.hg_early_args).await?; + let reconnect = self.run_instructions(&instructions)?; + if !reconnect { + return Ok(client); + } + } + + // TODO: unindent + { + { let msg = format!( concat!( "too many redirections.\n", @@ -105,20 +99,17 @@ "before executing hg. If you have to use a ", "wrapper, wrap chg instead of hg.", ), - loc.hg_command + self.hg_command ); - Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg))) + Err(io::Error::new(io::ErrorKind::Other, msg)) } - }) + } } /// Runs instructions received from the server. - fn run_instructions( - mut self, - client: UnixClient, - instructions: Vec, - cnt: usize, - ) -> io::Result> { + /// + /// Returns true if the client should try connecting to the other server. + fn run_instructions(&mut self, instructions: &[Instruction]) -> io::Result { let mut reconnect = false; for inst in instructions { debug!("instruction: {:?}", inst); @@ -126,7 +117,7 @@ Instruction::Exit(_) => { // Just returns the current connection to run the // unparsable command and report the error - return Ok(Loop::Break((self, client))); + return Ok(false); } Instruction::Reconnect => { reconnect = true; @@ -139,7 +130,7 @@ ); return Err(io::Error::new(io::ErrorKind::InvalidData, msg)); } - self.redirect_sock_path = Some(path); + self.redirect_sock_path = Some(path.to_owned()); reconnect = true; } Instruction::Unlink(path) => { @@ -155,25 +146,22 @@ } } - if reconnect { - Ok(Loop::Continue((self, cnt + 1))) - } else { - Ok(Loop::Break((self, client))) - } + Ok(reconnect) } /// Tries to connect to the existing server, or spawns new if not running. - fn try_connect(self) -> impl Future { + async fn try_connect(&mut self) -> io::Result { let sock_path = self .redirect_sock_path .as_ref() .unwrap_or(&self.base_sock_path) .clone(); debug!("try connect to {}", sock_path.display()); - UnixClient::connect(sock_path) - .then(|res| { - match res { - Ok(client) => Either::A(future::ok((self, client))), + // TODO: unindent + { + { + let mut client = match ChgClient::connect(sock_path).await { + Ok(client) => client, Err(_) => { // Prevent us from being re-connected to the outdated // master server: We were told by the server to redirect @@ -184,35 +172,23 @@ fs::remove_file(&self.base_sock_path).unwrap_or(()); // may race } - Either::B(self.spawn_connect()) + self.spawn_connect().await? } - } - }) - .and_then(|(loc, client)| { + }; check_server_capabilities(client.server_spec())?; - Ok((loc, client)) - }) - .and_then(|(loc, client)| { // It's purely optional, and the server might not support this command. if client.server_spec().capabilities.contains("setprocname") { - let fut = client - .set_process_name(format!("chg[worker/{}]", loc.process_id)) - .map(|client| (loc, client)); - Either::A(fut) - } else { - Either::B(future::ok((loc, client))) + client + .set_process_name(format!("chg[worker/{}]", self.process_id)) + .await?; } - }) - .and_then(|(loc, client)| { + client.set_current_dir(&self.current_dir).await?; client - .set_current_dir(&loc.current_dir) - .map(|client| (loc, client)) - }) - .and_then(|(loc, client)| { - client - .set_env_vars_os(loc.env_vars.iter().cloned()) - .map(|client| (loc, client)) - }) + .set_env_vars_os(self.env_vars.iter().cloned()) + .await?; + Ok(client) + } + } } /// Spawns new server process and connects to it. @@ -220,10 +196,10 @@ /// The server will be spawned at the current working directory, then /// chdir to "/", so that the server will load configs from the target /// repository. - fn spawn_connect(self) -> impl Future { + async fn spawn_connect(&mut self) -> io::Result { let sock_path = self.temp_sock_path(); debug!("start cmdserver at {}", sock_path.display()); - Command::new(&self.hg_command) + let server = Command::new(&self.hg_command) .arg("serve") .arg("--cmdserver") .arg("chgunix") @@ -236,68 +212,54 @@ .env_clear() .envs(self.env_vars.iter().cloned()) .env("CHGINTERNALMARK", "") - .spawn() - .into_future() - .and_then(|server| self.connect_spawned(server, sock_path)) - .and_then(|(loc, client, sock_path)| { + .spawn()?; + let client = self.connect_spawned(server, &sock_path).await?; + // TODO: unindent + { + { debug!( "rename {} to {}", sock_path.display(), - loc.base_sock_path.display() + self.base_sock_path.display() ); - fs::rename(&sock_path, &loc.base_sock_path)?; - Ok((loc, client)) - }) + fs::rename(&sock_path, &self.base_sock_path)?; + Ok(client) + } + } } /// Tries to connect to the just spawned server repeatedly until timeout /// exceeded. - fn connect_spawned( - self, - server: Child, - sock_path: PathBuf, - ) -> impl Future { + async fn connect_spawned( + &mut self, + mut server: Child, + sock_path: &Path, + ) -> io::Result { debug!("try connect to {} repeatedly", sock_path.display()); - let connect = future::loop_fn(sock_path, |sock_path| { - UnixClient::connect(sock_path.clone()).then(|res| { - match res { - Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))), - Err(_) => { - // try again with slight delay - let fut = time::delay_for(Duration::from_millis(10)) - .map(|()| Loop::Continue(sock_path)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)); - Either::B(fut) - } - } - }) - }); - // waits for either connection established or server failed to start - connect - .select2(server) - .map_err(|res| res.split().0) - .timeout(self.timeout) - .map_err(|err| { - err.into_inner().unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::TimedOut, - "timed out while connecting to server", - ) - }) - }) - .and_then(|res| { - match res { - Either::A(((client, sock_path), server)) => { - server.forget(); // continue to run in background - Ok((self, client, sock_path)) - } - Either::B((st, _)) => Err(io::Error::new( - io::ErrorKind::Other, - format!("server exited too early: {}", st), - )), - } - }) + let start_time = Instant::now(); + while start_time.elapsed() < self.timeout { + if let Ok(client) = ChgClient::connect(&sock_path).await { + // server handle is dropped here, but the detached process + // will continue running in background + return Ok(client); + } + + if let Some(st) = server.try_wait()? { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("server exited too early: {}", st), + )); + } + + // try again with slight delay + time::delay_for(Duration::from_millis(10)).await; + } + + Err(io::Error::new( + io::ErrorKind::TimedOut, + "timed out while connecting to server", + )) } }