Details
- Reviewers: None
- Group Reviewers: hg-reviewers

Diff Detail
- Repository: rHG Mercurial
- Branch: default
- Lint: No Linters Available
- Unit: No Unit Test Coverage
| | Path |
|---|---|
| M | mercurial/bundlerepo.py (35 lines) |
| M | mercurial/debugcommands.py (21 lines) |
| M | mercurial/exchange.py (11 lines) |
| A | mercurial/iblt.py (205 lines) |
| M | mercurial/interfaces/repository.py (6 lines) |
| M | mercurial/localrepo.py (54 lines) |
| M | mercurial/setdiscovery.py (65 lines) |
| M | mercurial/wireprotov1peer.py (9 lines) |
| M | mercurial/wireprotov1server.py (10 lines) |
| Commit | Parents | Author | Summary | Date |
|---|---|---|---|---|
| b2bec8304fbc | f45e1618cbf6 | Joerg Sonnenberger | | Feb 18 2022, 10:03 PM |
"local" is a local repo from which to obtain the actual incoming | "local" is a local repo from which to obtain the actual incoming | ||||
changesets; it is a bundlerepo for the obtained bundle when the | changesets; it is a bundlerepo for the obtained bundle when the | ||||
original "peer" is remote. | original "peer" is remote. | ||||
"csets" lists the incoming changeset node ids. | "csets" lists the incoming changeset node ids. | ||||
"cleanupfn" must be called without arguments when you're done processing | "cleanupfn" must be called without arguments when you're done processing | ||||
the changes; it closes both the original "peer" and the one returned | the changes; it closes both the original "peer" and the one returned | ||||
here. | here. | ||||
""" | """ | ||||
success = False | |||||
if not onlyheads: | |||||
from . import setdiscovery | |||||
success, mychanges, theirchanges, common, rheads = setdiscovery.findsetdifferences(ui, repo, peer) | |||||
if success and not theirchanges: | |||||
try: | |||||
if bundlename: | |||||
os.unlink(bundlename) | |||||
except OSError: | |||||
pass | |||||
return repo, [], peer.close | |||||
if not success: | |||||
tmp = discovery.findcommonincoming(repo, peer, heads=onlyheads, force=force) | tmp = discovery.findcommonincoming(repo, peer, heads=onlyheads, force=force) | ||||
common, incoming, rheads = tmp | common, incoming, rheads = tmp | ||||
if not incoming: | if not incoming: | ||||
try: | try: | ||||
if bundlename: | if bundlename: | ||||
os.unlink(bundlename) | os.unlink(bundlename) | ||||
except OSError: | except OSError: | ||||
pass | pass | ||||
return repo, [], peer.close | return repo, [], peer.close | ||||
commonset = set(common) | commonset = set(common) | ||||
rheads = [x for x in rheads if x not in commonset] | rheads = [x for x in rheads if x not in commonset] | ||||
bundle = None | bundle = None | ||||
bundlerepo = None | bundlerepo = None | ||||
localrepo = peer.local() | localrepo = peer.local() | ||||
if bundlename or not localrepo: | if bundlename or not localrepo: | ||||
# create a bundle (uncompressed if peer repo is not local) | # create a bundle (uncompressed if peer repo is not local) | ||||
# developer config: devel.legacy.exchange | # developer config: devel.legacy.exchange |
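For context (not part of the change), this is roughly how hg.incoming drives getremotechanges() and the return convention described in the docstring above; repo and url are assumed to already exist:

from mercurial import bundlerepo, hg
from mercurial.node import short

peer = hg.peer(repo, {}, url)
other, chlist, cleanupfn = bundlerepo.getremotechanges(
    repo.ui, repo, peer, onlyheads=None, bundlename=None
)
try:
    for node in chlist:
        repo.ui.write(b'%s\n' % short(node))
finally:
    # closes both the peer and the bundle repo, per the docstring above
    cleanupfn()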
mercurial/debugcommands.py

    hash_len = 20  # 160 bits for SHA-1
    hash_bytes = docket.tree_metadata[-hash_len:]
    ui.write(binascii.hexlify(hash_bytes) + b'\n')


@command(
    b'debugdiscovery',
    [
        (b'', b'protocol', b'set', _(b'use given discovery protocol')),
        (
            b'',
            b'nonheads',
            None,
            _(b'use old-style discovery with non-heads included'),
        ),
        (b'', b'rev', [], b'restrict discovery to this set of revs'),
        (b'', b'seed', b'12323', b'specify the random seed use for discovery'),

        def local_func(x):
            return local_filtered_revs

        repoview.filtertable[b'debug-discovery-local-filter'] = local_func
        repo = repo.filtered(b'debug-discovery-local-filter')

    data = {}
    if opts[b'protocol'] == b'tree':

        def doit(pushedrevs, remoteheads, remote=remote):
            if not util.safehasattr(remote, b'branches'):
                # enable in-client legacy support
                remote = localrepo.locallegacypeer(remote.local())
            common, _in, hds = treediscovery.findcommonincoming(
                repo, remote, force=True, audit=data
            )
            common = set(common)
            if not opts.get(b'nonheads'):
                ui.writenoi18n(
                    b"unpruned common: %s\n"
                    % b" ".join(sorted(short(n) for n in common))
                )

                clnode = repo.changelog.node
                common = repo.revs(b'heads(::%ln)', common)
                common = {clnode(r) for r in common}
            return common, hds

    elif opts[b'protocol'] == b'set':

        def doit(pushedrevs, remoteheads, remote=remote):
            nodes = None
            if pushedrevs:
                revs = logcmdutil.revrange(repo, pushedrevs)
                nodes = [repo[r].node() for r in revs]
            common, any, hds = setdiscovery.findcommonheads(
                ui, repo, remote, ancestorsof=nodes, audit=data
            )
            return common, hds

    elif opts[b'protocol'] == b'iblt':

        def doit(pushedrevs, remoteheads, remote=remote):
            success, _, _, common, rheads = setdiscovery.findsetdifferences(ui, repo, remote)
            assert success
            return common, rheads

    else:
        raise error.InputError(
            _(b"Unknown or unsupported discovery protocol")
        )
    remoterevs, _checkout = hg.addbranchrevs(repo, remote, branches, revs=None)
    localrevs = opts[b'rev']

    fm = ui.formatter(b'debugdiscovery', opts)
    if fm.strict_format:

        @contextlib.contextmanager
        def may_capture_output():

    data[b'nb-ini_und-roots'] = len(roots_initial_undecided)
    data[b'nb-ini_und-common'] = len(common_initial_undecided)
    data[b'nb-ini_und-missing'] = len(missing_initial_undecided)

    fm.startitem()
    fm.data(**pycompat.strkwargs(data))
    # display discovery summary
    fm.plain(b"elapsed time: %(elapsed)f seconds\n" % data)
    fm.end()
    return
    fm.plain(b"round-trips: %(total-roundtrips)9d\n" % data)
    fm.plain(b"queries: %(total-queries)9d\n" % data)
    fm.plain(b"heads summary:\n")
    fm.plain(b"  total common heads: %(nb-common-heads)9d\n" % data)
    fm.plain(b"    also local heads: %(nb-common-heads-local)9d\n" % data)
    fm.plain(b"    also remote heads: %(nb-common-heads-remote)9d\n" % data)
    fm.plain(b"      both: %(nb-common-heads-both)9d\n" % data)
    fm.plain(b"  local heads: %(nb-head-local)9d\n" % data)
mercurial/exchange.py

@pulldiscovery(b'changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; it will handle all discovery
    at some point."""
    if not pullop.heads:
        from . import setdiscovery

        success, mychanges, theirchanges, common, rheads = setdiscovery.findsetdifferences(pullop.repo.ui, pullop.repo, pullop.remote)
        if success:
            tonode = pullop.repo.unfiltered().changelog.node
            pullop.common = common
            pullop.rheads = rheads
            pullop.fetch = bool(theirchanges)
            return
    tmp = discovery.findcommonincoming(
        pullop.repo, pullop.remote, heads=pullop.heads, force=pullop.force
    )
    common, fetch, rheads = tmp
    has_node = pullop.repo.unfiltered().changelog.index.has_node
    if fetch and rheads:
        # If a remote head is filtered locally, put it back in common.
        #
mercurial/iblt.py

# Invertible Bloom Lookup Table (IBLT) and strata estimator, used for
# set-reconciliation based changeset discovery.

import copy
import hashlib
import struct


def range_size(upper):
    # number of bytes needed to store values in [0, upper]
    if upper < 256:
        return 1
    if upper < 65536:
        return 2
    if upper < 16777216:
        return 3
    return 4
class iblt:
    def __init__(self, m, k, key_size):
        self.m = m
        self.k = k
        self.key_size = key_size
        self.key_xors = [0] * m
        self.key_hash_xors = [0] * m
        self.key_hash_size = 4
        self.counts = [0] * m

    def insert(self, key):
        self._change(key, 1)

    def remove(self, key):
        self._change(key, -1)

    def __hash(self, key):
        hashes = hashlib.blake2b(key, digest_size=4 * self.k + self.key_hash_size).digest()
        values = [int.from_bytes(hashes[4 * i:4 * i + 4], 'big') % self.m for i in range(self.k)]
        # Fudge indices if they are not unique. This avoids the most common
        # reason for the (implicit) peeling process to fail.
        if values[0] == values[1]:
            values[1] ^= 1
        if values[0] == values[2] or values[1] == values[2]:
            values[2] ^= 1
        if values[0] == values[2] or values[1] == values[2]:
            values[2] ^= 2
        return values, int.from_bytes(hashes[4 * self.k:], 'big')

    def _change(self, key, count):
        indices, keyhash = self.__hash(key)
        numkey = int.from_bytes(key, 'big')
        for i in indices:
            self.key_xors[i] ^= numkey
            self.key_hash_xors[i] ^= keyhash
            self.counts[i] += count

    def list(self):
        # Peel the table: a cell with count +1/-1 whose key hash checks out
        # holds exactly one key; extract it, remove it from its other cells
        # and repeat until nothing is left (success) or no such cell remains.
        left = []
        right = []
        queue = []
        for i in range(self.m):
            if self.counts[i] in (1, -1):
                queue.append(i)
        while queue:
            i = queue.pop()
            c = self.counts[i]
            if c not in (1, -1):
                continue
            intkey = self.key_xors[i]
            key = intkey.to_bytes(length=self.key_size, byteorder='big')
            indices, keyhash = self.__hash(key)
            if self.key_hash_xors[i] != keyhash:
                continue
            for k in indices:
                self.key_xors[k] ^= intkey
                self.key_hash_xors[k] ^= keyhash
                self.counts[k] -= c
                if self.counts[k] in (1, -1):
                    queue.append(k)
            if c == 1:
                left.append(key)
            else:
                right.append(key)
        for i in range(self.m):
            if self.key_xors[i] or self.key_hash_xors[i] or self.counts[i]:
                return False, left, right
        return True, left, right

    def subtract(self, other):
        assert self.m == other.m
        assert self.k == other.k
        assert self.key_size == other.key_size
        for i in range(self.m):
            self.key_xors[i] ^= other.key_xors[i]
            self.key_hash_xors[i] ^= other.key_hash_xors[i]
            self.counts[i] -= other.counts[i]

    def dump(self):
        # Serialization layout (big endian): m (4), k (1), key_size (1),
        # key_hash_size (1), count_size (1), min_count (4, signed), then for
        # each of the m cells: count - min_count, key hash xor, key xor.
        min_count = min(self.counts)
        max_count = max(self.counts)
        count_size = range_size(max_count - min_count)
        data = []
        data.extend(self.m.to_bytes(4, 'big'))
        data.extend(self.k.to_bytes(1, 'big'))
        data.extend(self.key_size.to_bytes(1, 'big'))
        data.extend(self.key_hash_size.to_bytes(1, 'big'))
        data.extend(count_size.to_bytes(1, 'big'))
        data.extend(min_count.to_bytes(4, 'big', signed=True))
        for i in range(self.m):
            data.extend((self.counts[i] - min_count).to_bytes(count_size, 'big'))
            data.extend(self.key_hash_xors[i].to_bytes(self.key_hash_size, 'big'))
            data.extend(self.key_xors[i].to_bytes(self.key_size, 'big'))
        return bytes(data)

    @classmethod
    def load(cls, data):
        self = cls.__new__(cls)
        self.m = int.from_bytes(data[:4], 'big')
        self.k = int.from_bytes(data[4:5], 'big')
        self.key_size = int.from_bytes(data[5:6], 'big')
        self.key_hash_size = int.from_bytes(data[6:7], 'big')
        count_size = int.from_bytes(data[7:8], 'big')
        min_count = int.from_bytes(data[8:12], 'big', signed=True)
        pos = 12
        self.counts = []
        self.key_hash_xors = []
        self.key_xors = []
        for i in range(self.m):
            self.counts.append(min_count + int.from_bytes(data[pos:pos + count_size], 'big'))
            pos += count_size
            self.key_hash_xors.append(int.from_bytes(data[pos:pos + self.key_hash_size], 'big'))
            pos += self.key_hash_size
            self.key_xors.append(int.from_bytes(data[pos:pos + self.key_size], 'big'))
            pos += self.key_size
        return self, pos

    def compatible(self, other):
        return self.m == other.m and self.k == other.k and self.key_size == other.key_size and self.key_hash_size == other.key_hash_size

    def __eq__(self, other):
        if not self.compatible(other):
            return False
        return self.counts == other.counts and self.key_xors == other.key_xors and self.key_hash_xors == other.key_hash_xors
def ffs(x):
    # index of the lowest set bit ("find first set"); -1 for x == 0
    return (x & -x).bit_length() - 1


class estimator:
    # Strata estimator: keys are hashed and bucketed into one IBLT per
    # number of trailing zero bits of the hash, so stratum i receives
    # roughly a 2**-(i+1) fraction of all keys. Comparing the strata of two
    # sets gives an estimate of the size of their symmetric difference.
    def __init__(self, stratas=32):
        self.stratas = stratas
        self.key_size = self.stratas // 8
        assert self.stratas <= 256
        self.strata_size = 120
        self.k = 3
        self.iblts = [iblt(self.strata_size, self.k, self.key_size) for n in range(self.stratas)]

    def insert(self, key):
        self._change(key, 1)

    def remove(self, key):
        self._change(key, -1)

    def _change(self, key, count):
        h = self.__hash(key)
        lowest = ffs(h) if h else self.stratas - 1
        self.iblts[lowest]._change(bytes(h.to_bytes(self.key_size, 'big')), count)

    def __hash(self, key):
        return int.from_bytes(hashlib.blake2b(key, digest_size=self.key_size).digest(), 'big')

    def dump(self):
        data = []
        data.extend(self.stratas.to_bytes(2, 'big'))
        data.extend(self.strata_size.to_bytes(2, 'big'))
        for i in range(self.stratas):
            data.extend(self.iblts[i].dump())
        return bytes(data)

    @classmethod
    def load(cls, data):
        self = cls.__new__(cls)
        self.stratas = int.from_bytes(data[:2], 'big')
        self.strata_size = int.from_bytes(data[2:4], 'big')
        self.key_size = self.stratas // 8
        assert self.stratas <= 256
        self.k = 3
        data = data[4:]
        self.iblts = []
        for i in range(self.stratas):
            inst, pos = iblt.load(data)
            self.iblts.append(inst)
            data = data[pos:]
        return self

    def compare(self, other):
        assert self.stratas == other.stratas and self.strata_size == other.strata_size and self.k == other.k
        estimate = 0
        # Strata are walked from the sparsest down. When a stratum fails to
        # decode, the strata counted so far hold roughly a 2**-(i+1)
        # fraction of the keys, so the partial count is scaled by 2**(i+1).
        for i in reversed(range(self.stratas)):
            iblt = copy.deepcopy(self.iblts[i])
            #iblt = self.iblts[i]
            iblt.subtract(other.iblts[i])
            success, ours, theirs = iblt.list()
            if success:
                estimate += len(ours) + len(theirs)
            else:
                estimate <<= i + 1
                break
        return estimate
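Not part of the patch: a minimal round trip of the two classes above, handy for seeing what setdiscovery.findsetdifferences() gets back. It assumes the module is importable as mercurial.iblt and uses random 20-byte strings as stand-ins for changelog keys.

import os

from mercurial import iblt  # assumes the new module lands as mercurial/iblt.py

shared = [os.urandom(20) for _ in range(1000)]
only_local = [os.urandom(20) for _ in range(5)]
only_remote = [os.urandom(20) for _ in range(7)]

local_tbl = iblt.iblt(64, 3, 20)    # 64 cells, 3 hash positions, 20-byte keys
remote_tbl = iblt.iblt(64, 3, 20)
for key in shared + only_local:
    local_tbl.insert(key)
for key in shared + only_remote:
    remote_tbl.insert(key)

# Shared keys cancel out during subtraction; list() then peels the
# symmetric difference out of the remaining cells.
remote_tbl.subtract(local_tbl)
ok, theirs, ours = remote_tbl.list()
# 12 differing keys in a 64-cell table decode with high probability; on
# failure the discovery code simply retries with a larger table.
print(ok, sorted(theirs) == sorted(only_remote), sorted(ours) == sorted(only_local))

# The strata estimator answers "roughly how big is the difference?"
# without shipping a full-size table first.
local_est = iblt.estimator(32)
remote_est = iblt.estimator(32)
for key in shared + only_local:
    local_est.insert(key)
for key in shared + only_remote:
    remote_est.insert(key)
print(local_est.compare(remote_est))  # ~12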
"""Obtain remote repository data as a bundle. | """Obtain remote repository data as a bundle. | ||||
This command is how the bulk of repository data is transferred from | This command is how the bulk of repository data is transferred from | ||||
the peer to the local repository | the peer to the local repository | ||||
Returns a generator of bundle data. | Returns a generator of bundle data. | ||||
""" | """ | ||||
def getestimator(name): | |||||
pass | |||||
def getiblt(name, size, seed): | |||||
pass | |||||
def heads(): | def heads(): | ||||
"""Determine all known head revisions in the peer. | """Determine all known head revisions in the peer. | ||||
Returns an iterable of binary nodes. | Returns an iterable of binary nodes. | ||||
""" | """ | ||||
def known(nodes): | def known(nodes): | ||||
"""Determine whether multiple nodes are known. | """Determine whether multiple nodes are known. |
mercurial/localrepo.py

    dirstateguard,
    discovery,
    encoding,
    error,
    exchange,
    extensions,
    filelog,
    hook,
    iblt,
    lock as lockmod,
    match as matchmod,
    mergestate as mergestatemod,
    mergeutil,
    namespaces,
    narrowspec,
    obsolete,
    pathutil,

moderncaps = {
    b'lookup',
    b'branchmap',
    b'pushkey',
    b'known',
    b'getbundle',
    b'unbundle',
    b'iblt-changelog',
}
legacycaps = moderncaps.union({b'changegroupsubset'})


@interfaceutil.implementer(repository.ipeercommandexecutor)
class localcommandexecutor:
    def __init__(self, peer):
        self._peer = peer
        return self._repo.branchmap()

    def capabilities(self):
        return self._caps

    def clonebundles(self):
        return self._repo.tryread(bundlecaches.CB_MANIFEST_FILE)

    def getestimator(self, name):
        if name == b'changelog':
            repo = self.local()
            cachename = b'estimator-changelog.%d' % repo.changelog.tiprev()
            try:
                data = repo.cachevfs.read(cachename)
            except (IOError, OSError):
                data = None
            if data:
                return iblt.estimator.load(data)
            estimator = iblt.estimator(32)
            tonode = repo.unfiltered().changelog.node
            for rev in repo.revs(b'all()'):
                estimator.insert(tonode(rev))
            try:
                with repo.cachevfs.open(cachename, b'wb') as f:
                    f.write(estimator.dump())
            except (IOError, OSError):
                pass
        else:
            raise KeyError(b'unknown getestimator key %s' % name)
        return estimator

    def getiblt(self, name, size, seed):
        if seed != 0:
            raise KeyError(b'unsupported getiblt seed: %d' % seed)
        if name == b'changelog':
            repo = self.local()
            cachename = b'iblt-changelog.%d-%d' % (repo.changelog.tiprev(), size)
            try:
                data = repo.cachevfs.read(cachename)
            except (IOError, OSError):
                data = None
            if data:
                return iblt.iblt.load(data)[0]
            # keys are node + p1 + p2, so heads and the common boundary can
            # be reconstructed from the decoded difference alone
            tonode = repo.unfiltered().changelog.node
            parents = repo.unfiltered().changelog.parents
            inst = iblt.iblt(size, 3, 3 * len(repo.nullid))
            for rev in repo.revs(b'all()'):
                node = tonode(rev)
                inst.insert(node + b''.join(parents(node)))
            try:
                with repo.cachevfs.open(cachename, b'wb') as f:
                    f.write(inst.dump())
            except (IOError, OSError):
                pass
        else:
            raise KeyError(b'unknown getiblt key %s' % name)
        return inst

    def debugwireargs(self, one, two, three=None, four=None, five=None):
        """Used to test argument passing over the wire"""
        return b"%s %s %s %s %s" % (
            one,
            two,
            pycompat.bytestr(three),
            pycompat.bytestr(four),
            pycompat.bytestr(five),
mercurial/setdiscovery.py

pure_partialdiscovery = partialdiscovery

partialdiscovery = policy.importrust(
    'discovery', member='PartialDiscovery', default=partialdiscovery
)

import math

# Candidate IBLT sizes: powers of two interleaved with sqrt(2) steps, so
# consecutive sizes differ by roughly 41%.
iblt_sizes = [(1 << i) for i in range(5, 31)]
iblt_sizes += [math.trunc(math.sqrt(2) * s) for s in iblt_sizes]
iblt_sizes.sort()


def round_iblt_size(size):
    # add 25% head room, then round up to the next candidate size
    size = size + size // 4
    for s in iblt_sizes:
        if s >= size:
            return s
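For reference (not part of the change): with the table above, an estimated difference of 100 keys lands on a 128-cell table, and the largest candidate is about 1.5e9 cells; estimates beyond that would fall off the end of the loop and return None.

# illustrative values, computed from the definitions above
assert round_iblt_size(100) == 128    # 100 + 25 -> next candidate >= 125
assert round_iblt_size(200) == 256    # 200 + 50 -> next candidate >= 250
assert round_iblt_size(1000) == 1448  # 1250 -> trunc(sqrt(2) * 1024)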
def findsetdifferences(ui, local, remote):
    if not remote.capable(b'iblt-changelog'):
        ui.status(b'no iblt support: %s\n' % b' '.join(list(remote.capabilities())))
        return False, [], [], [], []
    myestimator = local.peer().getestimator(b'changelog')
    theirestimator = remote.getestimator(b'changelog')
    estimated_diff = myestimator.compare(theirestimator)
    # bail out if estimated_diff is O(len(repo)) and fall back to the classic mechanism?
    iblt_size = round_iblt_size(estimated_diff)
    ui.debug(b"expected difference is: %d, using IBLT size of %d\n" % (estimated_diff, iblt_size))
    attempt = 0
    while True:
        myiblt = local.peer().getiblt(b'changelog', iblt_size, 0)
        theiriblt = remote.getiblt(b'changelog', iblt_size, 0)
        theiriblt.subtract(myiblt)
        success, them_only, my_only = theiriblt.list()
        if not success:
            attempt += 1
            if attempt == 3:
                ui.debug(b'iblt extraction failed\n')
                return False, [], [], [], []
            iblt_size = round_iblt_size(iblt_size + 1)
            ui.debug(b'iblt extraction failed, retrying with size %d\n' % iblt_size)
            continue
        ui.status(b'iblt extraction worked, %d local changes and %d remote changes found\n' % (len(my_only), len(them_only)))
        break
    has_node = local.changelog.index.has_node
    nodelen = len(local.nullid)
    my_only = [node[:nodelen] for node in my_only]
    # first: find all parents and nodes
    parents = set()
    nodes = set()
    for row in them_only:
        node = row[:nodelen]
        if has_node(node):
            raise error.Abort(_(b"found already known remote change: %s") % node)
        nodes.add(node)
        parents.add(row[nodelen:2 * nodelen])
        parents.add(row[2 * nodelen:])
    # second: remote heads are all nodes that are not also parents
    remoteheads = nodes - parents
    # third: parent nodes that are not nodes themselves are the boundary
    # of the common set. Double check that they are known locally.
    commonheadscandidates = parents - nodes
    commonheads = [node for node in commonheadscandidates if has_node(node)]
    if len(commonheads) != len(commonheadscandidates):
        raise error.Abort(_(b"found remote changes with unknown parents"))
    return True, my_only, them_only, commonheads, remoteheads
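A toy illustration (not part of the change) of the reconstruction above: every decoded row is node || p1 || p2, remote heads are the decoded nodes that never occur as a parent, and the common boundary is every parent that was not itself decoded. Tuples stand in for the concatenated 60-byte rows.

them_only = [
    (b'D', b'B', b'0'),  # D is a child of the common node B; b'0' plays the null parent
    (b'E', b'D', b'C'),  # E merges D with the common node C
]
nodes = {row[0] for row in them_only}
parents = {p for row in them_only for p in row[1:]}
print(nodes - parents)  # remote heads: {b'E'}
print(parents - nodes)  # common boundary candidates: {b'B', b'C', b'0'}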
def findcommonheads(
    ui,
    local,
    remote,
    abortwhenunrelated=True,
    ancestorsof=None,
    audit=None,
):
    """Return a tuple (common, anyincoming, remoteheads) used to identify
    missing nodes from or in remote.

    The audit argument is an optional dictionary that a caller can pass. It
    will be updated with extra data about the discovery, which is useful for
    debugging.
    """
    samplegrowth = float(ui.config(b'devel', b'discovery.grow-sample.rate'))

    if audit is not None:
        audit[b'total-queries'] = 0

    start = util.timer()
    roundtrips = 0
mercurial/wireprotov1peer.py

    getattr,
    setattr,
)
from . import (
    bundle2,
    changegroup as changegroupmod,
    encoding,
    error,
    iblt,
    pushkey as pushkeymod,
    pycompat,
    util,
    wireprototypes,
)
from .interfaces import (
    repository,
    util as interfaceutil,

            for l in output.splitlines(True):
                self.ui.status(_(b'remote: '), l)
        else:
            # bundle2 push. Send a stream, fetch a stream.
            stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
            ret = bundle2.getunbundler(self.ui, stream)
        return ret

    def getestimator(self, name):
        d = self._call(b"getestimator", name=name)
        return iblt.estimator.load(d)

    def getiblt(self, name, size, seed):
        d = self._call(b"getiblt", name=name, size=b'%d' % size, seed=b'%d' % seed)
        return iblt.iblt.load(d)[0]

    # End of ipeercommands interface.

    # Begin of ipeerlegacycommands interface.

    def branches(self, nodes):
        n = wireprototypes.encodelist(nodes)
        d = self._call(b"branches", nodes=n)
        try:
mercurial/wireprotov1server.py

wireprotocaps = [
    b'lookup',
    b'branchmap',
    b'pushkey',
    b'known',
    b'getbundle',
    b'unbundlehash',
    b'iblt-changelog',
]


def _capabilities(repo, proto):
    """return a list of capabilities for a repo

    This function exists to allow extensions to easily wrap capabilities
    computation

        repo.ui.debug(b'sending pullbundle "%s"\n' % path)
        try:
            return repo.vfs.open(path)
        except IOError:
            repo.ui.debug(b'pullbundle "%s" not accessible\n' % path)
            continue
    return None


@wireprotocommand(b'getestimator', b'name', permission=b'pull')
def getestimator(repo, proto, name):
    estimator = repo.peer().getestimator(name)
    return wireprototypes.bytesresponse(estimator.dump())


@wireprotocommand(b'getiblt', b'name size seed', permission=b'pull')
def getiblt(repo, proto, name, size, seed):
    inst = repo.peer().getiblt(name, int(size), int(seed))
    return wireprototypes.bytesresponse(inst.dump())


@wireprotocommand(b'getbundle', b'*', permission=b'pull')
def getbundle(repo, proto, others):
    opts = options(
        b'getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(), others
    )
    for k, v in opts.items():
        keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]