# Patch summary (review notes; this preamble precedes the first `diff --git`
# header and is not part of any hunk).
#
# Purpose: let Rust copy tracing run its CPU-heavy combining step on a child
# thread, controlled by a new developer option that is enabled by default.
#
#   * mercurial/configitems.py
#       Registers `devel.copy-tracing.multi-thread` with default=True.
#   * mercurial/copies.py
#       Reads the option with `repo.ui.configbool` and passes it as a new
#       trailing `multi_thread` argument through `_combine_changeset_copies`
#       to `rustmod.combine_changeset_copies`.
#   * rust/Cargo.lock, rust/hg-cpython/Cargo.toml
#       Add `crossbeam-channel` ("0.4" in Cargo.toml, 0.4.4 in the lock file)
#       as a dependency of hg-cpython.
#   * rust/hg-cpython/src/copy_tracing.rs
#       The wrapper gains a `multi_thread: bool` parameter (also added to the
#       exported Python signature).
#       - multi_thread == false: each revision is fed to
#         `CombineChangesetCopies::add_revision` on the calling thread, then
#         `finish(target_rev)` is called (same loop as the removed code).
#       - multi_thread == true: a `std::thread::spawn` child owns the
#         `CombineChangesetCopies` value and receives
#         `(rev, p1, p2, opt_bytes)` tuples over a
#         `crossbeam_channel::bounded` channel of capacity 1000. The child
#         calls `Python::acquire_gil()` for every item to read the bytes, so
#         the parent performs `send` and `join` inside `py.allow_threads`.
#         Dropping the sender ends the child's receive loop; a child panic is
#         re-raised in the parent through `std::panic::resume_unwind`.
#
# NOTE(review): the hunk headers (e.g. `@@ -42,20 +43,81 @@`) describe
# multi-line hunks, but the patch text below sits on three physical lines, so
# line breaks appear to have been collapsed. `crossbeam_channel::bounded::(1000)`
# and `-> PyResult {` also look like generic type arguments were stripped.
# Presumably an extraction artifact -- TODO confirm against the original
# changeset before trying to apply this patch.
#
# NOTE(review): in the multi-thread loop, `rev_info?` can return early before
# `thread.join()` is reached. The sender is dropped on that path, so the child
# loop should end, but the child is not joined -- confirm this is intended.
diff --git a/mercurial/configitems.py b/mercurial/configitems.py --- a/mercurial/configitems.py +++ b/mercurial/configitems.py @@ -687,6 +687,11 @@ ) coreconfigitem( b'devel', + b'copy-tracing.multi-thread', + default=True, +) +coreconfigitem( + b'devel', b'debug.extensions', default=False, ) diff --git a/mercurial/copies.py b/mercurial/copies.py --- a/mercurial/copies.py +++ b/mercurial/copies.py @@ -267,6 +267,7 @@ revs = cl.findmissingrevs(common=[a.rev()], heads=[b.rev()]) roots = set() has_graph_roots = False + multi_thread = repo.ui.configbool(b'devel', b'copy-tracing.multi-thread') # iterate over `only(B, A)` for r in revs: @@ -314,7 +315,7 @@ children_count[p] += 1 revinfo = _revinfo_getter(repo, match) return _combine_changeset_copies( - revs, children_count, b.rev(), revinfo, match, isancestor + revs, children_count, b.rev(), revinfo, match, isancestor, multi_thread ) else: # When not using side-data, we will process the edges "from" the parent. @@ -339,7 +340,7 @@ def _combine_changeset_copies( - revs, children_count, targetrev, revinfo, match, isancestor + revs, children_count, targetrev, revinfo, match, isancestor, multi_thread ): """combine the copies information for each item of iterrevs @@ -356,7 +357,7 @@ if rustmod is not None: final_copies = rustmod.combine_changeset_copies( - list(revs), children_count, targetrev, revinfo + list(revs), children_count, targetrev, revinfo, multi_thread ) else: isancestor = cached_is_ancestor(isancestor) diff --git a/rust/Cargo.lock b/rust/Cargo.lock --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -306,6 +306,7 @@ version = "0.1.0" dependencies = [ "cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "hg-core 0.1.0", "libc 0.2.81 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/rust/hg-cpython/Cargo.toml 
b/rust/hg-cpython/Cargo.toml --- a/rust/hg-cpython/Cargo.toml +++ b/rust/hg-cpython/Cargo.toml @@ -22,6 +22,7 @@ python3-bin = ["cpython/python3-sys"] [dependencies] +crossbeam-channel = "0.4" hg-core = { path = "../hg-core"} libc = '*' log = "0.4.8" diff --git a/rust/hg-cpython/src/copy_tracing.rs b/rust/hg-cpython/src/copy_tracing.rs --- a/rust/hg-cpython/src/copy_tracing.rs +++ b/rust/hg-cpython/src/copy_tracing.rs @@ -22,6 +22,7 @@ children_count: PyDict, target_rev: Revision, rev_info: PyObject, + multi_thread: bool, ) -> PyResult { let children_count = children_count .items(py) @@ -42,20 +43,81 @@ Ok((rev, p1, p2, opt_bytes)) }); - let mut combine_changeset_copies = - CombineChangesetCopies::new(children_count); + let path_copies = if !multi_thread { + let mut combine_changeset_copies = + CombineChangesetCopies::new(children_count); + + for rev_info in revs_info { + let (rev, p1, p2, opt_bytes) = rev_info?; + let files = match &opt_bytes { + Some(bytes) => ChangedFiles::new(bytes.data(py)), + // Python None was extracted to Option::None, + // meaning there was no copy data. + None => ChangedFiles::new_empty(), + }; + + combine_changeset_copies.add_revision(rev, p1, p2, files) + } + combine_changeset_copies.finish(target_rev) + } else { + // Use a bounded channel to provide back-pressure: + // if the child thread is slower to process revisions than this thread + // is to gather data for them, an unbounded channel would keep + // growing and eat memory. + // + // TODO: tweak the bound? + let (rev_info_sender, rev_info_receiver) = + crossbeam_channel::bounded::(1000); - for rev_info in revs_info { - let (rev, p1, p2, opt_bytes) = rev_info?; - let files = match &opt_bytes { - Some(bytes) => ChangedFiles::new(bytes.data(py)), - // value was presumably None, meaning they was no copy data. - None => ChangedFiles::new_empty(), - }; + // Start a thread that does CPU-heavy processing in parallel with the + // loop below. 
+ // + // If the parent thread panics, `rev_info_sender` will be dropped and + // “disconnected”. `rev_info_receiver` will be notified of this and + // exit its own loop. + let thread = std::thread::spawn(move || { + let mut combine_changeset_copies = + CombineChangesetCopies::new(children_count); + for (rev, p1, p2, opt_bytes) in rev_info_receiver { + let gil = Python::acquire_gil(); + let py = gil.python(); + let files = match &opt_bytes { + Some(raw) => ChangedFiles::new(raw.data(py)), + // Python None was extracted to Option::None, + // meaning there was no copy data. + None => ChangedFiles::new_empty(), + }; + combine_changeset_copies.add_revision(rev, p1, p2, files) + } + + combine_changeset_copies.finish(target_rev) + }); - combine_changeset_copies.add_revision(rev, p1, p2, files) - } - let path_copies = combine_changeset_copies.finish(target_rev); + for rev_info in revs_info { + let (rev, p1, p2, opt_bytes) = rev_info?; + + // We’d prefer to avoid the child thread calling into Python code, + // but this avoids a potential deadlock on the GIL if it does: + py.allow_threads(|| { + rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( + "combine_changeset_copies: channel is disconnected", + ); + }); + } + // We’d prefer to avoid the child thread calling into Python code, + // but this avoids a potential deadlock on the GIL if it does: + py.allow_threads(|| { + // Disconnect the channel to signal the child thread to stop: + // the `for … in rev_info_receiver` loop will end. + drop(rev_info_sender); + + // Wait for the child thread to stop, and propagate any panic. + thread.join().unwrap_or_else(|panic_payload| { + std::panic::resume_unwind(panic_payload) + }) + }) + }; + let out = PyDict::new(py); for (dest, source) in path_copies.into_iter() { out.set_item( @@ -84,7 +146,8 @@ revs: PyList, children: PyDict, target_rev: Revision, - rev_info: PyObject + rev_info: PyObject, + multi_thread: bool ) ), )?;