diff --git a/hgext3rd/clindex.pyx b/hgext3rd/clindex.pyx --- a/hgext3rd/clindex.pyx +++ b/hgext3rd/clindex.pyx @@ -7,30 +7,87 @@ This extension replaces certain parts of changelog index algorithms to make it more efficient when changelog is large. + +Config:: + + [clindex] + # Use Rust nodemap + nodemap = True + + # Verify operations against other implementations. + # Turn this off for performance. + verify = True + + # Incrementally build Rust nodemap once it misses 20k revisions + lagthreshold = 20000 + + # Path to write logs (default: $repo/.hg/cache/clindex.log) + logpath = /tmp/a.log """ from __future__ import absolute_import +import datetime +import errno +import os + from mercurial import ( changelog, + chgserver, error, extensions, + localrepo, policy, + registrar, revlog, + util, + vfs as vfsmod, +) + +from mercurial.node import ( + hex, + nullhex, + nullid, ) -origindex = policy.importmod('parsers').index +from .rust import indexes + +configtable = {} +configitem = registrar.configitem(configtable) + +configitem('clindex', 'nodemap', default=True) +configitem('clindex', 'verify', default=True) + +# Inserting 20k nodes takes about 2ms. See https://phab.mercurial-scm.org/D1291 +# for the table of node count and performance. +configitem('clindex', 'lagthreshold', default=20000) + +# Path to write logs. 
+configitem('clindex', 'logpath', default=None) + +origindextype = policy.importmod('parsers').index # cdef is important for performance because it avoids dict lookups: # - `self._origindex` becomes `some_c_struct_pointer->_origindex` # - `__getitem__`, `__len__` will be using `PyMappingMethods` APIs cdef class clindex(object): + cdef readonly _changelog + cdef readonly localconfig _config + cdef readonly nodemap _nodemap cdef _origindex + cdef _vfs - def __init__(self, data, inlined): + def __init__(self, data, inlined, vfs, config): assert not inlined - self._origindex = origindex(data, inlined) + assert vfs + self._origindex = origindextype(data, inlined) + self._changelog = data + # Copy the config so it can be changed just for this clindex object. + # For example, disabling Rust nodemap temporarily if strip happens. + self._config = config.copy() + self._nodemap = nodemap(self._origindex, data, vfs, config) + self._vfs = vfs def ancestors(self, *revs): return self._origindex.ancestors(*revs) @@ -58,46 +115,353 @@ return self._origindex.deltachain(rev, stoprev, generaldelta) def insert(self, int rev, entry): - return self._origindex.insert(rev, entry) + if rev < 0: + rev = len(self._origindex) + rev + self._origindex.insert(rev, entry) + self._nodemap[entry[-1]] = rev def partialmatch(self, hexnode): - return self._origindex.partialmatch(hexnode) + return self._nodemap.partialmatch(hexnode) def __len__(self): return len(self._origindex) + def __delslice__(self, Py_ssize_t i, Py_ssize_t j): + # This one is tricky: it's called by strip. The Rust nodemap cannot + # really handle it easily so let's just disable it for now. + # repo.destroyed() will reconstruct a clindex object, which will + # re-enable and re-build the cache. 
+ del self._origindex[i:j] + self._config.nodemap = False + @property def nodemap(self): - return nodemap(self._origindex) + return self._nodemap + + def destroying(self): + _log(self._vfs, 'clindex: destroying') + self._nodemap.destroying() + + def updatecaches(self): + self._nodemap.updatecache() cdef class nodemap(object): + """mutable nodemap + + Backed by an immutable nodemap implemented by Rust and a simple override + dict. The Rust nodemap only follows changelog index data while the nodemap + has to support __setitem__ to be compatible with the current Mercurial + APIs. + """ + cdef localconfig _config cdef _origindex + cdef readonly _overrides # {node: rev | None} + cdef readonly _rustnodemap + cdef _vfs + cdef readonly bint _updated + + emptyindex = indexes.nodemap.emptyindexbuffer() - def __init__(self, origindex): + def __init__(self, origindex, changelog, vfs, config): + self._config = config self._origindex = origindex + self._overrides = {} + self._vfs = vfs + try: + index = util.buffer(util.mmapread(vfs('nodemap'))) + if len(index) < len(self.emptyindex): + index = self.emptyindex + except IOError as ex: + if ex.errno != errno.ENOENT: + raise + _log(self._vfs, 'nodemap: is empty') + index = self.emptyindex + if config.nodemap: + try: + rustnodemap = indexes.nodemap(changelog, index) + except Exception as ex: + _log(self._vfs, 'nodemap: corrupted: %r' % ex) + rustnodemap = indexes.nodemap(changelog, self.emptyindex) + self._rustnodemap = rustnodemap + self._updated = False + + def updatecache(self): + # updatecache may get called for *many* times. That is, an "outdated" + # changelog object being used across multiple transactions. This test + # avoids unnecessary re-updates. + if self._updated: + return + # nodemap was disabled (ex. by destroying()). The changelog is now + # outdated. Do not rely on it building index. + if not self._config.nodemap: + return + # Writing nodemap has a cost. Do not update it if not lagging too much. 
+ lag = self._rustnodemap.lag() + if lag == 0 or lag < self._config.lagthreshold: + return + _log(self._vfs, 'nodemap: updating (lag=%s)' % lag) + with self._vfs('nodemap', 'w', atomictemp=True) as f: + f.write(self._rustnodemap.build()) + self._updated = True def __getitem__(self, node): - return self._origindex[node] + if not self._config.nodemap: + return self._origindex[node] + + if node == nullid: + # special case for hg: '\0' * 20 => -1 + return -1 + if node in self._overrides: + rev = self._overrides[node] + elif self._config.verify: + try: + revorig = self._origindex[node] + except error.RevlogError: + revorig = None # convert "not found" to None + rev = _logifraise(self._vfs, + lambda: self._rustnodemap[node], + lambda: {'nodemap.getitem': hex(node), + 'revorig': revorig}) + if rev != revorig: + _logandraise(self._vfs, + 'nodemap: inconsistent getitem(%s): %r vs %r' + % (hex(node), rev, revorig)) + else: + rev = self._rustnodemap[node] + + if rev is None: + raise error.RevlogError + else: + return rev def __setitem__(self, node, rev): + self._overrides[node] = rev self._origindex[node] = rev + def __delitem__(self, node): + self._overrides[node] = None + def __contains__(self, node): - return node in self._origindex + if not self._config.nodemap: + return node in self._origindex + + if self._overrides.get(node) or node == nullid: + return True + + if self._config.verify: + resorig = node in self._origindex + res = _logifraise(self._vfs, + lambda: node in self._rustnodemap, + lambda: {'nodemap.contains': hex(node), + 'resorig': resorig}) + if res != resorig: + _logandraise(self._vfs, + 'nodemap: inconsistent contains(%s): %r vs %r' + % (hex(node), res, resorig)) + else: + res = node in self._rustnodemap + return res def get(self, node, default=None): - return self._origindex.get(node, default) + if self.__contains__(node): + return self.__getitem__(node) + else: + return default + + def partialmatch(self, hexprefix): + if not self._config.nodemap: + return 
self._origindex.partialmatch(hexprefix) + + if self._config.verify: + resorig = self._origindex.partialmatch(hexprefix) + res = _logifraise( + self._vfs, + lambda: self._rustpartialmatch(hexprefix), + lambda: {'partialmatch': hexprefix, 'resorig': resorig}) + if res != resorig: + _logandraise( + self._vfs, + 'nodemap: inconsistent partialmatch(%s): %r vs %r' + % (hexprefix, res, resorig)) + else: + res = self._rustpartialmatch(hexprefix) + return res + + cdef _rustpartialmatch(self, hexprefix): + candidates = set() + # Special case: nullid + if nullhex.startswith(hexprefix): + candidates.add(nullid) + try: + node = self._rustnodemap.partialmatch(hexprefix) + if node is not None: + candidates.add(node) + except RuntimeError as ex: + # Convert 'ambiguous prefix' to RevlogError. This is because the + # rust code cannot access RevlogError cleanly. So we do the + # conversion here. + if 'ambiguous prefix' in ex: + raise error.RevlogError + raise + + # Search nodes in overrides. This is needed because overrides could + # live outside the changelog snapshot and are unknown to the rust + # index. Ideally we can keep changelog always up-to-date with the + # index. But that requires more changes (ex. removing index.insert API + # and index takes care of data writes). + candidates.update(k for k in self._overrides.iterkeys() + if hex(k).startswith(hexprefix)) + if len(candidates) == 1: + return list(candidates)[0] + elif len(candidates) > 1: + raise error.RevlogError + else: + return None + + @property + def lag(self): + if self._config.nodemap: + return self._rustnodemap.lag() + else: + return 0 + + def destroying(self): + self._vfs.tryunlink('nodemap') + self._config.nodemap = False + +# These are unfortunate. But we need vfs access inside index.__init__. 
Doing +# that properly requires API changes in revlog.__init__ and +# revlogio.parseindex that might make things uglier, or break the (potential) +# intention of keeping revlog low-level, de-coupled from high-level objects +# including vfs and ui. So let's use a temporary global state to pass the +# vfs object and config options down to parseindex. +_cachevfs = None +_config = None + +# Lightweight config state that is dedicated to this extension and is +# decoupled from the heavy-weight ui object. +cdef class localconfig: + cdef public bint nodemap + cdef public bint verify + cdef public int lagthreshold + + def copy(self): + rhs = localconfig() + rhs.nodemap = self.nodemap + rhs.verify = self.verify + rhs.lagthreshold = self.lagthreshold + return rhs + + @classmethod + def fromui(cls, ui): + self = cls() + self.nodemap = ui.configbool('clindex', 'nodemap') + self.verify = ui.configbool('clindex', 'verify') + self.lagthreshold = ui.configint('clindex', 'lagthreshold') + return self def _parseindex(orig, self, data, inline): if inline: - # clindex does not support inline + # clindex does not support inline. 
fallback to original index return orig(self, data, inline) - index = clindex(data, inline) + index = clindex(data, inline, _cachevfs, _config) return index, index.nodemap, None -def _changeloginit(orig, self, *args, **kwargs): - with extensions.wrappedfunction(revlog.revlogio, 'parseindex', _parseindex): - orig(self, *args, **kwargs) +# Simple utilities to log debug messages +def _logandraise(vfs, message): + _log(vfs, message) + raise RuntimeError(message) + +def _logifraise(vfs, func, infofunc): + try: + return func() + except RuntimeError as ex: + _log(vfs, 'exception: %r %r' % (ex, infofunc())) + raise + +_logpath = None + +def _log(vfs, message): + try: + if _logpath: + f = open(_logpath, 'ab') + else: + f = vfs('clindex.log', 'ab') + with f: + timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') + pid = os.getpid() + f.write('%s [%d] %s\n' % (timestamp, pid, message)) + except IOError: + # The log is not important. IOError like "Permission denied" should not + # be fatal. + pass + +def _wrapchangelog(orig, repo): + # need to pass vfs to _parseindex so it can read the cache directory + global _cachevfs + _cachevfs = repo.cachevfs + + # pass a subset of config interesting to this extension + global _config + _config = localconfig.fromui(repo.ui) + + try: + with extensions.wrappedfunction(revlog.revlogio, + 'parseindex', _parseindex): + return orig(repo) + finally: + # do not leak them outside parseindex + _config = None + _cachevfs = None + +def reposetup(ui, repo): + if not repo.local(): + return + + unfilteredmethod = localrepo.unfilteredmethod + + class clindexrepo(repo.__class__): + @unfilteredmethod + def updatecaches(self, tr=None): + try: + self.changelog.index.updatecaches() + except AttributeError as ex: # pure, or clindex is not used + return + super(clindexrepo, self).updatecaches(tr) + + @unfilteredmethod + def destroying(self): + # Tell clindex to prepare for the strip. clindex will unlink + # nodemap and other caches. 
+ try: + self.changelog.index.destroying() + except AttributeError as ex: + return + super(clindexrepo, self).destroying() + + @unfilteredmethod + def destroyed(self): + # Force a reload of changelog. The current "self.changelog" object + # has an outdated snapshot of changelog.i. We need to read the new + # version before updatecaches(). + if 'changelog' in self.__dict__: + del self.__dict__['changelog'] + if 'changelog' in self._filecache: + del self._filecache['changelog'] + # This calls "updatecaches" and will pick up the new changelog.i. + super(clindexrepo, self).destroyed() + + repo.__class__ = clindexrepo def uisetup(ui): - extensions.wrapfunction(changelog.changelog, '__init__', _changeloginit) + # uisetup has side effects depending on configs. Mark [clindex] config + # section sensitive so chg works correctly. + chgserver._configsections.append('clindex') + + # global logpath config + global _logpath + _logpath = ui.config('clindex', 'logpath') + + # filecache method has to be wrapped using wrapfilecache + extensions.wrapfilecache(localrepo.localrepository, 'changelog', + _wrapchangelog)