diff --git a/hgext/largefiles/overrides.py b/hgext/largefiles/overrides.py --- a/hgext/largefiles/overrides.py +++ b/hgext/largefiles/overrides.py @@ -515,7 +515,7 @@ return actions, diverge, renamedelete @eh.wrapfunction(merge, 'recordupdates') -def mergerecordupdates(orig, repo, actions, branchmerge): +def mergerecordupdates(orig, repo, actions, branchmerge, getfiledata): if 'lfmr' in actions: lfdirstate = lfutil.openlfdirstate(repo.ui, repo) for lfile, args, msg in actions['lfmr']: @@ -526,7 +526,7 @@ lfdirstate.add(lfile) lfdirstate.write() - return orig(repo, actions, branchmerge) + return orig(repo, actions, branchmerge, getfiledata) # Override filemerge to prompt the user about how they wish to merge # largefiles. This will handle identical edits without prompting the user. diff --git a/hgext/narrow/narrowdirstate.py b/hgext/narrow/narrowdirstate.py --- a/hgext/narrow/narrowdirstate.py +++ b/hgext/narrow/narrowdirstate.py @@ -16,21 +16,21 @@ """Add narrow spec dirstate ignore, block changes outside narrow spec.""" def _editfunc(fn): - def _wrapper(self, *args): + def _wrapper(self, *args, **kwargs): narrowmatch = repo.narrowmatch() for f in args: if f is not None and not narrowmatch(f) and f not in self: raise error.Abort(_("cannot track '%s' - it is outside " + "the narrow clone") % f) - return fn(self, *args) + return fn(self, *args, **kwargs) return _wrapper class narrowdirstate(dirstate.__class__): # Prevent adding/editing/copying/deleting files that are outside the # sparse checkout @_editfunc - def normal(self, *args): - return super(narrowdirstate, self).normal(*args) + def normal(self, *args, **kwargs): + return super(narrowdirstate, self).normal(*args, **kwargs) @_editfunc def add(self, *args): diff --git a/hgext/remotefilelog/__init__.py b/hgext/remotefilelog/__init__.py --- a/hgext/remotefilelog/__init__.py +++ b/hgext/remotefilelog/__init__.py @@ -442,7 +442,8 @@ return s # prefetch files before update -def applyupdates(orig, repo, actions, wctx, 
mctx, overwrite, labels=None): +def applyupdates(orig, repo, actions, wctx, mctx, overwrite, wantfiledata, + labels=None): if isenabled(repo): manifest = mctx.manifest() files = [] @@ -450,7 +451,8 @@ files.append((f, hex(manifest[f]))) # batch fetch the needed files from the server repo.fileservice.prefetch(files) - return orig(repo, actions, wctx, mctx, overwrite, labels=labels) + return orig(repo, actions, wctx, mctx, overwrite, wantfiledata, + labels=labels) # Prefetch merge checkunknownfiles def checkunknownfiles(orig, repo, wctx, mctx, force, actions, diff --git a/hgext/sparse.py b/hgext/sparse.py --- a/hgext/sparse.py +++ b/hgext/sparse.py @@ -228,7 +228,7 @@ hint = _('include file with `hg debugsparse --include ` or use ' + '`hg add -s ` to include file directory while adding') for func in editfuncs: - def _wrapper(orig, self, *args): + def _wrapper(orig, self, *args, **kwargs): sparsematch = self._sparsematcher if not sparsematch.always(): for f in args: @@ -237,7 +237,7 @@ raise error.Abort(_("cannot add '%s' - it is outside " "the sparse checkout") % f, hint=hint) - return orig(self, *args) + return orig(self, *args, **kwargs) extensions.wrapfunction(dirstate.dirstate, func, _wrapper) @command('debugsparse', [ diff --git a/mercurial/context.py b/mercurial/context.py --- a/mercurial/context.py +++ b/mercurial/context.py @@ -1766,6 +1766,8 @@ def size(self): return self._repo.wvfs.lstat(self._path).st_size + def lstat(self): + return self._repo.wvfs.lstat(self._path) def date(self): t, tz = self._changectx.date() try: @@ -1801,9 +1803,9 @@ def write(self, data, flags, backgroundclose=False, **kwargs): """wraps repo.wwrite""" - self._repo.wwrite(self._path, data, flags, - backgroundclose=backgroundclose, - **kwargs) + return self._repo.wwrite(self._path, data, flags, + backgroundclose=backgroundclose, + **kwargs) def markcopied(self, src): """marks this file a copy of `src`""" diff --git a/mercurial/dirstate.py b/mercurial/dirstate.py --- 
a/mercurial/dirstate.py +++ b/mercurial/dirstate.py @@ -396,12 +396,24 @@ self._updatedfiles.add(f) self._map.addfile(f, oldstate, state, mode, size, mtime) - def normal(self, f): - '''Mark a file normal and clean.''' - s = os.lstat(self._join(f)) - mtime = s[stat.ST_MTIME] - self._addpath(f, 'n', s.st_mode, - s.st_size & _rangemask, mtime & _rangemask) + def normal(self, f, parentfiledata=None): + '''Mark a file normal and clean. + + parentfiledata: (mode, size, mtime) of the clean file + + parentfiledata should be computed from memory (for mode, + size), as close as possible to the point where we + determined the file was clean, to limit the risk of the + file having been changed by an external process between the + moment where the file was determined to be clean and now.''' + if parentfiledata: + (mode, size, mtime) = parentfiledata + else: + s = os.lstat(self._join(f)) + mode = s.st_mode + size = s.st_size + mtime = s[stat.ST_MTIME] + self._addpath(f, 'n', mode, size & _rangemask, mtime & _rangemask) self._map.copymap.pop(f, None) if f in self._map.nonnormalset: self._map.nonnormalset.remove(f) diff --git a/mercurial/merge.py b/mercurial/merge.py --- a/mercurial/merge.py +++ b/mercurial/merge.py @@ -10,6 +10,7 @@ import errno import hashlib import shutil +import stat import struct from .i18n import _ @@ -683,7 +684,7 @@ def recordactions(self): """record remove/add/get actions in the dirstate""" branchmerge = self._repo.dirstate.p2() != nullid - recordupdates(self._repo, self.actions(), branchmerge) + recordupdates(self._repo, self.actions(), branchmerge, None) def queueremove(self, f): """queues a file to be removed from the dirstate @@ -1464,13 +1465,17 @@ repo.ui.warn(_("current directory was removed\n" "(consider changing to repo root: %s)\n") % repo.root) -def batchget(repo, mctx, wctx, actions): +def batchget(repo, mctx, wctx, wantfiledata, actions): """apply gets to the working directory mctx is the context to get from - yields tuples for progress 
updates + Yields arbitrarily many (False, tuple) for progress updates, followed by + exactly one (True, filedata). When wantfiledata is false, filedata is an + empty list. When wantfiledata is true, filedata[i] is a triple (mode, size, + mtime) of the file written for action[i]. """ + filedata = [] verbose = repo.ui.verbose fctx = mctx.filectx ui = repo.ui @@ -1494,16 +1499,24 @@ if repo.wvfs.lexists(conflicting): orig = scmutil.backuppath(ui, repo, conflicting) util.rename(repo.wjoin(conflicting), orig) - wctx[f].clearunknown() + wfctx = wctx[f] + wfctx.clearunknown() atomictemp = ui.configbool("experimental", "update.atomic-file") - wctx[f].write(fctx(f).data(), flags, backgroundclose=True, - atomictemp=atomictemp) + size = wfctx.write(fctx(f).data(), flags, + backgroundclose=True, + atomictemp=atomictemp) + if wantfiledata: + s = wfctx.lstat() + mode = s.st_mode + mtime = s[stat.ST_MTIME] + filedata.append((mode, size, mtime)) # for dirstate.normal if i == 100: - yield i, f + yield False, (i, f) i = 0 i += 1 if i > 0: - yield i, f + yield False, (i, f) + yield True, filedata def _prefetchfiles(repo, ctx, actions): """Invoke ``scmutil.prefetchfiles()`` for the files relevant to the dict @@ -1550,14 +1563,17 @@ ACTION_PATH_CONFLICT, ACTION_PATH_CONFLICT_RESOLVE)) -def applyupdates(repo, actions, wctx, mctx, overwrite, labels=None): +def applyupdates(repo, actions, wctx, mctx, overwrite, wantfiledata, + labels=None): """apply the merge action list to the working directory wctx is the working copy context mctx is the context to be merged into the working copy - Return a tuple of counts (updated, merged, removed, unresolved) that - describes how many files were affected by the update. + Return a tuple of (counts, filedata), where counts is a tuple + (updated, merged, removed, unresolved) that describes how many + files were affected by the update, and filedata is as described in + batchget. 
""" _prefetchfiles(repo, mctx, actions) @@ -1649,11 +1665,18 @@ # get in parallel. threadsafe = repo.ui.configbool('experimental', 'worker.wdir-get-thread-safe') - prog = worker.worker(repo.ui, cost, batchget, (repo, mctx, wctx), + prog = worker.worker(repo.ui, cost, batchget, + (repo, mctx, wctx, wantfiledata), actions[ACTION_GET], - threadsafe=threadsafe) - for i, item in prog: - progress.increment(step=i, item=item) + threadsafe=threadsafe, + hasretval=True) + getfiledata = [] + for final, res in prog: + if final: + getfiledata = res + else: + i, item = res + progress.increment(step=i, item=item) updated = len(actions[ACTION_GET]) if [a for a in actions[ACTION_GET] if a[0] == '.hgsubstate']: @@ -1778,6 +1801,8 @@ mfiles = set(a[0] for a in actions[ACTION_MERGE]) for k, acts in extraactions.iteritems(): actions[k].extend(acts) + if k == ACTION_GET and wantfiledata: + getfiledata.extend([None] * len(acts)) # Remove these files from actions[ACTION_MERGE] as well. This is # important because in recordupdates, files in actions[ACTION_MERGE] # are processed after files in other actions, and the merge driver @@ -1800,9 +1825,10 @@ if a[0] in mfiles] progress.complete() - return updateresult(updated, merged, removed, unresolved) + assert len(getfiledata) == (len(actions[ACTION_GET]) if wantfiledata else 0) + return updateresult(updated, merged, removed, unresolved), getfiledata -def recordupdates(repo, actions, branchmerge): +def recordupdates(repo, actions, branchmerge, getfiledata): "record merge actions to the dirstate" # remove (must come first) for f, args, msg in actions.get(ACTION_REMOVE, []): @@ -1846,11 +1872,12 @@ pass # get - for f, args, msg in actions.get(ACTION_GET, []): + for i, (f, args, msg) in enumerate(actions.get(ACTION_GET, [])): if branchmerge: repo.dirstate.otherparent(f) else: - repo.dirstate.normal(f) + parentfiledata = getfiledata[i] if getfiledata else None + repo.dirstate.normal(f, parentfiledata=parentfiledata) # merge for f, args, msg in 
actions.get(ACTION_MERGE, []): @@ -2169,12 +2196,15 @@ 'fsmonitor enabled; enable fsmonitor to improve performance; ' 'see "hg help -e fsmonitor")\n')) - stats = applyupdates(repo, actions, wc, p2, overwrite, labels=labels) + updatedirstate = not partial and not wc.isinmemory() + wantfiledata = updatedirstate and not branchmerge + stats, getfiledata = applyupdates(repo, actions, wc, p2, overwrite, + wantfiledata, labels=labels) - if not partial and not wc.isinmemory(): + if updatedirstate: with repo.dirstate.parentchange(): repo.setparents(fp1, fp2) - recordupdates(repo, actions, branchmerge) + recordupdates(repo, actions, branchmerge, getfiledata) # update completed, clear state util.unlink(repo.vfs.join('updatestate')) diff --git a/mercurial/narrowspec.py b/mercurial/narrowspec.py --- a/mercurial/narrowspec.py +++ b/mercurial/narrowspec.py @@ -259,7 +259,7 @@ if not repo.wvfs.exists(f): addgaction((f, (mf.flags(f), False), "narrowspec updated")) merge.applyupdates(repo, actions, wctx=repo[None], - mctx=repo['.'], overwrite=False) + mctx=repo['.'], overwrite=False, wantfiledata=False) def checkworkingcopynarrowspec(repo): storespec = repo.svfs.tryread(FILENAME) diff --git a/mercurial/sparse.py b/mercurial/sparse.py --- a/mercurial/sparse.py +++ b/mercurial/sparse.py @@ -248,7 +248,8 @@ typeactions = mergemod.emptyactions() typeactions['r'] = actions - mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False) + mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False, + wantfiledata=False) # Fix dirstate for file in dropped: @@ -382,7 +383,7 @@ typeactions = mergemod.emptyactions() typeactions['g'] = actions mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], - False) + False, wantfiledata=False) dirstate = repo.dirstate for file, flags, msg in actions: @@ -486,7 +487,8 @@ for f, (m, args, msg) in actions.iteritems(): typeactions[m].append((f, args, msg)) - mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False) 
+ mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False, + wantfiledata=False) # Fix dirstate for file in added: diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -83,7 +83,8 @@ benefit = linear - (_STARTUP_COST * workers + linear / workers) return benefit >= 0.15 -def worker(ui, costperarg, func, staticargs, args, threadsafe=True): +def worker(ui, costperarg, func, staticargs, args, hasretval=False, + threadsafe=True): '''run a function, possibly in parallel in multiple worker processes. @@ -91,23 +92,27 @@ costperarg - cost of a single task - func - function to run + func - function to run. It is expected to return a progress iterator. staticargs - arguments to pass to every invocation of the function args - arguments to split into chunks, to pass to individual workers + hasretval - when True, func and the current function return a progress + iterator then a list (encoded as an iterator that yields many (False, ..) + then a (True, list)). The resulting list is in the natural order. + threadsafe - whether work items are thread safe and can be executed using a thread-based worker. Should be disabled for CPU heavy tasks that don't release the GIL. 
''' enabled = ui.configbool('worker', 'enabled') if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): - return _platformworker(ui, func, staticargs, args) + return _platformworker(ui, func, staticargs, args, hasretval) return func(*staticargs + (args,)) -def _posixworker(ui, func, staticargs, args): +def _posixworker(ui, func, staticargs, args, hasretval): workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -157,7 +162,8 @@ ui.flush() parentpid = os.getpid() pipes = [] - for pargs in partition(args, workers): + retvals = [] + for i, pargs in enumerate(partition(args, workers)): # Every worker gets its own pipe to send results on, so we don't have to # implement atomic writes larger than PIPE_BUF. Each forked process has # its own pipe's descriptors in the local variables, and the parent @@ -165,6 +171,7 @@ # care what order they're in). rfd, wfd = os.pipe() pipes.append((rfd, wfd)) + retvals.append(None) # make sure we use os._exit in all worker code paths. otherwise the # worker may do some clean-ups which could cause surprises like # deadlock. see sshpeer.cleanup for example. 
@@ -185,7 +192,7 @@ os.close(w) os.close(rfd) for result in func(*(staticargs + (pargs,))): - os.write(wfd, util.pickle.dumps(result)) + os.write(wfd, util.pickle.dumps((i, result))) return 0 ret = scmutil.callcatch(ui, workerfunc) @@ -219,7 +226,11 @@ while openpipes > 0: for key, events in selector.select(): try: - yield util.pickle.load(key.fileobj) + i, res = util.pickle.load(key.fileobj) + if hasretval and res[0]: + retvals[i] = res[1] + else: + yield res except EOFError: selector.unregister(key.fileobj) key.fileobj.close() @@ -237,6 +248,8 @@ if status < 0: os.kill(os.getpid(), -status) sys.exit(status) + if hasretval: + yield True, sum(retvals, []) def _posixexitstatus(code): '''convert a posix exit status into the same form returned by @@ -248,7 +261,7 @@ elif os.WIFSIGNALED(code): return -os.WTERMSIG(code) -def _windowsworker(ui, func, staticargs, args): +def _windowsworker(ui, func, staticargs, args, hasretval): class Worker(threading.Thread): def __init__(self, taskqueue, resultqueue, func, staticargs, *args, **kwargs): @@ -268,9 +281,9 @@ try: while not self._taskqueue.empty(): try: - args = self._taskqueue.get_nowait() + i, args = self._taskqueue.get_nowait() for res in self._func(*self._staticargs + (args,)): - self._resultqueue.put(res) + self._resultqueue.put((i, res)) # threading doesn't provide a native way to # interrupt execution. handle it manually at every # iteration. 
@@ -305,9 +318,11 @@ workers = _numworkers(ui) resultqueue = pycompat.queue.Queue() taskqueue = pycompat.queue.Queue() + retvals = [] # partition work to more pieces than workers to minimize the chance # of uneven distribution of large tasks between the workers - for pargs in partition(args, workers * 20): + for pargs in enumerate(partition(args, workers * 20)): + retvals.append(None) taskqueue.put(pargs) for _i in range(workers): t = Worker(taskqueue, resultqueue, func, staticargs) @@ -316,7 +331,11 @@ try: while len(threads) > 0: while not resultqueue.empty(): - yield resultqueue.get() + (i, res) = resultqueue.get() + if hasretval and res[0]: + retvals[i] = res[1] + else: + yield res threads[0].join(0.05) finishedthreads = [_t for _t in threads if not _t.is_alive()] for t in finishedthreads: @@ -327,7 +346,13 @@ trykillworkers() raise while not resultqueue.empty(): - yield resultqueue.get() + (i, res) = resultqueue.get() + if hasretval and res[0]: + retvals[i] = res[1] + else: + yield res + if hasretval: + yield True, sum(retvals, []) if pycompat.iswindows: _platformworker = _windowsworker diff --git a/tests/test-dirstate-race2.t b/tests/test-dirstate-race2.t --- a/tests/test-dirstate-race2.t +++ b/tests/test-dirstate-race2.t @@ -27,13 +27,11 @@ > EOF Do an update where file 'a' is changed between hg writing it to disk -and hg writing the dirstate. It results in a corrupted dirstate, which -stores the wrong size, and thus hg status shows spuriously modified -files. +and hg writing the dirstate. The dirstate is correct nonetheless, and +so hg status correctly shows a as clean. 
$ hg up -r 0 --config extensions.race=$TESTTMP/dirstaterace.py 1 files updated, 0 files merged, 0 files removed, 0 files unresolved $ hg debugdirstate --no-dates - n 644 0 (set |unset) a (re) + n 644 2 (set |unset) a (re) $ echo a > a; hg status; hg diff - M a diff --git a/tests/test-dirstate.t b/tests/test-dirstate.t --- a/tests/test-dirstate.t +++ b/tests/test-dirstate.t @@ -73,7 +73,7 @@ > merge, > ) > - > def wraprecordupdates(orig, repo, actions, branchmerge): + > def wraprecordupdates(*args): > raise error.Abort("simulated error while recording dirstateupdates") > > def reposetup(ui, repo):