This is the minimal reimplementation of gethgcmd(), execcmdserver(),
retryconnectcmdserver(), and connectcmdserver() in chg.c.
No config validation is implemented yet. And some Py3 workarounds would
be missing as this is the code I wrote in 2018.
Alphare |
hg-reviewers |
This is the minimal reimplementation of gethgcmd(), execcmdserver(),
retryconnectcmdserver(), and connectcmdserver() in chg.c.
No config validation is implemented yet. And some Py3 workarounds would
be missing as this is the code I wrote in 2018.
Automatic diff as part of commit; lint not applicable. |
Automatic diff as part of commit; unit tests not applicable. |
Aside from the outdated futures and Rust edition that Yuya already said are temporary, this looks good!
Path | Packages | |||
---|---|---|---|---|
M | rust/chg/src/lib.rs (3 lines) | |||
M | rust/chg/src/locator.rs (109 lines) | |||
M | rust/chg/src/main.rs (12 lines) |
// Copyright 2018 Yuya Nishihara <yuya@tcha.org> | // Copyright 2018 Yuya Nishihara <yuya@tcha.org> | ||||
// | // | ||||
// This software may be used and distributed according to the terms of the | // This software may be used and distributed according to the terms of the | ||||
// GNU General Public License version 2 or any later version. | // GNU General Public License version 2 or any later version. | ||||
extern crate bytes; | extern crate bytes; | ||||
#[macro_use] | #[macro_use] | ||||
extern crate futures; | extern crate futures; | ||||
extern crate libc; | extern crate libc; | ||||
#[macro_use] | |||||
extern crate log; | |||||
extern crate tokio; | extern crate tokio; | ||||
extern crate tokio_hglib; | extern crate tokio_hglib; | ||||
extern crate tokio_process; | extern crate tokio_process; | ||||
extern crate tokio_timer; | |||||
mod attachio; | mod attachio; | ||||
mod clientext; | mod clientext; | ||||
pub mod locator; | pub mod locator; | ||||
pub mod message; | pub mod message; | ||||
pub mod procutil; | pub mod procutil; | ||||
mod runcommand; | mod runcommand; | ||||
mod uihandler; | mod uihandler; | ||||
pub use clientext::ChgClientExt; | pub use clientext::ChgClientExt; | ||||
pub use uihandler::{ChgUiHandler, SystemHandler}; | pub use uihandler::{ChgUiHandler, SystemHandler}; |
// Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org> | // Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org> | ||||
// | // | ||||
// This software may be used and distributed according to the terms of the | // This software may be used and distributed according to the terms of the | ||||
// GNU General Public License version 2 or any later version. | // GNU General Public License version 2 or any later version. | ||||
//! Utility for locating command-server process. | //! Utility for locating command-server process. | ||||
use futures::future::{self, Either, Loop}; | |||||
use std::env; | use std::env; | ||||
use std::ffi::{OsStr, OsString}; | use std::ffi::{OsStr, OsString}; | ||||
use std::fs::{self, DirBuilder}; | use std::fs::{self, DirBuilder}; | ||||
use std::io; | use std::io; | ||||
use std::os::unix::ffi::{OsStrExt, OsStringExt}; | use std::os::unix::ffi::{OsStrExt, OsStringExt}; | ||||
use std::os::unix::fs::{DirBuilderExt, MetadataExt}; | use std::os::unix::fs::{DirBuilderExt, MetadataExt}; | ||||
use std::path::{Path, PathBuf}; | use std::path::{Path, PathBuf}; | ||||
use std::process; | use std::process::{self, Command}; | ||||
use std::time::Duration; | use std::time::Duration; | ||||
use tokio::prelude::*; | |||||
use tokio_hglib::UnixClient; | |||||
use tokio_process::{Child, CommandExt}; | |||||
use tokio_timer; | |||||
use super::procutil; | use super::procutil; | ||||
/// Helper to connect to and spawn a server process. | /// Helper to connect to and spawn a server process. | ||||
#[derive(Clone, Debug)] | #[derive(Clone, Debug)] | ||||
pub struct Locator { | pub struct Locator { | ||||
hg_command: OsString, | hg_command: OsString, | ||||
current_dir: PathBuf, | current_dir: PathBuf, | ||||
/// Temporary socket path for this client process. | /// Temporary socket path for this client process. | ||||
fn temp_sock_path(&self) -> PathBuf { | fn temp_sock_path(&self) -> PathBuf { | ||||
let src = self.base_sock_path.as_os_str().as_bytes(); | let src = self.base_sock_path.as_os_str().as_bytes(); | ||||
let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len() | let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len() | ||||
buf.extend_from_slice(src); | buf.extend_from_slice(src); | ||||
buf.extend_from_slice(format!(".{}", self.process_id).as_bytes()); | buf.extend_from_slice(format!(".{}", self.process_id).as_bytes()); | ||||
OsString::from_vec(buf).into() | OsString::from_vec(buf).into() | ||||
} | } | ||||
/// Connects to the server. | |||||
/// | |||||
/// The server process will be spawned if not running. | |||||
pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { | |||||
self.try_connect() | |||||
} | |||||
/// Tries to connect to the existing server, or spawns new if not running. | |||||
fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { | |||||
debug!("try connect to {}", self.base_sock_path.display()); | |||||
UnixClient::connect(self.base_sock_path.clone()).then(|res| match res { | |||||
Ok(client) => Either::A(future::ok((self, client))), | |||||
Err(_) => Either::B(self.spawn_connect()), | |||||
}) | |||||
} | |||||
/// Spawns new server process and connects to it. | |||||
/// | |||||
/// The server will be spawned at the current working directory, then | |||||
/// chdir to "/", so that the server will load configs from the target | |||||
/// repository. | |||||
fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { | |||||
let sock_path = self.temp_sock_path(); | |||||
debug!("start cmdserver at {}", sock_path.display()); | |||||
Command::new(&self.hg_command) | |||||
.arg("serve") | |||||
.arg("--cmdserver") | |||||
.arg("chgunix") | |||||
.arg("--address") | |||||
.arg(&sock_path) | |||||
.arg("--daemon-postexec") | |||||
.arg("chdir:/") | |||||
.current_dir(&self.current_dir) | |||||
.env_clear() | |||||
.envs(self.env_vars.iter().cloned()) | |||||
.env("CHGINTERNALMARK", "") | |||||
.spawn_async() | |||||
.into_future() | |||||
.and_then(|server| self.connect_spawned(server, sock_path)) | |||||
.and_then(|(loc, client, sock_path)| { | |||||
debug!( | |||||
"rename {} to {}", | |||||
sock_path.display(), | |||||
loc.base_sock_path.display() | |||||
); | |||||
fs::rename(&sock_path, &loc.base_sock_path)?; | |||||
Ok((loc, client)) | |||||
}) | |||||
} | |||||
/// Tries to connect to the just spawned server repeatedly until timeout | |||||
/// exceeded. | |||||
fn connect_spawned( | |||||
self, | |||||
server: Child, | |||||
sock_path: PathBuf, | |||||
) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> { | |||||
debug!("try connect to {} repeatedly", sock_path.display()); | |||||
let connect = future::loop_fn(sock_path, |sock_path| { | |||||
UnixClient::connect(sock_path.clone()).then(|res| { | |||||
match res { | |||||
Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))), | |||||
Err(_) => { | |||||
// try again with slight delay | |||||
let fut = tokio_timer::sleep(Duration::from_millis(10)) | |||||
.map(|()| Loop::Continue(sock_path)) | |||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err)); | |||||
Either::B(fut) | |||||
} | |||||
} | |||||
}) | |||||
}); | |||||
// waits for either connection established or server failed to start | |||||
connect | |||||
.select2(server) | |||||
.map_err(|res| res.split().0) | |||||
.timeout(self.timeout) | |||||
.map_err(|err| { | |||||
err.into_inner().unwrap_or_else(|| { | |||||
io::Error::new( | |||||
io::ErrorKind::TimedOut, | |||||
"timed out while connecting to server", | |||||
) | |||||
}) | |||||
}) | |||||
.and_then(|res| { | |||||
match res { | |||||
Either::A(((client, sock_path), server)) => { | |||||
server.forget(); // continue to run in background | |||||
Ok((self, client, sock_path)) | |||||
} | |||||
Either::B((st, _)) => Err(io::Error::new( | |||||
io::ErrorKind::Other, | |||||
format!("server exited too early: {}", st), | |||||
)), | |||||
} | |||||
}) | |||||
} | |||||
} | } | ||||
/// Determines the server socket to connect to. | /// Determines the server socket to connect to. | ||||
/// | /// | ||||
/// If no `$CHGSOCKNAME` is specified, the socket directory will be created | /// If no `$CHGSOCKNAME` is specified, the socket directory will be created | ||||
/// as necessary. | /// as necessary. | ||||
pub fn prepare_server_socket_path() -> io::Result<PathBuf> { | fn prepare_server_socket_path() -> io::Result<PathBuf> { | ||||
if let Some(s) = env::var_os("CHGSOCKNAME") { | if let Some(s) = env::var_os("CHGSOCKNAME") { | ||||
Ok(PathBuf::from(s)) | Ok(PathBuf::from(s)) | ||||
} else { | } else { | ||||
let mut path = default_server_socket_dir(); | let mut path = default_server_socket_dir(); | ||||
create_secure_dir(&path)?; | create_secure_dir(&path)?; | ||||
path.push("server"); | path.push("server"); | ||||
Ok(path) | Ok(path) | ||||
} | } |
// Copyright 2018 Yuya Nishihara <yuya@tcha.org> | // Copyright 2018 Yuya Nishihara <yuya@tcha.org> | ||||
// | // | ||||
// This software may be used and distributed according to the terms of the | // This software may be used and distributed according to the terms of the | ||||
// GNU General Public License version 2 or any later version. | // GNU General Public License version 2 or any later version. | ||||
extern crate chg; | extern crate chg; | ||||
extern crate futures; | extern crate futures; | ||||
extern crate log; | extern crate log; | ||||
extern crate tokio; | extern crate tokio; | ||||
extern crate tokio_hglib; | extern crate tokio_hglib; | ||||
use chg::locator; | use chg::locator::Locator; | ||||
use chg::procutil; | use chg::procutil; | ||||
use chg::{ChgClientExt, ChgUiHandler}; | use chg::{ChgClientExt, ChgUiHandler}; | ||||
use futures::sync::oneshot; | use futures::sync::oneshot; | ||||
use std::env; | use std::env; | ||||
use std::io; | use std::io; | ||||
use std::process; | use std::process; | ||||
use std::time::Instant; | use std::time::Instant; | ||||
use tokio::prelude::*; | use tokio::prelude::*; | ||||
use tokio_hglib::UnixClient; | |||||
struct DebugLogger { | struct DebugLogger { | ||||
start: Instant, | start: Instant, | ||||
} | } | ||||
impl DebugLogger { | impl DebugLogger { | ||||
pub fn new() -> DebugLogger { | pub fn new() -> DebugLogger { | ||||
DebugLogger { | DebugLogger { | ||||
fn main() { | fn main() { | ||||
if env::var_os("CHGDEBUG").is_some() { | if env::var_os("CHGDEBUG").is_some() { | ||||
log::set_boxed_logger(Box::new(DebugLogger::new())) | log::set_boxed_logger(Box::new(DebugLogger::new())) | ||||
.expect("any logger should not be installed yet"); | .expect("any logger should not be installed yet"); | ||||
log::set_max_level(log::LevelFilter::Debug); | log::set_max_level(log::LevelFilter::Debug); | ||||
} | } | ||||
// TODO: add loop detection by $CHGINTERNALMARK | |||||
let code = run().unwrap_or_else(|err| { | let code = run().unwrap_or_else(|err| { | ||||
writeln!(io::stderr(), "chg: abort: {}", err).unwrap_or(()); | writeln!(io::stderr(), "chg: abort: {}", err).unwrap_or(()); | ||||
255 | 255 | ||||
}); | }); | ||||
process::exit(code); | process::exit(code); | ||||
} | } | ||||
fn run() -> io::Result<i32> { | fn run() -> io::Result<i32> { | ||||
let current_dir = env::current_dir()?; | let current_dir = env::current_dir()?; | ||||
let sock_path = locator::prepare_server_socket_path()?; | let loc = Locator::prepare_from_env()?; | ||||
let handler = ChgUiHandler::new(); | let handler = ChgUiHandler::new(); | ||||
let (result_tx, result_rx) = oneshot::channel(); | let (result_tx, result_rx) = oneshot::channel(); | ||||
let fut = UnixClient::connect(sock_path) | let fut = loc | ||||
.and_then(|client| client.set_current_dir(current_dir)) | .connect() | ||||
.and_then(|(_, client)| client.set_current_dir(current_dir)) | |||||
.and_then(|client| client.attach_io(io::stdin(), io::stdout(), io::stderr())) | .and_then(|client| client.attach_io(io::stdin(), io::stdout(), io::stderr())) | ||||
.and_then(|client| { | .and_then(|client| { | ||||
let pid = client.server_spec().process_id.unwrap(); | let pid = client.server_spec().process_id.unwrap(); | ||||
let pgid = client.server_spec().process_group_id; | let pgid = client.server_spec().process_group_id; | ||||
procutil::setup_signal_handler_once(pid, pgid)?; | procutil::setup_signal_handler_once(pid, pgid)?; | ||||
Ok(client) | Ok(client) | ||||
}) | }) | ||||
.and_then(|client| client.run_command_chg(handler, env::args_os().skip(1))) | .and_then(|client| client.run_command_chg(handler, env::args_os().skip(1))) |