… instead of acquiring the GIL in the Rust thread in the Drop impl
This commit is based on the premise that crossbeam-channel
with unbounded send and non-blocking receive is faster than
a contended GIL, but that remains to be measured.
… instead of acquiring the GIL in the Rust thread in the Drop impl
This commit is based on the premise that crossbeam-channel
with unbounded send and non-blocking receive is faster than
a contended GIL, but that remains to be measured.
Automatic diff as part of commit; lint not applicable.
Automatic diff as part of commit; unit tests not applicable.
|  | Path | Packages |
|---|---|---|
| M | rust/hg-cpython/src/copy_tracing.rs (47 lines) |  |
use cpython::ObjectProtocol; | use cpython::ObjectProtocol; | ||||
use cpython::PyBytes; | use cpython::PyBytes; | ||||
use cpython::PyDict; | use cpython::PyDict; | ||||
use cpython::PyDrop; | |||||
use cpython::PyList; | use cpython::PyList; | ||||
use cpython::PyModule; | use cpython::PyModule; | ||||
use cpython::PyObject; | use cpython::PyObject; | ||||
use cpython::PyResult; | use cpython::PyResult; | ||||
use cpython::PyTuple; | use cpython::PyTuple; | ||||
use cpython::Python; | use cpython::Python; | ||||
use hg::copy_tracing::ChangedFiles; | use hg::copy_tracing::ChangedFiles; | ||||
} | } | ||||
} | } | ||||
pub fn data(&self) -> &[u8] { | pub fn data(&self) -> &[u8] { | ||||
// Safety: the raw pointer is valid as long as the PyBytes is still | // Safety: the raw pointer is valid as long as the PyBytes is still | ||||
// alive, and the returned slice borrows `self`. | // alive, and the returned slice borrows `self`. | ||||
unsafe { &*self.data } | unsafe { &*self.data } | ||||
} | } | ||||
pub fn unwrap(self) -> PyBytes { | |||||
self.keep_alive | |||||
} | |||||
} | } | ||||
} | } | ||||
/// Combines the copy information contained in revisions `revs` to build a copy | /// Combines the copy information contained in revisions `revs` to build a copy | ||||
/// map. | /// map. | ||||
/// | /// | ||||
/// See mercurial/copies.py for details | /// See mercurial/copies.py for details | ||||
pub fn combine_changeset_copies_wrapper( | pub fn combine_changeset_copies_wrapper( | ||||
let tuple: PyTuple = | let tuple: PyTuple = | ||||
rev_info.call(py, (rev_py,), None)?.cast_into(py)?; | rev_info.call(py, (rev_py,), None)?.cast_into(py)?; | ||||
let p1 = tuple.get_item(py, 0).extract(py)?; | let p1 = tuple.get_item(py, 0).extract(py)?; | ||||
let p2 = tuple.get_item(py, 1).extract(py)?; | let p2 = tuple.get_item(py, 1).extract(py)?; | ||||
let opt_bytes = tuple.get_item(py, 2).extract(py)?; | let opt_bytes = tuple.get_item(py, 2).extract(py)?; | ||||
Ok((rev, p1, p2, opt_bytes)) | Ok((rev, p1, p2, opt_bytes)) | ||||
}); | }); | ||||
let path_copies = if !multi_thread { | let path_copies; | ||||
if !multi_thread { | |||||
let mut combine_changeset_copies = | let mut combine_changeset_copies = | ||||
CombineChangesetCopies::new(children_count); | CombineChangesetCopies::new(children_count); | ||||
for rev_info in revs_info { | for rev_info in revs_info { | ||||
let (rev, p1, p2, opt_bytes) = rev_info?; | let (rev, p1, p2, opt_bytes) = rev_info?; | ||||
let files = match &opt_bytes { | let files = match &opt_bytes { | ||||
Some(bytes) => ChangedFiles::new(bytes.data(py)), | Some(bytes) => ChangedFiles::new(bytes.data(py)), | ||||
// Python None was extracted to Option::None, | // Python None was extracted to Option::None, | ||||
// meaning there was no copy data. | // meaning there was no copy data. | ||||
None => ChangedFiles::new_empty(), | None => ChangedFiles::new_empty(), | ||||
}; | }; | ||||
combine_changeset_copies.add_revision(rev, p1, p2, files) | combine_changeset_copies.add_revision(rev, p1, p2, files) | ||||
} | } | ||||
combine_changeset_copies.finish(target_rev) | path_copies = combine_changeset_copies.finish(target_rev) | ||||
} else { | } else { | ||||
// Use a bounded channel to provide back-pressure: | // Use a bounded channel to provide back-pressure: | ||||
// if the child thread is slower to process revisions than this thread | // if the child thread is slower to process revisions than this thread | ||||
// is to gather data for them, an unbounded channel would keep | // is to gather data for them, an unbounded channel would keep | ||||
// growing and eat memory. | // growing and eat memory. | ||||
// | // | ||||
// TODO: tweak the bound? | // TODO: tweak the bound? | ||||
let (rev_info_sender, rev_info_receiver) = | let (rev_info_sender, rev_info_receiver) = | ||||
crossbeam_channel::bounded::<RevInfo<PyBytesWithData>>(1000); | crossbeam_channel::bounded::<RevInfo<PyBytesWithData>>(1000); | ||||
// This channel (going the other way around) however is unbounded. | |||||
// If they were both bounded, there might potentially be deadlocks | |||||
// where both channels are full and both threads are waiting on each | |||||
// other. | |||||
let (pybytes_sender, pybytes_receiver) = | |||||
crossbeam_channel::unbounded(); | |||||
// Start a thread that does CPU-heavy processing in parallel with the | // Start a thread that does CPU-heavy processing in parallel with the | ||||
// loop below. | // loop below. | ||||
// | // | ||||
// If the parent thread panics, `rev_info_sender` will be dropped and | // If the parent thread panics, `rev_info_sender` will be dropped and | ||||
// “disconnected”. `rev_info_receiver` will be notified of this and | // “disconnected”. `rev_info_receiver` will be notified of this and | ||||
// exit its own loop. | // exit its own loop. | ||||
let thread = std::thread::spawn(move || { | let thread = std::thread::spawn(move || { | ||||
let mut combine_changeset_copies = | let mut combine_changeset_copies = | ||||
CombineChangesetCopies::new(children_count); | CombineChangesetCopies::new(children_count); | ||||
for (rev, p1, p2, opt_bytes) in rev_info_receiver { | for (rev, p1, p2, opt_bytes) in rev_info_receiver { | ||||
let files = match &opt_bytes { | let files = match &opt_bytes { | ||||
Some(raw) => ChangedFiles::new(raw.data()), | Some(raw) => ChangedFiles::new(raw.data()), | ||||
// Python None was extracted to Option::None, | // Python None was extracted to Option::None, | ||||
// meaning there was no copy data. | // meaning there was no copy data. | ||||
None => ChangedFiles::new_empty(), | None => ChangedFiles::new_empty(), | ||||
}; | }; | ||||
combine_changeset_copies.add_revision(rev, p1, p2, files) | combine_changeset_copies.add_revision(rev, p1, p2, files); | ||||
// The GIL is (still) implicitly acquired here through | // Send `PyBytes` back to the parent thread so the parent | ||||
// `impl Drop for PyBytes`. | // thread can drop it. Otherwise the GIL would be implicitly | ||||
// acquired here through `impl Drop for PyBytes`. | |||||
if let Some(bytes) = opt_bytes { | |||||
if let Err(_) = pybytes_sender.send(bytes.unwrap()) { | |||||
// The channel is disconnected, meaning the parent thread | |||||
// panicked or returned early through `?` to propagate a | |||||
// Python exception. | |||||
break; | |||||
} | |||||
} | |||||
} | } | ||||
combine_changeset_copies.finish(target_rev) | combine_changeset_copies.finish(target_rev) | ||||
}); | }); | ||||
for rev_info in revs_info { | for rev_info in revs_info { | ||||
let (rev, p1, p2, opt_bytes) = rev_info?; | let (rev, p1, p2, opt_bytes) = rev_info?; | ||||
let opt_bytes = opt_bytes.map(|b| PyBytesWithData::new(py, b)); | let opt_bytes = opt_bytes.map(|b| PyBytesWithData::new(py, b)); | ||||
// We’d prefer to avoid the child thread calling into Python code, | // We’d prefer to avoid the child thread calling into Python code, | ||||
// but this avoids a potential deadlock on the GIL if it does: | // but this avoids a potential deadlock on the GIL if it does: | ||||
py.allow_threads(|| { | py.allow_threads(|| { | ||||
rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( | rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( | ||||
"combine_changeset_copies: channel is disconnected", | "combine_changeset_copies: channel is disconnected", | ||||
); | ); | ||||
}); | }); | ||||
// Drop anything in the channel, without blocking | |||||
for pybytes in pybytes_receiver.try_iter() { | |||||
pybytes.release_ref(py) | |||||
} | |||||
} | } | ||||
// We’d prefer to avoid the child thread calling into Python code, | // We’d prefer to avoid the child thread calling into Python code, | ||||
// but this avoids a potential deadlock on the GIL if it does: | // but this avoids a potential deadlock on the GIL if it does: | ||||
py.allow_threads(|| { | path_copies = py.allow_threads(|| { | ||||
// Disconnect the channel to signal the child thread to stop: | // Disconnect the channel to signal the child thread to stop: | ||||
// the `for … in rev_info_receiver` loop will end. | // the `for … in rev_info_receiver` loop will end. | ||||
drop(rev_info_sender); | drop(rev_info_sender); | ||||
// Wait for the child thread to stop, and propagate any panic. | // Wait for the child thread to stop, and propagate any panic. | ||||
thread.join().unwrap_or_else(|panic_payload| { | thread.join().unwrap_or_else(|panic_payload| { | ||||
std::panic::resume_unwind(panic_payload) | std::panic::resume_unwind(panic_payload) | ||||
}) | }) | ||||
}) | }); | ||||
// Drop anything left in the channel | |||||
for pybytes in pybytes_receiver.iter() { | |||||
pybytes.release_ref(py) | |||||
} | |||||
}; | }; | ||||
let out = PyDict::new(py); | let out = PyDict::new(py); | ||||
for (dest, source) in path_copies.into_iter() { | for (dest, source) in path_copies.into_iter() { | ||||
out.set_item( | out.set_item( | ||||
py, | py, | ||||
PyBytes::new(py, &dest.into_vec()), | PyBytes::new(py, &dest.into_vec()), | ||||
PyBytes::new(py, &source.into_vec()), | PyBytes::new(py, &source.into_vec()), |