The hand-crafted state machine is no longer needed thanks to async/await.
Its states are rewritten roughly as follows:
- Ready(..) -> return ..
- PollAgain(..) -> run .. and await
- Err(..) -> return Err(..)
| Alphare |
| hg-reviewers |
The hand-crafted state machine is no longer needed thanks to async/await.
Its states are rewritten roughly as follows:
| Automatic diff as part of commit; lint not applicable. |
| Automatic diff as part of commit; unit tests not applicable. |
| // Copyright 2018 Yuya Nishihara <yuya@tcha.org> | // Copyright 2018 Yuya Nishihara <yuya@tcha.org> | ||||
| // | // | ||||
| // This software may be used and distributed according to the terms of the | // This software may be used and distributed according to the terms of the | ||||
| // GNU General Public License version 2 or any later version. | // GNU General Public License version 2 or any later version. | ||||
| mod attachio; | mod attachio; | ||||
| //mod clientext; | //mod clientext; | ||||
| //pub mod locator; | //pub mod locator; | ||||
| pub mod message; | pub mod message; | ||||
| pub mod procutil; | pub mod procutil; | ||||
| //mod runcommand; | mod runcommand; | ||||
| mod uihandler; | mod uihandler; | ||||
| //pub use clientext::ChgClientExt; | //pub use clientext::ChgClientExt; | ||||
| pub use uihandler::{ChgUiHandler, SystemHandler}; | pub use uihandler::{ChgUiHandler, SystemHandler}; | ||||
| // Copyright 2018 Yuya Nishihara <yuya@tcha.org> | // Copyright 2018 Yuya Nishihara <yuya@tcha.org> | ||||
| // | // | ||||
| // This software may be used and distributed according to the terms of the | // This software may be used and distributed according to the terms of the | ||||
| // GNU General Public License version 2 or any later version. | // GNU General Public License version 2 or any later version. | ||||
| //! Functions to run Mercurial command in cHg-aware command server. | //! Functions to run Mercurial command in cHg-aware command server. | ||||
| use bytes::Bytes; | use bytes::Bytes; | ||||
| use futures::future::IntoFuture; | |||||
| use futures::{Async, Future, Poll}; | |||||
| use std::io; | use std::io; | ||||
| use std::mem; | |||||
| use std::os::unix::io::AsRawFd; | use std::os::unix::io::AsRawFd; | ||||
| use tokio_hglib::codec::ChannelMessage; | use tokio_hglib::codec::ChannelMessage; | ||||
| use tokio_hglib::protocol::MessageLoop; | use tokio_hglib::{Connection, Protocol}; | ||||
| use tokio_hglib::{Client, Connection}; | |||||
| use crate::attachio::AttachIo; | use crate::attachio; | ||||
| use crate::message::{self, CommandType}; | use crate::message::{self, CommandType}; | ||||
| use crate::uihandler::SystemHandler; | use crate::uihandler::SystemHandler; | ||||
| enum AsyncS<R, S> { | /// Runs the given Mercurial command in cHg-aware command server, and | ||||
| Ready(R), | /// fetches the result code. | ||||
| NotReady(S), | /// | ||||
| PollAgain(S), | /// This is a subset of tokio-hglib's `run_command()` with the additional | ||||
| } | /// SystemRequest support. | ||||
| pub async fn run_command( | |||||
| enum CommandState<C, H> | proto: &mut Protocol<impl Connection + AsRawFd>, | ||||
| where | handler: &mut impl SystemHandler, | ||||
| C: Connection, | packed_args: impl Into<Bytes>, | ||||
| H: SystemHandler, | ) -> io::Result<i32> { | ||||
| { | proto | ||||
| Running(MessageLoop<C>, H), | .send_command_with_args("runcommand", packed_args) | ||||
| SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future), | .await?; | ||||
| AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H), | |||||
| WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future), | |||||
| Finished, | |||||
| } | |||||
| type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>; | |||||
| /// Future resolves to `(exit_code, client)`. | |||||
| #[must_use = "futures do nothing unless polled"] | |||||
| pub struct ChgRunCommand<C, H> | |||||
| where | |||||
| C: Connection, | |||||
| H: SystemHandler, | |||||
| { | |||||
| state: CommandState<C, H>, | |||||
| } | |||||
| impl<C, H> ChgRunCommand<C, H> | |||||
| where | |||||
| C: Connection + AsRawFd, | |||||
| H: SystemHandler, | |||||
| { | |||||
| pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> { | |||||
| let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args); | |||||
| ChgRunCommand { | |||||
| state: CommandState::Running(msg_loop, handler), | |||||
| } | |||||
| } | |||||
| } | |||||
| impl<C, H> Future for ChgRunCommand<C, H> | |||||
| where | |||||
| C: Connection + AsRawFd, | |||||
| H: SystemHandler, | |||||
| { | |||||
| type Item = (Client<C>, H, i32); | |||||
| type Error = io::Error; | |||||
| fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | |||||
| loop { | loop { | ||||
| let state = mem::replace(&mut self.state, CommandState::Finished); | match proto.fetch_response().await? { | ||||
| match state.poll()? { | |||||
| AsyncS::Ready((client, handler, code)) => { | |||||
| return Ok(Async::Ready((client, handler, code))); | |||||
| } | |||||
| AsyncS::NotReady(newstate) => { | |||||
| self.state = newstate; | |||||
| return Ok(Async::NotReady); | |||||
| } | |||||
| AsyncS::PollAgain(newstate) => { | |||||
| self.state = newstate; | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| impl<C, H> CommandState<C, H> | |||||
| where | |||||
| C: Connection + AsRawFd, | |||||
| H: SystemHandler, | |||||
| { | |||||
| fn poll(self) -> CommandPoll<C, H> { | |||||
| match self { | |||||
| CommandState::Running(mut msg_loop, handler) => { | |||||
| if let Async::Ready((client, msg)) = msg_loop.poll()? { | |||||
| process_message(client, handler, msg) | |||||
| } else { | |||||
| Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler))) | |||||
| } | |||||
| } | |||||
| CommandState::SpawningPager(client, mut fut) => { | |||||
| if let Async::Ready((handler, pin)) = fut.poll()? { | |||||
| let fut = AttachIo::with_client(client, io::stdin(), pin, None); | |||||
| Ok(AsyncS::PollAgain(CommandState::AttachingPager( | |||||
| fut, handler, | |||||
| ))) | |||||
| } else { | |||||
| Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut))) | |||||
| } | |||||
| } | |||||
| CommandState::AttachingPager(mut fut, handler) => { | |||||
| if let Async::Ready(client) = fut.poll()? { | |||||
| let msg_loop = MessageLoop::start(client, b""); // terminator | |||||
| Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) | |||||
| } else { | |||||
| Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler))) | |||||
| } | |||||
| } | |||||
| CommandState::WaitingSystem(client, mut fut) => { | |||||
| if let Async::Ready((handler, code)) = fut.poll()? { | |||||
| let data = message::pack_result_code(code); | |||||
| let msg_loop = MessageLoop::resume_with_data(client, data); | |||||
| Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) | |||||
| } else { | |||||
| Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut))) | |||||
| } | |||||
| } | |||||
| CommandState::Finished => panic!("poll ChgRunCommand after it's done"), | |||||
| } | |||||
| } | |||||
| } | |||||
| fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H> | |||||
| where | |||||
| C: Connection, | |||||
| H: SystemHandler, | |||||
| { | |||||
| { | |||||
| match msg { | |||||
| ChannelMessage::Data(b'r', data) => { | ChannelMessage::Data(b'r', data) => { | ||||
| let code = message::parse_result_code(data)?; | return message::parse_result_code(data); | ||||
| Ok(AsyncS::Ready((client, handler, code))) | |||||
| } | } | ||||
| ChannelMessage::Data(..) => { | ChannelMessage::Data(..) => { | ||||
| // just ignores data sent to optional channel | // just ignores data sent to optional channel | ||||
| let msg_loop = MessageLoop::resume(client); | |||||
| Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) | |||||
| } | } | ||||
| ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err( | ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => { | ||||
| io::Error::new(io::ErrorKind::InvalidData, "unsupported request"), | return Err(io::Error::new( | ||||
| ), | io::ErrorKind::InvalidData, | ||||
| "unsupported request", | |||||
| )); | |||||
| } | |||||
| ChannelMessage::SystemRequest(data) => { | ChannelMessage::SystemRequest(data) => { | ||||
| let (cmd_type, cmd_spec) = message::parse_command_spec(data)?; | let (cmd_type, cmd_spec) = message::parse_command_spec(data)?; | ||||
| match cmd_type { | match cmd_type { | ||||
| CommandType::Pager => { | CommandType::Pager => { | ||||
| let fut = handler.spawn_pager(cmd_spec).into_future(); | // server spins new command loop while pager request is | ||||
| Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut))) | // in progress, which can be terminated by "" command. | ||||
| let pin = handler.spawn_pager(&cmd_spec).await?; | |||||
| attachio::attach_io(proto, &io::stdin(), &pin, &pin).await?; | |||||
| proto.send_command("").await?; // terminator | |||||
| } | } | ||||
| CommandType::System => { | CommandType::System => { | ||||
| let fut = handler.run_system(cmd_spec).into_future(); | let code = handler.run_system(&cmd_spec).await?; | ||||
| Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut))) | let data = message::pack_result_code(code); | ||||
| proto.send_data(data).await?; | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| } | } | ||||