So we can use it in other places too.
Replace its .data() method with the Deref<Target = [u8]> trait,
allowing this type to be used in generic contexts.
Rename the type accordingly.
( )
| Alphare |
| hg-reviewers |
So we can use it in other places too.
Replace its .data() method with the Deref<Target = [u8]> trait,
allowing this type to be used in generic contexts.
Rename the type accordingly.
| No Linters Available |
| No Unit Test Coverage |
| Path | Packages | |||
|---|---|---|---|---|
| M | rust/hg-cpython/src/copy_tracing.rs (59 lines) | |||
| M | rust/hg-cpython/src/lib.rs (1 line) | |||
| A | M | rust/hg-cpython/src/pybytes_deref.rs (53 lines) |
| Commit | Parents | Author | Summary | Date |
|---|---|---|---|---|
| b05ad92f8622 | 581a5dee7694 | Simon Sapin | Sep 6 2021, 7:39 AM |
| Status | Author | Revision | |
|---|---|---|---|
| Closed | SimonSapin | ||
| Closed | SimonSapin | ||
| Closed | SimonSapin | ||
| Closed | SimonSapin | ||
| Closed | SimonSapin |
| use cpython::ObjectProtocol; | use cpython::ObjectProtocol; | ||||
| use cpython::PyBytes; | use cpython::PyBytes; | ||||
| use cpython::PyDict; | use cpython::PyDict; | ||||
| use cpython::PyDrop; | use cpython::PyDrop; | ||||
| use cpython::PyList; | use cpython::PyList; | ||||
| use cpython::PyModule; | use cpython::PyModule; | ||||
| use cpython::PyObject; | use cpython::PyObject; | ||||
| use cpython::PyResult; | use cpython::PyResult; | ||||
| use cpython::PyTuple; | use cpython::PyTuple; | ||||
| use cpython::Python; | use cpython::Python; | ||||
| use hg::copy_tracing::ChangedFiles; | use hg::copy_tracing::ChangedFiles; | ||||
| use hg::copy_tracing::CombineChangesetCopies; | use hg::copy_tracing::CombineChangesetCopies; | ||||
| use hg::Revision; | use hg::Revision; | ||||
| use self::pybytes_with_data::PyBytesWithData; | use crate::pybytes_deref::PyBytesDeref; | ||||
| // Module to encapsulate private fields | |||||
| mod pybytes_with_data { | |||||
| use cpython::{PyBytes, Python}; | |||||
| /// Safe abstraction over a `PyBytes` together with the `&[u8]` slice | |||||
| /// that borrows it. | |||||
| /// | |||||
| /// Calling `PyBytes::data` requires a GIL marker but we want to access the | |||||
| /// data in a thread that (ideally) does not need to acquire the GIL. | |||||
| /// This type allows separating the call and the use. | |||||
| pub(super) struct PyBytesWithData { | |||||
| #[allow(unused)] | |||||
| keep_alive: PyBytes, | |||||
| /// Borrows the buffer inside `self.keep_alive`, | |||||
| /// but the borrow-checker cannot express self-referential structs. | |||||
| data: *const [u8], | |||||
| } | |||||
| fn require_send<T: Send>() {} | |||||
| #[allow(unused)] | |||||
| fn static_assert_pybytes_is_send() { | |||||
| require_send::<PyBytes>; | |||||
| } | |||||
| // Safety: PyBytes is Send. Raw pointers are not by default, | |||||
| // but here sending one to another thread is fine since we ensure it stays | |||||
| // valid. | |||||
| unsafe impl Send for PyBytesWithData {} | |||||
| impl PyBytesWithData { | |||||
| pub fn new(py: Python, bytes: PyBytes) -> Self { | |||||
| Self { | |||||
| data: bytes.data(py), | |||||
| keep_alive: bytes, | |||||
| } | |||||
| } | |||||
| pub fn data(&self) -> &[u8] { | |||||
| // Safety: the raw pointer is valid as long as the PyBytes is still | |||||
| // alive, and the returned slice borrows `self`. | |||||
| unsafe { &*self.data } | |||||
| } | |||||
| pub fn unwrap(self) -> PyBytes { | |||||
| self.keep_alive | |||||
| } | |||||
| } | |||||
| } | |||||
| /// Combines copies information contained into revision `revs` to build a copy | /// Combines copies information contained into revision `revs` to build a copy | ||||
| /// map. | /// map. | ||||
| /// | /// | ||||
| /// See mercurial/copies.py for details | /// See mercurial/copies.py for details | ||||
| pub fn combine_changeset_copies_wrapper( | pub fn combine_changeset_copies_wrapper( | ||||
| py: Python, | py: Python, | ||||
| revs: PyList, | revs: PyList, | ||||
| } else { | } else { | ||||
| // Use a bounded channel to provide back-pressure: | // Use a bounded channel to provide back-pressure: | ||||
| // if the child thread is slower to process revisions than this thread | // if the child thread is slower to process revisions than this thread | ||||
| // is to gather data for them, an unbounded channel would keep | // is to gather data for them, an unbounded channel would keep | ||||
| // growing and eat memory. | // growing and eat memory. | ||||
| // | // | ||||
| // TODO: tweak the bound? | // TODO: tweak the bound? | ||||
| let (rev_info_sender, rev_info_receiver) = | let (rev_info_sender, rev_info_receiver) = | ||||
| crossbeam_channel::bounded::<RevInfo<PyBytesWithData>>(1000); | crossbeam_channel::bounded::<RevInfo<PyBytesDeref>>(1000); | ||||
| // This channel (going the other way around) however is unbounded. | // This channel (going the other way around) however is unbounded. | ||||
| // If they were both bounded, there might potentially be deadlocks | // If they were both bounded, there might potentially be deadlocks | ||||
| // where both channels are full and both threads are waiting on each | // where both channels are full and both threads are waiting on each | ||||
| // other. | // other. | ||||
| let (pybytes_sender, pybytes_receiver) = | let (pybytes_sender, pybytes_receiver) = | ||||
| crossbeam_channel::unbounded(); | crossbeam_channel::unbounded(); | ||||
| // Start a thread that does CPU-heavy processing in parallel with the | // Start a thread that does CPU-heavy processing in parallel with the | ||||
| // loop below. | // loop below. | ||||
| // | // | ||||
| // If the parent thread panics, `rev_info_sender` will be dropped and | // If the parent thread panics, `rev_info_sender` will be dropped and | ||||
| // “disconnected”. `rev_info_receiver` will be notified of this and | // “disconnected”. `rev_info_receiver` will be notified of this and | ||||
| // exit its own loop. | // exit its own loop. | ||||
| let thread = std::thread::spawn(move || { | let thread = std::thread::spawn(move || { | ||||
| let mut combine_changeset_copies = | let mut combine_changeset_copies = | ||||
| CombineChangesetCopies::new(children_count); | CombineChangesetCopies::new(children_count); | ||||
| for (rev, p1, p2, opt_bytes) in rev_info_receiver { | for (rev, p1, p2, opt_bytes) in rev_info_receiver { | ||||
| let files = match &opt_bytes { | let files = match &opt_bytes { | ||||
| Some(raw) => ChangedFiles::new(raw.data()), | Some(raw) => ChangedFiles::new(raw.as_ref()), | ||||
| // Python None was extracted to Option::None, | // Python None was extracted to Option::None, | ||||
| // meaning there was no copy data. | // meaning there was no copy data. | ||||
| None => ChangedFiles::new_empty(), | None => ChangedFiles::new_empty(), | ||||
| }; | }; | ||||
| combine_changeset_copies.add_revision(rev, p1, p2, files); | combine_changeset_copies.add_revision(rev, p1, p2, files); | ||||
| // Send `PyBytes` back to the parent thread so the parent | // Send `PyBytes` back to the parent thread so the parent | ||||
| // thread can drop it. Otherwise the GIL would be implicitly | // thread can drop it. Otherwise the GIL would be implicitly | ||||
| } | } | ||||
| } | } | ||||
| combine_changeset_copies.finish(target_rev) | combine_changeset_copies.finish(target_rev) | ||||
| }); | }); | ||||
| for rev_info in revs_info { | for rev_info in revs_info { | ||||
| let (rev, p1, p2, opt_bytes) = rev_info?; | let (rev, p1, p2, opt_bytes) = rev_info?; | ||||
| let opt_bytes = opt_bytes.map(|b| PyBytesWithData::new(py, b)); | let opt_bytes = opt_bytes.map(|b| PyBytesDeref::new(py, b)); | ||||
| // We’d prefer to avoid the child thread calling into Python code, | // We’d prefer to avoid the child thread calling into Python code, | ||||
| // but this avoids a potential deadlock on the GIL if it does: | // but this avoids a potential deadlock on the GIL if it does: | ||||
| py.allow_threads(|| { | py.allow_threads(|| { | ||||
| rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( | rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( | ||||
| "combine_changeset_copies: channel is disconnected", | "combine_changeset_copies: channel is disconnected", | ||||
| ); | ); | ||||
| }); | }); | ||||
| pub mod ref_sharing; | pub mod ref_sharing; | ||||
| pub mod copy_tracing; | pub mod copy_tracing; | ||||
| pub mod dagops; | pub mod dagops; | ||||
| pub mod debug; | pub mod debug; | ||||
| pub mod dirstate; | pub mod dirstate; | ||||
| pub mod discovery; | pub mod discovery; | ||||
| pub mod exceptions; | pub mod exceptions; | ||||
| pub mod parsers; | pub mod parsers; | ||||
| mod pybytes_deref; | |||||
| pub mod revlog; | pub mod revlog; | ||||
| pub mod utils; | pub mod utils; | ||||
| py_module_initializer!(rustext, initrustext, PyInit_rustext, |py, m| { | py_module_initializer!(rustext, initrustext, PyInit_rustext, |py, m| { | ||||
| m.add( | m.add( | ||||
| py, | py, | ||||
| "__doc__", | "__doc__", | ||||
| "Mercurial core concepts - Rust implementation", | "Mercurial core concepts - Rust implementation", | ||||
| use cpython::{PyBytes, Python}; | |||||
| /// Safe abstraction over a `PyBytes` together with the `&[u8]` slice | |||||
| /// that borrows it. Implements `Deref<Target = [u8]>`. | |||||
| /// | |||||
| /// Calling `PyBytes::data` requires a GIL marker but we want to access the | |||||
| /// data in a thread that (ideally) does not need to acquire the GIL. | |||||
| /// This type allows separating the call and the use. | |||||
| /// | |||||
| /// It also enables using a (wrapped) `PyBytes` in GIL-unaware generic code. | |||||
| pub struct PyBytesDeref { | |||||
| #[allow(unused)] | |||||
| keep_alive: PyBytes, | |||||
| /// Borrows the buffer inside `self.keep_alive`, | |||||
| /// but the borrow-checker cannot express self-referential structs. | |||||
| data: *const [u8], | |||||
| } | |||||
| impl PyBytesDeref { | |||||
| pub fn new(py: Python, bytes: PyBytes) -> Self { | |||||
| Self { | |||||
| data: bytes.data(py), | |||||
| keep_alive: bytes, | |||||
| } | |||||
| } | |||||
| pub fn unwrap(self) -> PyBytes { | |||||
| self.keep_alive | |||||
| } | |||||
| } | |||||
| impl std::ops::Deref for PyBytesDeref { | |||||
| type Target = [u8]; | |||||
| fn deref(&self) -> &[u8] { | |||||
| // Safety: the raw pointer is valid as long as the PyBytes is still | |||||
| // alive, and the returned slice borrows `self`. | |||||
| unsafe { &*self.data } | |||||
| } | |||||
| } | |||||
| fn require_send<T: Send>() {} | |||||
| #[allow(unused)] | |||||
| fn static_assert_pybytes_is_send() { | |||||
| require_send::<PyBytes>; | |||||
| } | |||||
| // Safety: PyBytes is Send. Raw pointers are not by default, | |||||
| // but here sending one to another thread is fine since we ensure it stays | |||||
| // valid. | |||||
| unsafe impl Send for PyBytesDeref {} | |||||