diff --git a/rust/chg/src/lib.rs b/rust/chg/src/lib.rs --- a/rust/chg/src/lib.rs +++ b/rust/chg/src/lib.rs @@ -8,7 +8,7 @@ //pub mod locator; pub mod message; pub mod procutil; -//mod runcommand; +mod runcommand; mod uihandler; //pub use clientext::ChgClientExt; diff --git a/rust/chg/src/runcommand.rs b/rust/chg/src/runcommand.rs --- a/rust/chg/src/runcommand.rs +++ b/rust/chg/src/runcommand.rs @@ -6,164 +6,56 @@ //! Functions to run Mercurial command in cHg-aware command server. use bytes::Bytes; -use futures::future::IntoFuture; -use futures::{Async, Future, Poll}; use std::io; -use std::mem; use std::os::unix::io::AsRawFd; use tokio_hglib::codec::ChannelMessage; -use tokio_hglib::protocol::MessageLoop; -use tokio_hglib::{Client, Connection}; +use tokio_hglib::{Connection, Protocol}; -use crate::attachio::AttachIo; +use crate::attachio; use crate::message::{self, CommandType}; use crate::uihandler::SystemHandler; -enum AsyncS { - Ready(R), - NotReady(S), - PollAgain(S), -} - -enum CommandState -where - C: Connection, - H: SystemHandler, -{ - Running(MessageLoop, H), - SpawningPager(Client, ::Future), - AttachingPager(AttachIo, H), - WaitingSystem(Client, ::Future), - Finished, -} - -type CommandPoll = io::Result, H, i32), CommandState>>; - -/// Future resolves to `(exit_code, client)`. -#[must_use = "futures do nothing unless polled"] -pub struct ChgRunCommand -where - C: Connection, - H: SystemHandler, -{ - state: CommandState, -} - -impl ChgRunCommand -where - C: Connection + AsRawFd, - H: SystemHandler, -{ - pub fn with_client(client: Client, handler: H, packed_args: Bytes) -> ChgRunCommand { - let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args); - ChgRunCommand { - state: CommandState::Running(msg_loop, handler), - } - } -} - -impl Future for ChgRunCommand -where - C: Connection + AsRawFd, - H: SystemHandler, -{ - type Item = (Client, H, i32); - type Error = io::Error; - - fn poll(&mut self) -> Poll { - loop { - let state = mem::replace(&mut self.state, CommandState::Finished); - match state.poll()? { - AsyncS::Ready((client, handler, code)) => { - return Ok(Async::Ready((client, handler, code))); - } - AsyncS::NotReady(newstate) => { - self.state = newstate; - return Ok(Async::NotReady); - } - AsyncS::PollAgain(newstate) => { - self.state = newstate; - } - } - } - } -} - -impl CommandState -where - C: Connection + AsRawFd, - H: SystemHandler, -{ - fn poll(self) -> CommandPoll { - match self { - CommandState::Running(mut msg_loop, handler) => { - if let Async::Ready((client, msg)) = msg_loop.poll()? { - process_message(client, handler, msg) - } else { - Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler))) - } - } - CommandState::SpawningPager(client, mut fut) => { - if let Async::Ready((handler, pin)) = fut.poll()? { - let fut = AttachIo::with_client(client, io::stdin(), pin, None); - Ok(AsyncS::PollAgain(CommandState::AttachingPager( - fut, handler, - ))) - } else { - Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut))) - } - } - CommandState::AttachingPager(mut fut, handler) => { - if let Async::Ready(client) = fut.poll()? { - let msg_loop = MessageLoop::start(client, b""); // terminator - Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) - } else { - Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler))) - } - } - CommandState::WaitingSystem(client, mut fut) => { - if let Async::Ready((handler, code)) = fut.poll()? { - let data = message::pack_result_code(code); - let msg_loop = MessageLoop::resume_with_data(client, data); - Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) - } else { - Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut))) - } - } - CommandState::Finished => panic!("poll ChgRunCommand after it's done"), - } - } -} - -fn process_message(client: Client, handler: H, msg: ChannelMessage) -> CommandPoll -where - C: Connection, - H: SystemHandler, -{ - { - match msg { +/// Runs the given Mercurial command in cHg-aware command server, and +/// fetches the result code. +/// +/// This is a subset of tokio-hglib's `run_command()` with the additional +/// SystemRequest support. +pub async fn run_command( + proto: &mut Protocol, + handler: &mut impl SystemHandler, + packed_args: impl Into, +) -> io::Result { + proto + .send_command_with_args("runcommand", packed_args) + .await?; + loop { + match proto.fetch_response().await? { ChannelMessage::Data(b'r', data) => { - let code = message::parse_result_code(data)?; - Ok(AsyncS::Ready((client, handler, code))) + return message::parse_result_code(data); } ChannelMessage::Data(..) => { // just ignores data sent to optional channel - let msg_loop = MessageLoop::resume(client); - Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) } - ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err( - io::Error::new(io::ErrorKind::InvalidData, "unsupported request"), - ), + ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "unsupported request", + )); + } ChannelMessage::SystemRequest(data) => { let (cmd_type, cmd_spec) = message::parse_command_spec(data)?; match cmd_type { CommandType::Pager => { - let fut = handler.spawn_pager(cmd_spec).into_future(); - Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut))) + // server spins new command loop while pager request is + // in progress, which can be terminated by "" command. + let pin = handler.spawn_pager(&cmd_spec).await?; + attachio::attach_io(proto, &io::stdin(), &pin, &pin).await?; + proto.send_command("").await?; // terminator } CommandType::System => { - let fut = handler.run_system(cmd_spec).into_future(); - Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut))) + let code = handler.run_system(&cmd_spec).await?; + let data = message::pack_result_code(code); + proto.send_data(data).await?; } } }