diff --git a/hgext3rd/clindex.pyx b/hgext3rd/clindex.pyx
--- a/hgext3rd/clindex.pyx
+++ b/hgext3rd/clindex.pyx
@@ -7,30 +7,78 @@
 
 This extension replaces certain parts of changelog index algorithms to make it
 more efficient when changelog is large.
+
+Config::
+
+    [clindex]
+    # Use Rust nodemap
+    nodemap = True
+
+    # Verify operations against other implementations.
+    # Turn this off for performance.
+    verify = True
+
+    # Incrementally build Rust nodemap once it misses 20k revisions
+    lagthreshold = 20000
 """
 
 from __future__ import absolute_import
 
+import datetime
+import errno
+import os
+
 from mercurial import (
     changelog,
+    chgserver,
     error,
     extensions,
+    localrepo,
     policy,
+    registrar,
     revlog,
+    util,
+    vfs as vfsmod,
+)
+
+from mercurial.node import (
+    hex,
+    nullhex,
+    nullid,
 )
 
-origindex = policy.importmod('parsers').index
+from hgext3rd.rust import indexes
+
+configtable = {}
+configitem = registrar.configitem(configtable)
+
+configitem('clindex', 'nodemap', default=True)
+configitem('clindex', 'verify', default=True)
+
+# Inserting 20k nodes takes about 2ms. See https://phab.mercurial-scm.org/D1291
+# for the table of node count and performance.
+configitem('clindex', 'lagthreshold', default=20000)
+
+origindextype = policy.importmod('parsers').index
 
 # cdef is important for performance because it avoids dict lookups:
 # - `self._origindex` becomes `some_c_struct_pointer->_origindex`
 # - `__getitem__`, `__len__` will be using `PyMappingMethods` APIs
 cdef class clindex(object):
+    cdef config _config
     cdef _origindex
 
-    def __init__(self, data, inlined):
+    # "readonly" exposes nodemap as a property, forbids the property setter but
+    # the object is still mutable.
+    cdef readonly nodemap nodemap
+
+    def __init__(self, data, inlined, vfs, config):
         assert not inlined
-        self._origindex = origindex(data, inlined)
+        assert vfs
+        self._config = config
+        self._origindex = origindextype(data, inlined)
+        self.nodemap = nodemap(self._origindex, data, vfs, config)
 
     def ancestors(self, *revs):
         return self._origindex.ancestors(*revs)
@@ -58,46 +106,343 @@
         return self._origindex.deltachain(rev, stoprev, generaldelta)
 
     def insert(self, int rev, entry):
-        return self._origindex.insert(rev, entry)
+        if rev < 0:
+            rev = len(self._origindex) + rev
+        self._origindex.insert(rev, entry)
+        # entry[-1] is used as the node key to keep the nodemap in sync
+        self.nodemap[entry[-1]] = rev
 
     def partialmatch(self, hexnode):
-        return self._origindex.partialmatch(hexnode)
+        return self.nodemap.partialmatch(hexnode)
 
     def __len__(self):
         return len(self._origindex)
 
-    @property
-    def nodemap(self):
-        return nodemap(self._origindex)
+    def __delslice__(self, Py_ssize_t i, Py_ssize_t j):
+        # This one is tricky: it's called by strip. The Rust nodemap cannot
+        # really handle it easily so let's just disable it for now.
+        del self._origindex[i:j]
+        self._config.nodemap = False
+
+    def updatecaches(self):
+        """called by repo.updatecaches; persist the nodemap when enabled"""
+        if self._config.nodemap:
+            self.nodemap.updatecache()
+
+    def destroying(self):
+        """called by repo.destroying; forwarded to the nodemap when enabled"""
+        if self._config.nodemap:
+            self.nodemap.destroying()
+
+    def destroyed(self):
+        """called by repo.destroyed; forwarded to the nodemap when enabled"""
+        if self._config.nodemap:
+            self.nodemap.destroyed()
 
 cdef class nodemap(object):
+    """mutable nodemap
+
+    Backed by an immutable nodemap implemented by Rust and a simple override
+    dict. The Rust nodemap only follows changelog index data while the nodemap
+    has to support __setitem__ to be compatible with the current Mercurial
+    APIs.
+    """
+    cdef config _config
     cdef _origindex
+    cdef readonly _overrides # {node: rev | None}
+    cdef _rustnodemap
+    cdef _vfs
+    cdef bint _destroyed
+
+    # empty index buffer has a minimum of 17 * 4 bytes
+    emptyindex = b'\0' * 17 * 4
 
-    def __init__(self, origindex):
+    def __init__(self, origindex, changelog, vfs, config):
+        self._config = config
         self._origindex = origindex
+        self._overrides = {}
+        self._vfs = vfs
+        self._destroyed = False
+        try:
+            index = util.buffer(util.mmapread(vfs('nodemap')))
+            if len(index) < len(self.emptyindex):
+                index = self.emptyindex
+        except IOError as ex:
+            if ex.errno != errno.ENOENT:
+                raise
+            # no cache file yet: start from an empty index
+            index = self.emptyindex
+        if config.nodemap:
+            try:
+                rustnodemap = indexes.nodemap(changelog, index)
+            except Exception as ex:
+                _log('nodemap: corrupted: %r' % ex)
+                rustnodemap = indexes.nodemap(changelog, self.emptyindex)
+            self._rustnodemap = rustnodemap
+
+    def updatecache(self):
+        """rewrite the on-disk nodemap once its lag reaches lagthreshold"""
+        if not self._config.nodemap:
+            return
+        if self._destroyed:
+            # The changelog data is invalid so we cannot build a new index
+            # right now in this case. Just skip and let the next transaction
+            # take care of it.
+            return
+        lag = self._rustnodemap.lag()
+        if lag < self._config.lagthreshold:
+            return
+        _log('nodemap: updating (lag=%s)' % lag)
+        with self._vfs('nodemap', 'w', atomictemp=True) as f:
+            f.write(self._rustnodemap.build())
+
+    def destroying(self):
+        """log the event and remove the on-disk nodemap file"""
+        _log('nodemap: destroying')
+        self._vfs.tryunlink('nodemap')
+
+    def destroyed(self):
+        """remove the on-disk nodemap file and make updatecache a no-op"""
+        self._destroyed = True
+        self._vfs.tryunlink('nodemap')
 
     def __getitem__(self, node):
-        return self._origindex[node]
+        """return the rev of node; raise RevlogError if it is unknown"""
+        if not self._config.nodemap:
+            return self._origindex[node]
+
+        if node == nullid:
+            # special case for hg: '\0' * 20 => -1
+            return -1
+        if node in self._overrides:
+            rev = self._overrides[node]
+        elif self._config.verify:
+            try:
+                revorig = self._origindex[node]
+            except error.RevlogError:
+                revorig = None # convert "not found" to None
+            rev = _logifraise(lambda: self._rustnodemap[node],
+                              lambda: {'nodemap.getitem': hex(node),
+                                       'revorig': revorig})
+            if rev != revorig:
+                _logandraise('nodemap: inconsistent getitem(%s): %r vs %r'
+                             % (hex(node), rev, revorig))
+        else:
+            rev = self._rustnodemap[node]
+
+        if rev is None:
+            raise error.RevlogError
+        else:
+            return rev
 
     def __setitem__(self, node, rev):
+        self._overrides[node] = rev
         self._origindex[node] = rev
 
+    def __delitem__(self, node):
+        # None marks the node as deleted
+        self._overrides[node] = None
+
     def __contains__(self, node):
-        return node in self._origindex
+        if not self._config.nodemap:
+            return node in self._origindex
+
+        if node == nullid:
+            return True
+        if node in self._overrides:
+            # rev 0 is a valid (falsy) rev; None marks a deleted node
+            return self._overrides[node] is not None
+
+        if self._config.verify:
+            resorig = node in self._origindex
+            res = _logifraise(lambda: node in self._rustnodemap,
+                              lambda: {'nodemap.contains': hex(node),
+                                       'resorig': resorig})
+            if res != resorig:
+                _logandraise('nodemap: inconsistent contains(%s): %r vs %r'
+                             % (hex(node), res, resorig))
+        else:
+            res = node in self._rustnodemap
+        return res
 
     def get(self, node, default=None):
-        return self._origindex.get(node, default)
+        if self.__contains__(node):
+            return self.__getitem__(node)
+        else:
+            return default
+
+    def partialmatch(self, hexprefix):
+        """match a hex node prefix; cross-checked with origindex if verify"""
+        if not self._config.nodemap:
+            return self._origindex.partialmatch(hexprefix)
+
+        if self._config.verify:
+            resorig = self._origindex.partialmatch(hexprefix)
+            res = _logifraise(
+                lambda: self._rustpartialmatch(hexprefix),
+                lambda: {'partialmatch': hexprefix, 'resorig': resorig})
+            if res != resorig:
+                _logandraise('nodemap: inconsistent partialmatch(%s): %r vs %r'
+                             % (hexprefix, res, resorig))
+        else:
+            res = self._rustpartialmatch(hexprefix)
+        return res
+
+    cdef _rustpartialmatch(self, hexprefix):
+        """return the unique node matching hexprefix, or None; raise if not"""
+        candidates = set()
+        # Special case: nullid
+        if nullhex.startswith(hexprefix):
+            candidates.add(nullid)
+        try:
+            node = self._rustnodemap.partialmatch(hexprefix)
+            if node is not None:
+                candidates.add(node)
+        except RuntimeError as ex:
+            # Convert 'ambiguous prefix' to RevlogError. This is because the
+            # rust code cannot access RevlogError cleanly. So we do the
+            # conversion here.
+            if 'ambiguous prefix' in str(ex):
+                raise error.RevlogError
+            raise
+
+        # Search nodes in overrides. This is needed because overrides could
+        # live outside the changelog snapshot and are unknown to the rust
+        # index. Ideally we can keep changelog always up-to-date with the
+        # index. But that requires more changes (ex. removing index.insert API
+        # and index takes care of data writes).
+        candidates.update(k for k, v in self._overrides.iteritems()
+                          if v is not None and hex(k).startswith(hexprefix))
+        if len(candidates) == 1:
+            return list(candidates)[0]
+        elif len(candidates) > 1:
+            raise error.RevlogError
+        else:
+            return None
+
+    @property
+    def lag(self):
+        """lag reported by the Rust nodemap, or 0 when it is disabled"""
+        if self._config.nodemap:
+            return self._rustnodemap.lag()
+        else:
+            return 0
+
+# These are unfortunate. But we need vfs access inside index.__init__. Doing
+# that properly requires API changes in revlog.__init__ and
+# revlogio.parseindex that might make things uglier, or break the (potential)
+# intention of keeping revlog low-level, de-coupled from high-level objects
+# including vfs and ui. So let's use a temporary global state to pass the
+# vfs object and config options down to parseindex.
+_cachevfs = None
+_logpath = None
+_config = None
+
+# Lightweight config state that is dedicated to this extension and is
+# decoupled from the heavy-weight ui object.
+cdef class config:
+    cdef readonly bint nodemap
+    cdef readonly bint verify
+    cdef readonly int lagthreshold
+
+    def __cinit__(self, ui):
+        self.nodemap = ui.configbool('clindex', 'nodemap')
+        self.verify = ui.configbool('clindex', 'verify')
+        self.lagthreshold = ui.configint('clindex', 'lagthreshold')
 
 def _parseindex(orig, self, data, inline):
     if inline:
-        # clindex does not support inline
+        # clindex does not support inline. fallback to original index
         return orig(self, data, inline)
-    index = clindex(data, inline)
+    index = clindex(data, inline, _cachevfs, _config)
     return index, index.nodemap, None
 
-def _changeloginit(orig, self, *args, **kwargs):
-    with extensions.wrappedfunction(revlog.revlogio, 'parseindex', _parseindex):
-        orig(self, *args, **kwargs)
+# Simple utilities to log debug messages
+def _logandraise(message):
+    """log message, then raise it as a RuntimeError"""
+    _log(message)
+    raise RuntimeError(message)
+
+def _logifraise(func, infofunc):
+    """return func(); on RuntimeError log it with infofunc() and re-raise"""
+    try:
+        return func()
+    except RuntimeError as ex:
+        _log('exception: %r %r' % (ex, infofunc()))
+        raise
+
+def _log(message):
+    """append a timestamped message to the debug log, if a path is set"""
+    if not _logpath:
+        return
+    try:
+        with open(_logpath, 'ab') as f:
+            timestamp = datetime.datetime.now().strftime('%H:%M:%S.%f')
+            pid = os.getpid()
+            f.write('%s [%d] %s\n' % (timestamp, pid, message))
+    except IOError:
+        # The log is not important. IOError like "Permission denied" should not
+        # be fatal.
+        pass
+
+def _wrapchangelog(orig, repo):
+    """build repo.changelog with revlogio.parseindex wrapped by _parseindex"""
+    # need to pass vfs to _parseindex so it can read the cache directory
+    global _cachevfs
+    _cachevfs = repo.cachevfs
+
+    # write debug logs until we are more confident in the feature
+    global _logpath
+    _logpath = _cachevfs.join('clindex.log')
+
+    # pass a subset of config interesting to this extension
+    global _config
+    _config = config(repo.ui)
+
+    try:
+        with extensions.wrappedfunction(revlog.revlogio,
+                                        'parseindex', _parseindex):
+            return orig(repo)
+    finally:
+        _cachevfs = None
+
+def reposetup(ui, repo):
+    """subclass the unfiltered repo to forward cache events to clindex"""
+    if not repo.local():
+        return
+    unfi = repo.unfiltered()
+
+    class clindexrepo(unfi.__class__):
+        def updatecaches(self, tr=None):
+            super(clindexrepo, self).updatecaches(tr)
+            try:
+                self.changelog.index.updatecaches()
+            except AttributeError: # pure - no index, or clindex is not used
+                return
+
+        def destroying(self):
+            try:
+                self.changelog.index.destroying()
+            except AttributeError:
+                pass
+            super(clindexrepo, self).destroying()
+
+        def destroyed(self):
+            # super.destroyed calls updatecaches. notify clindex first so it
+            # can skip updatecaches properly.
+            try:
+                self.changelog.index.destroyed()
+            except AttributeError:
+                pass
+            super(clindexrepo, self).destroyed()
+
+    unfi.__class__ = clindexrepo
 
 def uisetup(ui):
-    extensions.wrapfunction(changelog.changelog, '__init__', _changeloginit)
+    # uisetup has side effects depending on configs. Mark [clindex] config
+    # section sensitive so chg works correctly.
+    chgserver._configsections.append('clindex')
+
+    # filecache method has to be wrapped using wrapfilecache
+    extensions.wrapfilecache(localrepo.localrepository, 'changelog',
+                             _wrapchangelog)