… instead of acquiring the GIL in the Rust thread in the `Drop` impl.
This commit is based on the premise that a crossbeam channel
with unbounded send and non-blocking receive is faster than
contending for the GIL, but that premise remains to be measured.
… instead of acquiring the GIL in the Rust thread in the `Drop` impl.
This commit is based on the premise that a crossbeam channel
with unbounded send and non-blocking receive is faster than
contending for the GIL, but that premise remains to be measured.
| Automatic diff as part of commit; lint not applicable. |
| Automatic diff as part of commit; unit tests not applicable. |
| Path | Packages | |||
|---|---|---|---|---|
| M | rust/hg-cpython/src/copy_tracing.rs (47 lines) |
| use cpython::ObjectProtocol; | use cpython::ObjectProtocol; | ||||
| use cpython::PyBytes; | use cpython::PyBytes; | ||||
| use cpython::PyDict; | use cpython::PyDict; | ||||
| use cpython::PyDrop; | |||||
| use cpython::PyList; | use cpython::PyList; | ||||
| use cpython::PyModule; | use cpython::PyModule; | ||||
| use cpython::PyObject; | use cpython::PyObject; | ||||
| use cpython::PyResult; | use cpython::PyResult; | ||||
| use cpython::PyTuple; | use cpython::PyTuple; | ||||
| use cpython::Python; | use cpython::Python; | ||||
| use hg::copy_tracing::ChangedFiles; | use hg::copy_tracing::ChangedFiles; | ||||
| } | } | ||||
| } | } | ||||
| pub fn data(&self) -> &[u8] { | pub fn data(&self) -> &[u8] { | ||||
| // Safety: the raw pointer is valid as long as the PyBytes is still | // Safety: the raw pointer is valid as long as the PyBytes is still | ||||
| // alive, and the returned slice borrows `self`. | // alive, and the returned slice borrows `self`. | ||||
| unsafe { &*self.data } | unsafe { &*self.data } | ||||
| } | } | ||||
| pub fn unwrap(self) -> PyBytes { | |||||
| self.keep_alive | |||||
| } | |||||
| } | } | ||||
| } | } | ||||
| /// Combines copies information contained into revision `revs` to build a copy | /// Combines copies information contained into revision `revs` to build a copy | ||||
| /// map. | /// map. | ||||
| /// | /// | ||||
| /// See mercurial/copies.py for details | /// See mercurial/copies.py for details | ||||
| pub fn combine_changeset_copies_wrapper( | pub fn combine_changeset_copies_wrapper( | ||||
| let tuple: PyTuple = | let tuple: PyTuple = | ||||
| rev_info.call(py, (rev_py,), None)?.cast_into(py)?; | rev_info.call(py, (rev_py,), None)?.cast_into(py)?; | ||||
| let p1 = tuple.get_item(py, 0).extract(py)?; | let p1 = tuple.get_item(py, 0).extract(py)?; | ||||
| let p2 = tuple.get_item(py, 1).extract(py)?; | let p2 = tuple.get_item(py, 1).extract(py)?; | ||||
| let opt_bytes = tuple.get_item(py, 2).extract(py)?; | let opt_bytes = tuple.get_item(py, 2).extract(py)?; | ||||
| Ok((rev, p1, p2, opt_bytes)) | Ok((rev, p1, p2, opt_bytes)) | ||||
| }); | }); | ||||
| let path_copies = if !multi_thread { | let path_copies; | ||||
| if !multi_thread { | |||||
| let mut combine_changeset_copies = | let mut combine_changeset_copies = | ||||
| CombineChangesetCopies::new(children_count); | CombineChangesetCopies::new(children_count); | ||||
| for rev_info in revs_info { | for rev_info in revs_info { | ||||
| let (rev, p1, p2, opt_bytes) = rev_info?; | let (rev, p1, p2, opt_bytes) = rev_info?; | ||||
| let files = match &opt_bytes { | let files = match &opt_bytes { | ||||
| Some(bytes) => ChangedFiles::new(bytes.data(py)), | Some(bytes) => ChangedFiles::new(bytes.data(py)), | ||||
| // Python None was extracted to Option::None, | // Python None was extracted to Option::None, | ||||
| // meaning there was no copy data. | // meaning there was no copy data. | ||||
| None => ChangedFiles::new_empty(), | None => ChangedFiles::new_empty(), | ||||
| }; | }; | ||||
| combine_changeset_copies.add_revision(rev, p1, p2, files) | combine_changeset_copies.add_revision(rev, p1, p2, files) | ||||
| } | } | ||||
| combine_changeset_copies.finish(target_rev) | path_copies = combine_changeset_copies.finish(target_rev) | ||||
| } else { | } else { | ||||
| // Use a bounded channel to provide back-pressure: | // Use a bounded channel to provide back-pressure: | ||||
| // if the child thread is slower to process revisions than this thread | // if the child thread is slower to process revisions than this thread | ||||
| // is to gather data for them, an unbounded channel would keep | // is to gather data for them, an unbounded channel would keep | ||||
| // growing and eat memory. | // growing and eat memory. | ||||
| // | // | ||||
| // TODO: tweak the bound? | // TODO: tweak the bound? | ||||
| let (rev_info_sender, rev_info_receiver) = | let (rev_info_sender, rev_info_receiver) = | ||||
| crossbeam_channel::bounded::<RevInfo<PyBytesWithData>>(1000); | crossbeam_channel::bounded::<RevInfo<PyBytesWithData>>(1000); | ||||
| // This channel (going the other way around) however is unbounded. | |||||
| // If they were both bounded, there might potentially be deadlocks | |||||
| // where both channels are full and both threads are waiting on each | |||||
| // other. | |||||
| let (pybytes_sender, pybytes_receiver) = | |||||
| crossbeam_channel::unbounded(); | |||||
| // Start a thread that does CPU-heavy processing in parallel with the | // Start a thread that does CPU-heavy processing in parallel with the | ||||
| // loop below. | // loop below. | ||||
| // | // | ||||
| // If the parent thread panics, `rev_info_sender` will be dropped and | // If the parent thread panics, `rev_info_sender` will be dropped and | ||||
| // “disconnected”. `rev_info_receiver` will be notified of this and | // “disconnected”. `rev_info_receiver` will be notified of this and | ||||
| // exit its own loop. | // exit its own loop. | ||||
| let thread = std::thread::spawn(move || { | let thread = std::thread::spawn(move || { | ||||
| let mut combine_changeset_copies = | let mut combine_changeset_copies = | ||||
| CombineChangesetCopies::new(children_count); | CombineChangesetCopies::new(children_count); | ||||
| for (rev, p1, p2, opt_bytes) in rev_info_receiver { | for (rev, p1, p2, opt_bytes) in rev_info_receiver { | ||||
| let files = match &opt_bytes { | let files = match &opt_bytes { | ||||
| Some(raw) => ChangedFiles::new(raw.data()), | Some(raw) => ChangedFiles::new(raw.data()), | ||||
| // Python None was extracted to Option::None, | // Python None was extracted to Option::None, | ||||
| // meaning there was no copy data. | // meaning there was no copy data. | ||||
| None => ChangedFiles::new_empty(), | None => ChangedFiles::new_empty(), | ||||
| }; | }; | ||||
| combine_changeset_copies.add_revision(rev, p1, p2, files) | combine_changeset_copies.add_revision(rev, p1, p2, files); | ||||
| // The GIL is (still) implicitly acquired here through | // Send `PyBytes` back to the parent thread so the parent | ||||
| // `impl Drop for PyBytes`. | // thread can drop it. Otherwise the GIL would be implicitly | ||||
| // acquired here through `impl Drop for PyBytes`. | |||||
| if let Some(bytes) = opt_bytes { | |||||
| if let Err(_) = pybytes_sender.send(bytes.unwrap()) { | |||||
| // The channel is disconnected, meaning the parent | |||||
| // thread panicked or returned | |||||
| // early through | |||||
| // `?` to propagate a Python exception. | |||||
| break; | |||||
| } | |||||
| } | |||||
| } | } | ||||
| combine_changeset_copies.finish(target_rev) | combine_changeset_copies.finish(target_rev) | ||||
| }); | }); | ||||
| for rev_info in revs_info { | for rev_info in revs_info { | ||||
| let (rev, p1, p2, opt_bytes) = rev_info?; | let (rev, p1, p2, opt_bytes) = rev_info?; | ||||
| let opt_bytes = opt_bytes.map(|b| PyBytesWithData::new(py, b)); | let opt_bytes = opt_bytes.map(|b| PyBytesWithData::new(py, b)); | ||||
| // We’d prefer to avoid the child thread calling into Python code, | // We’d prefer to avoid the child thread calling into Python code, | ||||
| // but this avoids a potential deadlock on the GIL if it does: | // but this avoids a potential deadlock on the GIL if it does: | ||||
| py.allow_threads(|| { | py.allow_threads(|| { | ||||
| rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( | rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( | ||||
| "combine_changeset_copies: channel is disconnected", | "combine_changeset_copies: channel is disconnected", | ||||
| ); | ); | ||||
| }); | }); | ||||
| // Drop anything in the channel, without blocking | |||||
| for pybytes in pybytes_receiver.try_iter() { | |||||
| pybytes.release_ref(py) | |||||
| } | |||||
| } | } | ||||
| // We’d prefer to avoid the child thread calling into Python code, | // We’d prefer to avoid the child thread calling into Python code, | ||||
| // but this avoids a potential deadlock on the GIL if it does: | // but this avoids a potential deadlock on the GIL if it does: | ||||
| py.allow_threads(|| { | path_copies = py.allow_threads(|| { | ||||
| // Disconnect the channel to signal the child thread to stop: | // Disconnect the channel to signal the child thread to stop: | ||||
| // the `for … in rev_info_receiver` loop will end. | // the `for … in rev_info_receiver` loop will end. | ||||
| drop(rev_info_sender); | drop(rev_info_sender); | ||||
| // Wait for the child thread to stop, and propagate any panic. | // Wait for the child thread to stop, and propagate any panic. | ||||
| thread.join().unwrap_or_else(|panic_payload| { | thread.join().unwrap_or_else(|panic_payload| { | ||||
| std::panic::resume_unwind(panic_payload) | std::panic::resume_unwind(panic_payload) | ||||
| }) | }) | ||||
| }) | }); | ||||
| // Drop anything left in the channel | |||||
| for pybytes in pybytes_receiver.iter() { | |||||
| pybytes.release_ref(py) | |||||
| } | |||||
| }; | }; | ||||
| let out = PyDict::new(py); | let out = PyDict::new(py); | ||||
| for (dest, source) in path_copies.into_iter() { | for (dest, source) in path_copies.into_iter() { | ||||
| out.set_item( | out.set_item( | ||||
| py, | py, | ||||
| PyBytes::new(py, &dest.into_vec()), | PyBytes::new(py, &dest.into_vec()), | ||||
| PyBytes::new(py, &source.into_vec()), | PyBytes::new(py, &source.into_vec()), | ||||