diff --git a/hgext/sqlitestore.py b/hgext/sqlitestore.py
new file mode 100644
--- /dev/null
+++ b/hgext/sqlitestore.py
@@ -0,0 +1,1113 @@
+# sqlitestore.py - Storage backend that uses SQLite
+#
+# Copyright 2018 Gregory Szorc
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+"""store repository data in SQLite (EXPERIMENTAL)
+
+The sqlitestore extension enables the storage of repository data in SQLite.
+
+This extension is HIGHLY EXPERIMENTAL. There are NO BACKWARDS COMPATIBILITY
+GUARANTEES. This means that repositories created with this extension may
+only be usable with the exact version of this extension/Mercurial that was
+used. The extension attempts to enforce this in order to prevent repository
+corruption.
+
+In addition, several features are not yet supported or have known bugs:
+
+* Only some data is stored in SQLite. Changeset, manifest, and other repository
+  data is not yet stored in SQLite.
+* Transactions are not robust. If the process is aborted at the right time
+  during transaction close/rollback, the repository could be in an inconsistent
+  state. This problem will diminish once all repository data is tracked by
+  SQLite.
+* Bundle repositories do not work (the ability to use e.g.
+  `hg -R <bundle-file> log` to automatically overlay a bundle on top of the
+  existing repository).
+* Various other features don't work.
+
+This extension should work for basic clone/pull, update, and commit workflows.
+Some history rewriting operations may fail due to lack of support for bundle
+repositories.
+
+To use, activate the extension and set the ``storage.new-repo-backend`` config
+option to ``sqlite`` to enable new repositories to use SQLite for storage.
+"""
+
+# To run the test suite with repos using SQLite by default, execute the
+# following:
+#
+# HGREPOFEATURES="sqlitestore" run-tests.py \
+#     --extra-config-opt extensions.sqlitestore= \
+#     --extra-config-opt storage.new-repo-backend=sqlite
+
+from __future__ import absolute_import
+
+import hashlib
+import sqlite3
+import struct
+import threading
+import zlib
+
+from mercurial.i18n import _
+from mercurial.node import (
+    nullid,
+    nullrev,
+    short,
+)
+from mercurial.thirdparty import (
+    attr,
+)
+from mercurial import (
+    ancestor,
+    dagop,
+    error,
+    extensions,
+    localrepo,
+    mdiff,
+    pycompat,
+    registrar,
+    repository,
+    util,
+    verify,
+)
+from mercurial.utils import (
+    interfaceutil,
+    storageutil,
+)
+
+try:
+    from mercurial import zstd
+    zstd.__version__
+except ImportError:
+    zstd = None
+
+configtable = {}
+configitem = registrar.configitem(configtable)
+
+# experimental config: storage.sqlite.compression
+configitem('storage', 'sqlite.compression',
+           default='zstd' if zstd else 'zlib')
+
+# Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
+# extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
+# be specifying the version(s) of Mercurial they are tested with, or
+# leave the attribute unspecified.
+testedwith = 'ships-with-hg-core'
+
+REQUIREMENT = b'exp-sqlite-001'
+REQUIREMENT_ZSTD = b'exp-sqlite-comp-001=zstd'
+REQUIREMENT_ZLIB = b'exp-sqlite-comp-001=zlib'
+REQUIREMENT_NONE = b'exp-sqlite-comp-001=none'
+
+CURRENT_SCHEMA_VERSION = 1
+
+COMPRESSION_NONE = 1
+COMPRESSION_ZSTD = 2
+COMPRESSION_ZLIB = 3
+
+FLAG_CENSORED = 1
+
+CREATE_SCHEMA = [
+    # Deltas are stored as content-indexed blobs.
+    # compression column holds COMPRESSION_* constant for how the
+    # delta is encoded.
+ + r'CREATE TABLE delta (' + r' id INTEGER PRIMARY KEY, ' + r' compression INTEGER NOT NULL, ' + r' hash BLOB UNIQUE ON CONFLICT ABORT, ' + r' delta BLOB NOT NULL ' + r')', + + # Tracked paths are denormalized to integers to avoid redundant + # storage of the path name. + r'CREATE TABLE filepath (' + r' id INTEGER PRIMARY KEY, ' + r' path BLOB NOT NULL ' + r')', + + r'CREATE UNIQUE INDEX filepath_path ' + r' ON filepath (path)', + + # We have a single table for all file revision data. + # Each file revision is uniquely described by a (path, rev) and + # (path, node). + # + # Revision data is stored as a pointer to the delta producing this + # revision and the file revision whose delta should be applied before + # that one. One can reconstruct the delta chain by recursively following + # the delta base revision pointers until one encounters NULL. + # + # flags column holds bitwise integer flags controlling storage options. + # These flags are defined by the FLAG_* constants. + r'CREATE TABLE fileindex (' + r' id INTEGER PRIMARY KEY, ' + r' pathid INTEGER REFERENCES filepath(id), ' + r' revnum INTEGER NOT NULL, ' + r' p1rev INTEGER NOT NULL, ' + r' p2rev INTEGER NOT NULL, ' + r' linkrev INTEGER NOT NULL, ' + r' flags INTEGER NOT NULL, ' + r' deltaid INTEGER REFERENCES delta(id), ' + r' deltabaseid INTEGER REFERENCES fileindex(id), ' + r' node BLOB NOT NULL ' + r')', + + r'CREATE UNIQUE INDEX fileindex_pathrevnum ' + r' ON fileindex (pathid, revnum)', + + r'CREATE UNIQUE INDEX fileindex_pathnode ' + r' ON fileindex (pathid, node)', + + # Provide a view over all file data for convenience. + r'CREATE VIEW filedata AS ' + r'SELECT ' + r' fileindex.id AS id, ' + r' filepath.id AS pathid, ' + r' filepath.path AS path, ' + r' fileindex.revnum AS revnum, ' + r' fileindex.node AS node, ' + r' fileindex.p1rev AS p1rev, ' + r' fileindex.p2rev AS p2rev, ' + r' fileindex.linkrev AS linkrev, ' + r' fileindex.flags AS flags, ' + r' fileindex.deltaid AS deltaid, ' + r' fileindex.deltabaseid AS deltabaseid ' + r'FROM filepath, fileindex ' + r'WHERE fileindex.pathid=filepath.id', + + r'PRAGMA user_version=%d' % CURRENT_SCHEMA_VERSION, +] + +def resolvedeltachain(db, pathid, node, revisioncache, + stoprids, zstddctx=None): + """Resolve a delta chain for a file node.""" + + # TODO the "not in ({stops})" here is possibly slowing down the query + # because it needs to perform the lookup on every recursive invocation. + # This could possibly be faster if we created a temporary query with + # baseid "poisoned" to null and limited the recursive filter to + # "is not null". + res = db.execute( + r'WITH RECURSIVE ' + r' deltachain(deltaid, baseid) AS (' + r' SELECT deltaid, deltabaseid FROM fileindex ' + r' WHERE pathid=? AND node=? 
'
+        r'        UNION ALL '
+        r'        SELECT fileindex.deltaid, deltabaseid '
+        r'            FROM fileindex, deltachain '
+        r'            WHERE '
+        r'                fileindex.id=deltachain.baseid '
+        r'                AND deltachain.baseid IS NOT NULL '
+        r'                AND fileindex.id NOT IN ({stops}) '
+        r'    ) '
+        r'SELECT deltachain.baseid, compression, delta '
+        r'FROM deltachain, delta '
+        r'WHERE delta.id=deltachain.deltaid'.format(
+            stops=r','.join([r'?'] * len(stoprids))),
+        tuple([pathid, node] + list(stoprids.keys())))
+
+    deltas = []
+    lastdeltabaseid = None
+
+    for deltabaseid, compression, delta in res:
+        lastdeltabaseid = deltabaseid
+
+        if compression == COMPRESSION_ZSTD:
+            delta = zstddctx.decompress(delta)
+        elif compression == COMPRESSION_NONE:
+            delta = delta
+        elif compression == COMPRESSION_ZLIB:
+            delta = zlib.decompress(delta)
+        else:
+            raise SQLiteStoreError('unhandled compression type: %d' %
+                                   compression)
+
+        deltas.append(delta)
+
+    if lastdeltabaseid in stoprids:
+        basetext = revisioncache[stoprids[lastdeltabaseid]]
+    else:
+        basetext = deltas.pop()
+
+    deltas.reverse()
+    fulltext = mdiff.patches(basetext, deltas)
+
+    # SQLite returns buffer instances for blob columns on Python 2. This
+    # type can propagate through the delta application layer. Because
+    # downstream callers assume revisions are bytes, cast as needed.
+    if not isinstance(fulltext, bytes):
+        fulltext = bytes(fulltext)
+
+    return fulltext
+
+def insertdelta(db, compression, hash, delta):
+    try:
+        return db.execute(
+            r'INSERT INTO delta (compression, hash, delta) '
+            r'VALUES (?, ?, ?)',
+            (compression, hash, delta)).lastrowid
+    except sqlite3.IntegrityError:
+        return db.execute(
+            r'SELECT id FROM delta WHERE hash=?',
+            (hash,)).fetchone()[0]
+
+class SQLiteStoreError(error.StorageError):
+    pass
+
+@attr.s
+class revisionentry(object):
+    rid = attr.ib()
+    rev = attr.ib()
+    node = attr.ib()
+    p1rev = attr.ib()
+    p2rev = attr.ib()
+    p1node = attr.ib()
+    p2node = attr.ib()
+    linkrev = attr.ib()
+    flags = attr.ib()
+
+@interfaceutil.implementer(repository.irevisiondelta)
+@attr.s(slots=True)
+class sqliterevisiondelta(object):
+    node = attr.ib()
+    p1node = attr.ib()
+    p2node = attr.ib()
+    basenode = attr.ib()
+    flags = attr.ib()
+    baserevisionsize = attr.ib()
+    revision = attr.ib()
+    delta = attr.ib()
+    linknode = attr.ib(default=None)
+
+@interfaceutil.implementer(repository.iverifyproblem)
+@attr.s(frozen=True)
+class sqliteproblem(object):
+    warning = attr.ib(default=None)
+    error = attr.ib(default=None)
+    node = attr.ib(default=None)
+
+@interfaceutil.implementer(repository.ifilestorage)
+class sqlitefilestore(object):
+    """Implements storage for an individual tracked path."""
+
+    def __init__(self, db, path, compression):
+        self._db = db
+        self._path = path
+
+        self._pathid = None
+
+        # revnum -> node
+        self._revtonode = {}
+        # node -> revnum
+        self._nodetorev = {}
+        # node -> data structure
+        self._revisions = {}
+
+        self._revisioncache = util.lrucachedict(10)
+
+        self._compengine = compression
+
+        if compression == 'zstd':
+            self._cctx = zstd.ZstdCompressor(level=3)
+            self._dctx = zstd.ZstdDecompressor()
+        else:
+            self._cctx = None
+            self._dctx = None
+
+        self._refreshindex()
+
+    def _refreshindex(self):
+        self._revtonode = {}
+        self._nodetorev = {}
+        self._revisions = {}
+
+        res = list(self._db.execute(
+            r'SELECT id FROM filepath WHERE path=?', (self._path,)))
+
+        if not res:
+            self._pathid = None
+            return
+
+        self._pathid = res[0][0]
+
+        res = self._db.execute(
+            r'SELECT id, revnum, node, p1rev, p2rev, linkrev, flags '
+            r'FROM fileindex '
+            r'WHERE pathid=? 
' + r'ORDER BY revnum ASC', + (self._pathid,)) + + for i, row in enumerate(res): + rid, rev, node, p1rev, p2rev, linkrev, flags = row + + if i != rev: + raise SQLiteStoreError(_('sqlite database has inconsistent ' + 'revision numbers')) + + if p1rev == nullrev: + p1node = nullid + else: + p1node = self._revtonode[p1rev] + + if p2rev == nullrev: + p2node = nullid + else: + p2node = self._revtonode[p2rev] + + entry = revisionentry( + rid=rid, + rev=rev, + node=node, + p1rev=p1rev, + p2rev=p2rev, + p1node=p1node, + p2node=p2node, + linkrev=linkrev, + flags=flags) + + self._revtonode[rev] = node + self._nodetorev[node] = rev + self._revisions[node] = entry + + # Start of ifileindex interface. + + def __len__(self): + return len(self._revisions) + + def __iter__(self): + return iter(pycompat.xrange(len(self._revisions))) + + def revs(self, start=0, stop=None): + return storageutil.iterrevs(len(self._revisions), start=start, + stop=stop) + + def parents(self, node): + if node == nullid: + return nullid, nullid + + if node not in self._revisions: + raise error.LookupError(node, self._path, _('no node')) + + entry = self._revisions[node] + return entry.p1node, entry.p2node + + def parentrevs(self, rev): + if rev == nullrev: + return nullrev, nullrev + + if rev not in self._revtonode: + raise IndexError(rev) + + entry = self._revisions[self._revtonode[rev]] + return entry.p1rev, entry.p2rev + + def rev(self, node): + if node == nullid: + return nullrev + + if node not in self._nodetorev: + raise error.LookupError(node, self._path, _('no node')) + + return self._nodetorev[node] + + def node(self, rev): + if rev == nullrev: + return nullid + + if rev not in self._revtonode: + raise IndexError(rev) + + return self._revtonode[rev] + + def lookup(self, node): + return storageutil.fileidlookup(self, node, self._path) + + def linkrev(self, rev): + if rev == nullrev: + return nullrev + + if rev not in self._revtonode: + raise IndexError(rev) + + entry = self._revisions[self._revtonode[rev]] + return entry.linkrev + + def iscensored(self, rev): + if rev == nullrev: + return False + + if rev not in self._revtonode: + raise IndexError(rev) + + return self._revisions[self._revtonode[rev]].flags & FLAG_CENSORED + + def commonancestorsheads(self, node1, node2): + rev1 = self.rev(node1) + rev2 = self.rev(node2) + + ancestors = ancestor.commonancestorsheads(self.parentrevs, rev1, rev2) + return pycompat.maplist(self.node, ancestors) + + def descendants(self, revs): + # TODO we could implement this using a recursive SQL query, which + # might be faster. + return dagop.descendantrevs(revs, self.revs, self.parentrevs) + + def heads(self, start=None, stop=None): + if start is None and stop is None: + if not len(self): + return [nullid] + + startrev = self.rev(start) if start is not None else nullrev + stoprevs = {self.rev(n) for n in stop or []} + + revs = dagop.headrevssubset(self.revs, self.parentrevs, + startrev=startrev, stoprevs=stoprevs) + + return [self.node(rev) for rev in revs] + + def children(self, node): + rev = self.rev(node) + + res = self._db.execute( + r'SELECT' + r' node ' + r' FROM filedata ' + r' WHERE path=? AND (p1rev=? OR p2rev=?) ' + r' ORDER BY revnum ASC', + (self._path, rev, rev)) + + return [row[0] for row in res] + + # End of ifileindex interface. + + # Start of ifiledata interface. 
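
The descendants() implementation above leaves a TODO noting that a recursive SQL query might be faster than walking parentrevs() in Python. The sketch below shows one possible shape of such a query against the fileindex schema from CREATE_SCHEMA. It is illustrative only and not part of the patch: the helper name sqldescendants and the toy rows are invented, and the final NOT IN filter assumes the desired result excludes the starting revisions themselves, as dagop.descendantrevs does.

import sqlite3

def sqldescendants(db, pathid, startrevs):
    # Hypothetical helper: compute descendant revision numbers with a
    # recursive CTE instead of walking parent pointers in Python.
    placeholders = ','.join(['?'] * len(startrevs))
    res = db.execute(
        'WITH RECURSIVE descendants(revnum) AS ('
        '    SELECT revnum FROM fileindex '
        '        WHERE pathid=? AND revnum IN (%s) '
        '    UNION '
        '    SELECT fileindex.revnum '
        '        FROM fileindex, descendants '
        '        WHERE fileindex.pathid=? '
        '            AND (fileindex.p1rev=descendants.revnum '
        '                 OR fileindex.p2rev=descendants.revnum)'
        ') '
        'SELECT revnum FROM descendants '
        'WHERE revnum NOT IN (%s) ORDER BY revnum' % (placeholders,
                                                      placeholders),
        [pathid] + list(startrevs) + [pathid] + list(startrevs))
    return [row[0] for row in res]

db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE fileindex ('
           'pathid INTEGER, revnum INTEGER, p1rev INTEGER, p2rev INTEGER)')
# Toy history for path id 1: 0 <- 1 <- 2, plus revision 3 merging 1 and 2.
db.executemany('INSERT INTO fileindex VALUES (1, ?, ?, ?)',
               [(0, -1, -1), (1, 0, -1), (2, 1, -1), (3, 1, 2)])
print(sqldescendants(db, 1, [1]))   # [2, 3]
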
+ + def size(self, rev): + if rev == nullrev: + return 0 + + if rev not in self._revtonode: + raise IndexError(rev) + + node = self._revtonode[rev] + + if self.renamed(node): + return len(self.read(node)) + + return len(self.revision(node)) + + def revision(self, node, raw=False, _verifyhash=True): + if node in (nullid, nullrev): + return b'' + + if isinstance(node, int): + node = self.node(node) + + if node not in self._nodetorev: + raise error.LookupError(node, self._path, _('no node')) + + if node in self._revisioncache: + return self._revisioncache[node] + + # Because we have a fulltext revision cache, we are able to + # short-circuit delta chain traversal and decompression as soon as + # we encounter a revision in the cache. + + stoprids = {self._revisions[n].rid: n + for n in self._revisioncache} + + if not stoprids: + stoprids[-1] = None + + fulltext = resolvedeltachain(self._db, self._pathid, node, + self._revisioncache, stoprids, + zstddctx=self._dctx) + + if _verifyhash: + self._checkhash(fulltext, node) + self._revisioncache[node] = fulltext + + return fulltext + + def read(self, node): + return storageutil.filtermetadata(self.revision(node)) + + def renamed(self, node): + return storageutil.filerevisioncopied(self, node) + + def cmp(self, node, fulltext): + return not storageutil.filedataequivalent(self, node, fulltext) + + def emitrevisions(self, nodes, nodesorder=None, revisiondata=False, + assumehaveparentrevisions=False, deltaprevious=False): + if nodesorder not in ('nodes', 'storage', None): + raise error.ProgrammingError('unhandled value for nodesorder: %s' % + nodesorder) + + nodes = [n for n in nodes if n != nullid] + + if not nodes: + return + + # TODO perform in a single query. + res = self._db.execute( + r'SELECT revnum, deltaid FROM fileindex ' + r'WHERE pathid=? ' + r' AND node in (%s)' % (r','.join([r'?'] * len(nodes))), + tuple([self._pathid] + nodes)) + + deltabases = {} + + for rev, deltaid in res: + res = self._db.execute( + r'SELECT revnum from fileindex WHERE pathid=? AND deltaid=?', + (self._pathid, deltaid)) + deltabases[rev] = res.fetchone()[0] + + # TODO define revdifffn so we can use delta from storage. + for delta in storageutil.emitrevisions( + self, nodes, nodesorder, sqliterevisiondelta, + deltaparentfn=deltabases.__getitem__, + revisiondata=revisiondata, + assumehaveparentrevisions=assumehaveparentrevisions, + deltaprevious=deltaprevious): + + yield delta + + # End of ifiledata interface. + + # Start of ifilemutation interface. 
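
The mutation methods that follow (addrevision(), addgroup(), censorrevision()) all funnel delta blobs through insertdelta(), defined earlier: because the delta table declares hash BLOB UNIQUE ON CONFLICT ABORT, inserting an already-stored blob raises IntegrityError and the existing row id is reused rather than storing a duplicate. Below is a standalone sketch of that content-addressed pattern against a toy schema; it is illustrative only, not part of the patch, and it computes the SHA-1 inside the helper rather than taking it from the caller.

import hashlib
import sqlite3

db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE delta ('
           '  id INTEGER PRIMARY KEY, '
           '  hash BLOB UNIQUE ON CONFLICT ABORT, '
           '  delta BLOB NOT NULL)')

def storedelta(db, delta):
    # Content-addressed insert: identical blobs map onto a single row.
    deltahash = hashlib.sha1(delta).digest()
    try:
        return db.execute(
            'INSERT INTO delta (hash, delta) VALUES (?, ?)',
            (deltahash, delta)).lastrowid
    except sqlite3.IntegrityError:
        # Blob already present; reuse its id.
        return db.execute(
            'SELECT id FROM delta WHERE hash=?',
            (deltahash,)).fetchone()[0]

a = storedelta(db, b'common delta payload')
b = storedelta(db, b'common delta payload')   # deduplicated
c = storedelta(db, b'another payload')
assert a == b and a != c
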
+ + def add(self, filedata, meta, transaction, linkrev, p1, p2): + if meta or filedata.startswith(b'\x01\n'): + filedata = storageutil.packmeta(meta, filedata) + + return self.addrevision(filedata, transaction, linkrev, p1, p2) + + def addrevision(self, revisiondata, transaction, linkrev, p1, p2, node=None, + flags=0, cachedelta=None): + if flags: + raise SQLiteStoreError(_('flags not supported on revisions')) + + validatehash = node is not None + node = node or storageutil.hashrevisionsha1(revisiondata, p1, p2) + + if validatehash: + self._checkhash(revisiondata, node, p1, p2) + + if node in self._nodetorev: + return node + + node = self._addrawrevision(node, revisiondata, transaction, linkrev, + p1, p2) + + self._revisioncache[node] = revisiondata + return node + + def addgroup(self, deltas, linkmapper, transaction, addrevisioncb=None): + nodes = [] + + for node, p1, p2, linknode, deltabase, delta, wireflags in deltas: + storeflags = 0 + + if wireflags & repository.REVISION_FLAG_CENSORED: + storeflags |= FLAG_CENSORED + + if wireflags & ~repository.REVISION_FLAG_CENSORED: + raise SQLiteStoreError('unhandled revision flag') + + baserev = self.rev(deltabase) + + # If base is censored, delta must be full replacement in a single + # patch operation. + if baserev != nullrev and self.iscensored(baserev): + hlen = struct.calcsize('>lll') + oldlen = len(self.revision(deltabase, raw=True, + _verifyhash=False)) + newlen = len(delta) - hlen + + if delta[:hlen] != mdiff.replacediffheader(oldlen, newlen): + raise error.CensoredBaseError(self._path, + deltabase) + + if (not (storeflags & FLAG_CENSORED) + and storageutil.deltaiscensored( + delta, baserev, lambda x: len(self.revision(x, raw=True)))): + storeflags |= FLAG_CENSORED + + linkrev = linkmapper(linknode) + + nodes.append(node) + + if node in self._revisions: + continue + + if deltabase == nullid: + text = mdiff.patch(b'', delta) + storedelta = None + else: + text = None + storedelta = (deltabase, delta) + + self._addrawrevision(node, text, transaction, linkrev, p1, p2, + storedelta=storedelta, flags=storeflags) + + if addrevisioncb: + addrevisioncb(self, node) + + return nodes + + def censorrevision(self, tr, censornode, tombstone=b''): + tombstone = storageutil.packmeta({b'censored': tombstone}, b'') + + # This restriction is cargo culted from revlogs and makes no sense for + # SQLite, since columns can be resized at will. + if len(tombstone) > len(self.revision(censornode, raw=True)): + raise error.Abort(_('censor tombstone must be no longer than ' + 'censored data')) + + # We need to replace the censored revision's data with the tombstone. + # But replacing that data will have implications for delta chains that + # reference it. + # + # While "better," more complex strategies are possible, we do something + # simple: we find delta chain children of the censored revision and we + # replace those incremental deltas with fulltexts of their corresponding + # revision. Then we delete the now-unreferenced delta and original + # revision and insert a replacement. + + # Find the delta to be censored. + censoreddeltaid = self._db.execute( + r'SELECT deltaid FROM fileindex WHERE id=?', + (self._revisions[censornode].rid,)).fetchone()[0] + + # Find all its delta chain children. + # TODO once we support storing deltas for !files, we'll need to look + # for those delta chains too. + rows = list(self._db.execute( + r'SELECT id, pathid, node FROM fileindex ' + r'WHERE deltabaseid=? 
OR deltaid=?', + (censoreddeltaid, censoreddeltaid))) + + for row in rows: + rid, pathid, node = row + + fulltext = resolvedeltachain(self._db, pathid, node, {}, {-1: None}, + zstddctx=self._dctx) + + deltahash = hashlib.sha1(fulltext).digest() + + if self._compengine == 'zstd': + deltablob = self._cctx.compress(fulltext) + compression = COMPRESSION_ZSTD + elif self._compengine == 'zlib': + deltablob = zlib.compress(fulltext) + compression = COMPRESSION_ZLIB + elif self._compengine == 'none': + deltablob = fulltext + compression = COMPRESSION_NONE + else: + raise error.ProgrammingError('unhandled compression engine: %s' + % self._compengine) + + if len(deltablob) >= len(fulltext): + deltablob = fulltext + compression = COMPRESSION_NONE + + deltaid = insertdelta(self._db, compression, deltahash, deltablob) + + self._db.execute( + r'UPDATE fileindex SET deltaid=?, deltabaseid=NULL ' + r'WHERE id=?', (deltaid, rid)) + + # Now create the tombstone delta and replace the delta on the censored + # node. + deltahash = hashlib.sha1(tombstone).digest() + tombstonedeltaid = insertdelta(self._db, COMPRESSION_NONE, + deltahash, tombstone) + + flags = self._revisions[censornode].flags + flags |= FLAG_CENSORED + + self._db.execute( + r'UPDATE fileindex SET flags=?, deltaid=?, deltabaseid=NULL ' + r'WHERE pathid=? AND node=?', + (flags, tombstonedeltaid, self._pathid, censornode)) + + self._db.execute( + r'DELETE FROM delta WHERE id=?', (censoreddeltaid,)) + + self._refreshindex() + self._revisioncache.clear() + + def getstrippoint(self, minlink): + return storageutil.resolvestripinfo(minlink, len(self) - 1, + [self.rev(n) for n in self.heads()], + self.linkrev, + self.parentrevs) + + def strip(self, minlink, transaction): + if not len(self): + return + + rev, _ignored = self.getstrippoint(minlink) + + if rev == len(self): + return + + for rev in self.revs(rev): + self._db.execute( + r'DELETE FROM fileindex WHERE pathid=? AND node=?', + (self._pathid, self.node(rev))) + + # TODO how should we garbage collect data in delta table? + + self._refreshindex() + + # End of ifilemutation interface. + + # Start of ifilestorage interface. + + def files(self): + return [] + + def storageinfo(self, exclusivefiles=False, sharedfiles=False, + revisionscount=False, trackedsize=False, + storedsize=False): + d = {} + + if exclusivefiles: + d['exclusivefiles'] = [] + + if sharedfiles: + # TODO list sqlite file(s) here. + d['sharedfiles'] = [] + + if revisionscount: + d['revisionscount'] = len(self) + + if trackedsize: + d['trackedsize'] = sum(len(self.revision(node)) + for node in self._nodetorev) + + if storedsize: + # TODO implement this? + d['storedsize'] = None + + return d + + def verifyintegrity(self, state): + state['skipread'] = set() + + for rev in self: + node = self.node(rev) + + try: + self.revision(node) + except Exception as e: + yield sqliteproblem( + error=_('unpacking %s: %s') % (short(node), e), + node=node) + + state['skipread'].add(node) + + # End of ifilestorage interface. 
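
Both censorrevision() above and _addrawrevision() below apply the same encoding rule: compress the blob with the configured engine, then fall back to storing the raw bytes whenever compression does not actually shrink them, recording the choice in the compression column. A condensed sketch of that decision follows; it is illustrative only and not part of the patch, the function name encodedelta is invented, and the zstd branch assumes a python-zstandard compressor is passed in.

import zlib

COMPRESSION_NONE, COMPRESSION_ZSTD, COMPRESSION_ZLIB = 1, 2, 3

def encodedelta(delta, engine, zstdcctx=None):
    # Compress with the configured engine, but keep the raw bytes when
    # compression is not a net win for this particular blob.
    if engine == 'zstd':
        blob, compression = zstdcctx.compress(delta), COMPRESSION_ZSTD
    elif engine == 'zlib':
        blob, compression = zlib.compress(delta), COMPRESSION_ZLIB
    elif engine == 'none':
        blob, compression = delta, COMPRESSION_NONE
    else:
        raise ValueError('unhandled compression engine: %s' % engine)

    if len(blob) >= len(delta):
        blob, compression = delta, COMPRESSION_NONE

    return compression, blob

print(encodedelta(b'a' * 1024, 'zlib')[0])       # 3: zlib pays off
print(encodedelta(b'\x00\x01\x02', 'zlib')[0])   # 1: stored uncompressed
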
+ + def _checkhash(self, fulltext, node, p1=None, p2=None): + if p1 is None and p2 is None: + p1, p2 = self.parents(node) + + if node == storageutil.hashrevisionsha1(fulltext, p1, p2): + return + + try: + del self._revisioncache[node] + except KeyError: + pass + + if storageutil.iscensoredtext(fulltext): + raise error.CensoredNodeError(self._path, node, fulltext) + + raise SQLiteStoreError(_('integrity check failed on %s') % + self._path) + + def _addrawrevision(self, node, revisiondata, transaction, linkrev, + p1, p2, storedelta=None, flags=0): + if self._pathid is None: + res = self._db.execute( + r'INSERT INTO filepath (path) VALUES (?)', (self._path,)) + self._pathid = res.lastrowid + + # For simplicity, always store a delta against p1. + # TODO we need a lot more logic here to make behavior reasonable. + + if storedelta: + deltabase, delta = storedelta + + if isinstance(deltabase, int): + deltabase = self.node(deltabase) + + else: + assert revisiondata is not None + deltabase = p1 + + if deltabase == nullid: + delta = revisiondata + else: + delta = mdiff.textdiff(self.revision(self.rev(deltabase)), + revisiondata) + + # File index stores a pointer to its delta and the parent delta. + # The parent delta is stored via a pointer to the fileindex PK. + if deltabase == nullid: + baseid = None + else: + baseid = self._revisions[deltabase].rid + + # Deltas are stored with a hash of their content. This allows + # us to de-duplicate. The table is configured to ignore conflicts + # and it is faster to just insert and silently noop than to look + # first. + deltahash = hashlib.sha1(delta).digest() + + if self._compengine == 'zstd': + deltablob = self._cctx.compress(delta) + compression = COMPRESSION_ZSTD + elif self._compengine == 'zlib': + deltablob = zlib.compress(delta) + compression = COMPRESSION_ZLIB + elif self._compengine == 'none': + deltablob = delta + compression = COMPRESSION_NONE + else: + raise error.ProgrammingError('unhandled compression engine: %s' % + self._compengine) + + # Don't store compressed data if it isn't practical. + if len(deltablob) >= len(delta): + deltablob = delta + compression = COMPRESSION_NONE + + deltaid = insertdelta(self._db, compression, deltahash, deltablob) + + rev = len(self) + + if p1 == nullid: + p1rev = nullrev + else: + p1rev = self._nodetorev[p1] + + if p2 == nullid: + p2rev = nullrev + else: + p2rev = self._nodetorev[p2] + + rid = self._db.execute( + r'INSERT INTO fileindex (' + r' pathid, revnum, node, p1rev, p2rev, linkrev, flags, ' + r' deltaid, deltabaseid) ' + r' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)', + (self._pathid, rev, node, p1rev, p2rev, linkrev, flags, + deltaid, baseid) + ).lastrowid + + entry = revisionentry( + rid=rid, + rev=rev, + node=node, + p1rev=p1rev, + p2rev=p2rev, + p1node=p1, + p2node=p2, + linkrev=linkrev, + flags=flags) + + self._nodetorev[node] = rev + self._revtonode[rev] = node + self._revisions[node] = entry + + return node + +class sqliterepository(localrepo.localrepository): + def cancopy(self): + return False + + def transaction(self, *args, **kwargs): + current = self.currenttransaction() + + tr = super(sqliterepository, self).transaction(*args, **kwargs) + + if current: + return tr + + self._dbconn.execute(r'BEGIN TRANSACTION') + + def committransaction(_): + self._dbconn.commit() + + tr.addfinalize('sqlitestore', committransaction) + + return tr + + @property + def _dbconn(self): + # SQLite connections can only be used on the thread that created + # them. In most cases, this "just works." 
However, hgweb uses + # multiple threads. + tid = threading.current_thread().ident + + if self._db: + if self._db[0] == tid: + return self._db[1] + + db = makedb(self.svfs.join('db.sqlite')) + self._db = (tid, db) + + return db + +def makedb(path): + """Construct a database handle for a database at path.""" + + db = sqlite3.connect(path) + db.text_factory = bytes + + res = db.execute(r'PRAGMA user_version').fetchone()[0] + + # New database. + if res == 0: + for statement in CREATE_SCHEMA: + db.execute(statement) + + db.commit() + + elif res == CURRENT_SCHEMA_VERSION: + pass + + else: + raise error.Abort(_('sqlite database has unrecognized version')) + + db.execute(r'PRAGMA journal_mode=WAL') + + return db + +def featuresetup(ui, supported): + supported.add(REQUIREMENT) + + if zstd: + supported.add(REQUIREMENT_ZSTD) + + supported.add(REQUIREMENT_ZLIB) + supported.add(REQUIREMENT_NONE) + +def newreporequirements(orig, ui, createopts): + if createopts['backend'] != 'sqlite': + return orig(ui, createopts) + + # This restriction can be lifted once we have more confidence. + if 'sharedrepo' in createopts: + raise error.Abort(_('shared repositories not supported with SQLite ' + 'store')) + + # This filtering is out of an abundance of caution: we want to ensure + # we honor creation options and we do that by annotating exactly the + # creation options we recognize. + known = { + 'narrowfiles', + 'backend', + } + + unsupported = set(createopts) - known + if unsupported: + raise error.Abort(_('SQLite store does not support repo creation ' + 'option: %s') % ', '.join(sorted(unsupported))) + + # Since we're a hybrid store that still relies on revlogs, we fall back + # to using the revlogv1 backend's storage requirements then adding our + # own requirement. + createopts['backend'] = 'revlogv1' + requirements = orig(ui, createopts) + requirements.add(REQUIREMENT) + + compression = ui.config('storage', 'sqlite.compression') + + if compression == 'zstd' and not zstd: + raise error.Abort(_('storage.sqlite.compression set to "zstd" but ' + 'zstandard compression not available to this ' + 'Mercurial install')) + + if compression == 'zstd': + requirements.add(REQUIREMENT_ZSTD) + elif compression == 'zlib': + requirements.add(REQUIREMENT_ZLIB) + elif compression == 'none': + requirements.add(REQUIREMENT_NONE) + else: + raise error.Abort(_('unknown compression engine defined in ' + 'storage.sqlite.compression: %s') % compression) + + return requirements + +@interfaceutil.implementer(repository.ilocalrepositoryfilestorage) +class sqlitefilestorage(object): + """Repository file storage backed by SQLite.""" + def file(self, path): + if path[0] == b'/': + path = path[1:] + + if REQUIREMENT_ZSTD in self.requirements: + compression = 'zstd' + elif REQUIREMENT_ZLIB in self.requirements: + compression = 'zlib' + elif REQUIREMENT_NONE in self.requirements: + compression = 'none' + else: + raise error.Abort(_('unable to determine what compression engine ' + 'to use for SQLite storage')) + + return sqlitefilestore(self._dbconn, path, compression) + +def makefilestorage(orig, requirements, **kwargs): + """Produce a type conforming to ``ilocalrepositoryfilestorage``.""" + if REQUIREMENT in requirements: + return sqlitefilestorage + else: + return orig(requirements=requirements, **kwargs) + +def makemain(orig, ui, requirements, **kwargs): + if REQUIREMENT in requirements: + if REQUIREMENT_ZSTD in requirements and not zstd: + raise error.Abort(_('repository uses zstandard compression, which ' + 'is not available to this 
Mercurial install')) + + return sqliterepository + + return orig(requirements=requirements, **kwargs) + +def verifierinit(orig, self, *args, **kwargs): + orig(self, *args, **kwargs) + + # We don't care that files in the store don't align with what is + # advertised. So suppress these warnings. + self.warnorphanstorefiles = False + +def extsetup(ui): + localrepo.featuresetupfuncs.add(featuresetup) + extensions.wrapfunction(localrepo, 'newreporequirements', + newreporequirements) + extensions.wrapfunction(localrepo, 'makefilestorage', + makefilestorage) + extensions.wrapfunction(localrepo, 'makemain', + makemain) + extensions.wrapfunction(verify.verifier, '__init__', + verifierinit) + +def reposetup(ui, repo): + if isinstance(repo, sqliterepository): + repo._db = None + + # TODO check for bundlerepository? diff --git a/tests/hghave.py b/tests/hghave.py --- a/tests/hghave.py +++ b/tests/hghave.py @@ -787,6 +787,16 @@ def has_repofncache(): return 'fncache' in getrepofeatures() +@check('sqlite', 'sqlite3 module is available') +def has_sqlite(): + try: + import sqlite3 + sqlite3.sqlite_version + except ImportError: + return False + + return matchoutput('sqlite3 -version', b'^3\.\d+') + @check('vcr', 'vcr http mocking library') def has_vcr(): try: diff --git a/tests/test-storage.py b/tests/test-storage.py --- a/tests/test-storage.py +++ b/tests/test-storage.py @@ -17,6 +17,16 @@ storage as storagetesting, ) +from hgext import ( + sqlitestore, +) + +try: + from mercurial import zstd + zstd.__version__ +except ImportError: + zstd = None + STATE = { 'lastindex': 0, 'ui': uimod.ui(), @@ -70,5 +80,42 @@ maketransaction, addrawrevision) +def makesqlitefile(self): + path = STATE['vfs'].join(b'db-%d.db' % STATE['lastindex']) + STATE['lastindex'] += 1 + + db = sqlitestore.makedb(path) + + compression = b'zstd' if zstd else b'zlib' + + return sqlitestore.sqlitefilestore(db, b'dummy-path', compression) + +def addrawrevisionsqlite(self, fl, tr, node, p1, p2, linkrev, rawtext=None, + delta=None, censored=False, ellipsis=False, + extstored=False): + flags = 0 + + if censored: + flags |= sqlitestore.FLAG_CENSORED + + if ellipsis | extstored: + raise error.Abort(b'support for ellipsis and extstored flags not ' + b'supported') + + if rawtext is not None: + fl._addrawrevision(node, rawtext, tr, linkrev, p1, p2, flags=flags) + elif delta is not None: + fl._addrawrevision(node, rawtext, tr, linkrev, p1, p2, + storedelta=delta, flags=flags) + else: + raise error.Abort(b'must supply rawtext or delta arguments') + +sqlitefileindextests = storagetesting.makeifileindextests( + makesqlitefile, maketransaction, addrawrevisionsqlite) +sqlitefiledatatests = storagetesting.makeifiledatatests( + makesqlitefile, maketransaction, addrawrevisionsqlite) +sqlitefilemutationtests = storagetesting.makeifilemutationtests( + makesqlitefile, maketransaction, addrawrevisionsqlite) + if __name__ == '__main__': silenttestrunner.main(__name__)
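
For manual inspection, makedb() and sqliterepository._dbconn above place the database at .hg/store/db.sqlite inside a repository created with this backend, and the filedata view from CREATE_SCHEMA joins each tracked path with its revisions. The sketch below counts stored file revisions per path. It is illustrative only and not part of the patch; the repository path is a placeholder, and it should be run while no hg process is writing to the repository since the database uses WAL journaling.

import sqlite3

# Placeholder: point this at a repository created with the sqlite backend.
dbpath = '/path/to/repo/.hg/store/db.sqlite'

db = sqlite3.connect(dbpath)
db.text_factory = bytes   # paths and nodes are stored as raw bytes

for path, count in db.execute(
        'SELECT path, COUNT(*) FROM filedata GROUP BY path ORDER BY path'):
    print('%s: %d revisions' % (path.decode('utf-8', 'replace'), count))
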