diff --git a/rust/hg-cpython/src/copy_tracing.rs b/rust/hg-cpython/src/copy_tracing.rs --- a/rust/hg-cpython/src/copy_tracing.rs +++ b/rust/hg-cpython/src/copy_tracing.rs @@ -1,6 +1,7 @@ use cpython::ObjectProtocol; use cpython::PyBytes; use cpython::PyDict; +use cpython::PyDrop; use cpython::PyList; use cpython::PyModule; use cpython::PyObject; @@ -58,6 +59,10 @@ // alive, and the returned slice borrows `self`. unsafe { &*self.data } } + + pub fn unwrap(self) -> PyBytes { + self.keep_alive + } } } @@ -93,7 +98,8 @@ Ok((rev, p1, p2, opt_bytes)) }); - let path_copies = if !multi_thread { + let path_copies; + if !multi_thread { let mut combine_changeset_copies = CombineChangesetCopies::new(children_count); @@ -108,7 +114,7 @@ combine_changeset_copies.add_revision(rev, p1, p2, files) } - combine_changeset_copies.finish(target_rev) + path_copies = combine_changeset_copies.finish(target_rev) } else { // Use a bounded channel to provide back-pressure: // if the child thread is slower to process revisions than this thread @@ -119,6 +125,13 @@ let (rev_info_sender, rev_info_receiver) = crossbeam_channel::bounded::>(1000); + // This channel (going the other way around) however is unbounded. + // If they were both bounded, there might potentially be deadlocks + // where both channels are full and both threads are waiting on each + // other. + let (pybytes_sender, pybytes_receiver) = + crossbeam_channel::unbounded(); + // Start a thread that does CPU-heavy processing in parallel with the // loop below. // @@ -135,10 +148,20 @@ // meaning there was no copy data. None => ChangedFiles::new_empty(), }; - combine_changeset_copies.add_revision(rev, p1, p2, files) + combine_changeset_copies.add_revision(rev, p1, p2, files); - // The GIL is (still) implicitly acquired here through - // `impl Drop for PyBytes`. + // Send `PyBytes` back to the parent thread so the parent + // thread can drop it. Otherwise the GIL would be implicitly + // acquired here through `impl Drop for PyBytes`. + if let Some(bytes) = opt_bytes { + if let Err(_) = pybytes_sender.send(bytes.unwrap()) { + // The channel is disconnected, meaning the parent + // thread panicked or returned + // early through + // `?` to propagate a Python exception. + break; + } + } } combine_changeset_copies.finish(target_rev) @@ -155,10 +178,15 @@ "combine_changeset_copies: channel is disconnected", ); }); + + // Drop anything in the channel, without blocking + for pybytes in pybytes_receiver.try_iter() { + pybytes.release_ref(py) + } } // We’d prefer to avoid the child thread calling into Python code, // but this avoids a potential deadlock on the GIL if it does: - py.allow_threads(|| { + path_copies = py.allow_threads(|| { // Disconnect the channel to signal the child thread to stop: // the `for … in rev_info_receiver` loop will end. drop(rev_info_sender); @@ -167,7 +195,12 @@ thread.join().unwrap_or_else(|panic_payload| { std::panic::resume_unwind(panic_payload) }) - }) + }); + + // Drop anything left in the channel + for pybytes in pybytes_receiver.iter() { + pybytes.release_ref(py) + } }; let out = PyDict::new(py);