Details
Details
- Reviewers
Alphare - Group Reviewers
hg-reviewers - Commits
- rHG27fe8cc1338f: rust-chg: clean up excessive indents
Diff Detail
Diff Detail
- Repository
- rHG Mercurial
- Branch
- default
- Lint
No Linters Available - Unit
No Unit Test Coverage
( )
Alphare |
hg-reviewers |
No Linters Available |
No Unit Test Coverage |
Path | Packages | |||
---|---|---|---|---|
M | rust/chg/src/attachio.rs (63 lines) | |||
M | rust/chg/src/locator.rs (99 lines) | |||
M | rust/chg/src/uihandler.rs (15 lines) |
Commit | Parents | Author | Summary | Date |
---|---|---|---|---|
a5ecfbebe5f6 | fff41b9bd000 | Yuya Nishihara | Apr 11 2020, 4:43 AM |
/// | /// | ||||
/// The client-side fds may be dropped once duplicated to the server. | /// The client-side fds may be dropped once duplicated to the server. | ||||
pub async fn attach_io( | pub async fn attach_io( | ||||
proto: &mut Protocol<impl Connection + AsRawFd>, | proto: &mut Protocol<impl Connection + AsRawFd>, | ||||
stdin: &impl AsRawFd, | stdin: &impl AsRawFd, | ||||
stdout: &impl AsRawFd, | stdout: &impl AsRawFd, | ||||
stderr: &impl AsRawFd, | stderr: &impl AsRawFd, | ||||
) -> io::Result<()> { | ) -> io::Result<()> { | ||||
// TODO: unindent | |||||
{ | |||||
proto.send_command("attachio").await?; | proto.send_command("attachio").await?; | ||||
loop { | loop { | ||||
match proto.fetch_response().await? { | match proto.fetch_response().await? { | ||||
ChannelMessage::Data(b'r', data) => { | ChannelMessage::Data(b'r', data) => { | ||||
let fd_cnt = message::parse_result_code(data)?; | let fd_cnt = message::parse_result_code(data)?; | ||||
if fd_cnt == 3 { | if fd_cnt == 3 { | ||||
return Ok(()); | return Ok(()); | ||||
} else { | } else { | ||||
return Err(io::Error::new( | return Err(io::Error::new( | ||||
io::ErrorKind::InvalidData, | io::ErrorKind::InvalidData, | ||||
"unexpected attachio result", | "unexpected attachio result", | ||||
)); | )); | ||||
} | } | ||||
} | } | ||||
ChannelMessage::Data(..) => { | ChannelMessage::Data(..) => { | ||||
// just ignore data sent to uninteresting (optional) channel | // just ignore data sent to uninteresting (optional) channel | ||||
} | } | ||||
ChannelMessage::InputRequest(1) => { | ChannelMessage::InputRequest(1) => { | ||||
// this may fail with EWOULDBLOCK in theory, but the | // this may fail with EWOULDBLOCK in theory, but the | ||||
// payload is quite small, and the send buffer should | // payload is quite small, and the send buffer should | ||||
// be empty so the operation will complete immediately | // be empty so the operation will complete immediately | ||||
let sock_fd = proto.as_raw_fd(); | let sock_fd = proto.as_raw_fd(); | ||||
let ifd = stdin.as_raw_fd(); | let ifd = stdin.as_raw_fd(); | ||||
let ofd = stdout.as_raw_fd(); | let ofd = stdout.as_raw_fd(); | ||||
let efd = stderr.as_raw_fd(); | let efd = stderr.as_raw_fd(); | ||||
procutil::send_raw_fds(sock_fd, &[ifd, ofd, efd])?; | procutil::send_raw_fds(sock_fd, &[ifd, ofd, efd])?; | ||||
} | } | ||||
ChannelMessage::InputRequest(..) | ChannelMessage::InputRequest(..) | ||||
| ChannelMessage::LineRequest(..) | | ChannelMessage::LineRequest(..) | ||||
| ChannelMessage::SystemRequest(..) => { | | ChannelMessage::SystemRequest(..) => { | ||||
return Err(io::Error::new( | return Err(io::Error::new( | ||||
io::ErrorKind::InvalidData, | io::ErrorKind::InvalidData, | ||||
"unsupported request while attaching io", | "unsupported request while attaching io", | ||||
)); | )); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
} | } | ||||
} |
let mut client = self.try_connect().await?; | let mut client = self.try_connect().await?; | ||||
let instructions = client.validate(&self.hg_early_args).await?; | let instructions = client.validate(&self.hg_early_args).await?; | ||||
let reconnect = self.run_instructions(&instructions)?; | let reconnect = self.run_instructions(&instructions)?; | ||||
if !reconnect { | if !reconnect { | ||||
return Ok(client); | return Ok(client); | ||||
} | } | ||||
} | } | ||||
// TODO: unindent | |||||
{ | |||||
{ | |||||
let msg = format!( | let msg = format!( | ||||
concat!( | concat!( | ||||
"too many redirections.\n", | "too many redirections.\n", | ||||
"Please make sure {:?} is not a wrapper which ", | "Please make sure {:?} is not a wrapper which ", | ||||
"changes sensitive environment variables ", | "changes sensitive environment variables ", | ||||
"before executing hg. If you have to use a ", | "before executing hg. If you have to use a ", | ||||
"wrapper, wrap chg instead of hg.", | "wrapper, wrap chg instead of hg.", | ||||
), | ), | ||||
self.hg_command | self.hg_command | ||||
); | ); | ||||
Err(io::Error::new(io::ErrorKind::Other, msg)) | Err(io::Error::new(io::ErrorKind::Other, msg)) | ||||
} | } | ||||
} | |||||
} | |||||
/// Runs instructions received from the server. | /// Runs instructions received from the server. | ||||
/// | /// | ||||
/// Returns true if the client should try connecting to the other server. | /// Returns true if the client should try connecting to the other server. | ||||
fn run_instructions(&mut self, instructions: &[Instruction]) -> io::Result<bool> { | fn run_instructions(&mut self, instructions: &[Instruction]) -> io::Result<bool> { | ||||
let mut reconnect = false; | let mut reconnect = false; | ||||
for inst in instructions { | for inst in instructions { | ||||
debug!("instruction: {:?}", inst); | debug!("instruction: {:?}", inst); | ||||
/// Tries to connect to the existing server, or spawns new if not running. | /// Tries to connect to the existing server, or spawns new if not running. | ||||
async fn try_connect(&mut self) -> io::Result<ChgClient> { | async fn try_connect(&mut self) -> io::Result<ChgClient> { | ||||
let sock_path = self | let sock_path = self | ||||
.redirect_sock_path | .redirect_sock_path | ||||
.as_ref() | .as_ref() | ||||
.unwrap_or(&self.base_sock_path) | .unwrap_or(&self.base_sock_path) | ||||
.clone(); | .clone(); | ||||
debug!("try connect to {}", sock_path.display()); | debug!("try connect to {}", sock_path.display()); | ||||
// TODO: unindent | |||||
{ | |||||
{ | |||||
let mut client = match ChgClient::connect(sock_path).await { | let mut client = match ChgClient::connect(sock_path).await { | ||||
Ok(client) => client, | Ok(client) => client, | ||||
Err(_) => { | Err(_) => { | ||||
// Prevent us from being re-connected to the outdated | // Prevent us from being re-connected to the outdated | ||||
// master server: We were told by the server to redirect | // master server: We were told by the server to redirect | ||||
// to redirect_sock_path, which didn't work. We do not | // to redirect_sock_path, which didn't work. We do not | ||||
// want to connect to the same master server again | // want to connect to the same master server again | ||||
// because it would probably tell us the same thing. | // because it would probably tell us the same thing. | ||||
if self.redirect_sock_path.is_some() { | if self.redirect_sock_path.is_some() { | ||||
fs::remove_file(&self.base_sock_path).unwrap_or(()); | fs::remove_file(&self.base_sock_path).unwrap_or(()); | ||||
// may race | // may race | ||||
} | } | ||||
self.spawn_connect().await? | self.spawn_connect().await? | ||||
} | } | ||||
}; | }; | ||||
check_server_capabilities(client.server_spec())?; | check_server_capabilities(client.server_spec())?; | ||||
// It's purely optional, and the server might not support this command. | // It's purely optional, and the server might not support this command. | ||||
if client.server_spec().capabilities.contains("setprocname") { | if client.server_spec().capabilities.contains("setprocname") { | ||||
client | client | ||||
.set_process_name(format!("chg[worker/{}]", self.process_id)) | .set_process_name(format!("chg[worker/{}]", self.process_id)) | ||||
.await?; | .await?; | ||||
} | } | ||||
client.set_current_dir(&self.current_dir).await?; | client.set_current_dir(&self.current_dir).await?; | ||||
client | client | ||||
.set_env_vars_os(self.env_vars.iter().cloned()) | .set_env_vars_os(self.env_vars.iter().cloned()) | ||||
.await?; | .await?; | ||||
Ok(client) | Ok(client) | ||||
} | } | ||||
} | |||||
} | |||||
/// Spawns new server process and connects to it. | /// Spawns new server process and connects to it. | ||||
/// | /// | ||||
/// The server will be spawned at the current working directory, then | /// The server will be spawned at the current working directory, then | ||||
/// chdir to "/", so that the server will load configs from the target | /// chdir to "/", so that the server will load configs from the target | ||||
/// repository. | /// repository. | ||||
async fn spawn_connect(&mut self) -> io::Result<ChgClient> { | async fn spawn_connect(&mut self) -> io::Result<ChgClient> { | ||||
let sock_path = self.temp_sock_path(); | let sock_path = self.temp_sock_path(); | ||||
debug!("start cmdserver at {}", sock_path.display()); | debug!("start cmdserver at {}", sock_path.display()); | ||||
let server = Command::new(&self.hg_command) | let server = Command::new(&self.hg_command) | ||||
.arg("serve") | .arg("serve") | ||||
.arg("--cmdserver") | .arg("--cmdserver") | ||||
.arg("chgunix") | .arg("chgunix") | ||||
.arg("--address") | .arg("--address") | ||||
.arg(&sock_path) | .arg(&sock_path) | ||||
.arg("--daemon-postexec") | .arg("--daemon-postexec") | ||||
.arg("chdir:/") | .arg("chdir:/") | ||||
.args(&self.hg_early_args) | .args(&self.hg_early_args) | ||||
.current_dir(&self.current_dir) | .current_dir(&self.current_dir) | ||||
.env_clear() | .env_clear() | ||||
.envs(self.env_vars.iter().cloned()) | .envs(self.env_vars.iter().cloned()) | ||||
.env("CHGINTERNALMARK", "") | .env("CHGINTERNALMARK", "") | ||||
.spawn()?; | .spawn()?; | ||||
let client = self.connect_spawned(server, &sock_path).await?; | let client = self.connect_spawned(server, &sock_path).await?; | ||||
// TODO: unindent | |||||
{ | |||||
{ | |||||
debug!( | debug!( | ||||
"rename {} to {}", | "rename {} to {}", | ||||
sock_path.display(), | sock_path.display(), | ||||
self.base_sock_path.display() | self.base_sock_path.display() | ||||
); | ); | ||||
fs::rename(&sock_path, &self.base_sock_path)?; | fs::rename(&sock_path, &self.base_sock_path)?; | ||||
Ok(client) | Ok(client) | ||||
} | } | ||||
} | |||||
} | |||||
/// Tries to connect to the just spawned server repeatedly until timeout | /// Tries to connect to the just spawned server repeatedly until timeout | ||||
/// exceeded. | /// exceeded. | ||||
async fn connect_spawned( | async fn connect_spawned( | ||||
&mut self, | &mut self, | ||||
mut server: Child, | mut server: Child, | ||||
sock_path: &Path, | sock_path: &Path, | ||||
) -> io::Result<ChgClient> { | ) -> io::Result<ChgClient> { |
// anything. (issue5278) | // anything. (issue5278) | ||||
// kill(peerpid, SIGPIPE); | // kill(peerpid, SIGPIPE); | ||||
self.pager = Some(pager); | self.pager = Some(pager); | ||||
Ok(pin) | Ok(pin) | ||||
} | } | ||||
async fn run_system(&mut self, spec: &CommandSpec) -> io::Result<i32> { | async fn run_system(&mut self, spec: &CommandSpec) -> io::Result<i32> { | ||||
let status = new_shell_command(&spec).spawn()?.await?; | let status = new_shell_command(&spec).spawn()?.await?; | ||||
// TODO: unindent | |||||
{ | |||||
{ | |||||
let code = status | let code = status | ||||
.code() | .code() | ||||
.or_else(|| status.signal().map(|n| -n)) | .or_else(|| status.signal().map(|n| -n)) | ||||
.expect("either exit code or signal should be set"); | .expect("either exit code or signal should be set"); | ||||
Ok(code) | Ok(code) | ||||
} | } | ||||
} | } | ||||
} | |||||
} | |||||
fn new_shell_command(spec: &CommandSpec) -> Command { | fn new_shell_command(spec: &CommandSpec) -> Command { | ||||
let mut builder = Command::new("/bin/sh"); | let mut builder = Command::new("/bin/sh"); | ||||
builder | builder | ||||
.arg("-c") | .arg("-c") | ||||
.arg(&spec.command) | .arg(&spec.command) | ||||
.current_dir(&spec.current_dir) | .current_dir(&spec.current_dir) | ||||
.env_clear() | .env_clear() | ||||
.envs(spec.envs.iter().cloned()); | .envs(spec.envs.iter().cloned()); | ||||
builder | builder | ||||
} | } |