diff --git a/hgext/largefiles/proto.py b/hgext/largefiles/proto.py --- a/hgext/largefiles/proto.py +++ b/hgext/largefiles/proto.py @@ -41,7 +41,8 @@ tmpfp = util.atomictempfile(path, createmode=repo.store.createmode) try: - proto.forwardpayload(tmpfp) + for p in proto.getpayload(): + tmpfp.write(p) tmpfp._fp.seek(0) if sha != lfutil.hexsha1(tmpfp._fp): raise IOError(0, _('largefile contents do not match hash')) diff --git a/mercurial/configitems.py b/mercurial/configitems.py --- a/mercurial/configitems.py +++ b/mercurial/configitems.py @@ -902,6 +902,9 @@ coreconfigitem('server', 'disablefullbundle', default=False, ) +coreconfigitem('server', 'streamunbundle', + default=False, +) coreconfigitem('server', 'maxhttpheaderlen', default=1024, ) diff --git a/mercurial/help/config.txt b/mercurial/help/config.txt --- a/mercurial/help/config.txt +++ b/mercurial/help/config.txt @@ -1792,6 +1792,11 @@ are highly recommended. Partial clones will still be allowed. (default: False) +``streamunbundle`` + When set, servers will apply data sent from the client directly, + otherwise it will be written to a temporary file first. This option + effectively prevents concurrent pushes. + ``concurrent-push-mode`` Level of allowed race condition between two pushing clients. diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py --- a/mercurial/wireproto.py +++ b/mercurial/wireproto.py @@ -972,14 +972,29 @@ with proto.mayberedirectstdio() as output: try: exchange.check_heads(repo, their_heads, 'preparing changes') + cleanup = lambda: None + try: + payload = proto.getpayload() + if repo.ui.configbool('server', 'streamunbundle'): + def cleanup(): + for p in payload: + pass + fp = util.chunkbuffer(payload) + else: + # write bundle data to temporary file because it can be big + fp, tempname = None, None + def cleanup(): + if fp: + fp.close() + if tempname: + os.unlink(tempname) + fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-') + fp = os.fdopen(fd, pycompat.sysstr('wb+')) + r = 0 + for p in payload: + fp.write(p) + fp.seek(0) - # write bundle data to temporary file because it can be big - fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-') - fp = os.fdopen(fd, pycompat.sysstr('wb+')) - r = 0 - try: - proto.forwardpayload(fp) - fp.seek(0) gen = exchange.readbundle(repo.ui, fp, None) if (isinstance(gen, changegroupmod.cg1unpacker) and not bundle1allowed(repo, 'push')): @@ -1001,8 +1016,7 @@ return pushres(r, output.getvalue() if output else '') finally: - fp.close() - os.unlink(tempname) + cleanup() except (error.BundleValueError, error.Abort, error.PushRaced) as exc: # handle non-bundle2 case first diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py --- a/mercurial/wireprotoserver.py +++ b/mercurial/wireprotoserver.py @@ -91,7 +91,7 @@ args.update(urlreq.parseqs(argvalue, keep_blank_values=True)) return args - def forwardpayload(self, fp): + def getpayload(self): if r'HTTP_CONTENT_LENGTH' in self._req.env: length = int(self._req.env[r'HTTP_CONTENT_LENGTH']) else: @@ -99,8 +99,7 @@ # If httppostargs is used, we need to read Content-Length # minus the amount that was consumed by args. length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0)) - for s in util.filechunkiter(self._req, limit=length): - fp.write(s) + return util.filechunkiter(self._req, limit=length) @contextlib.contextmanager def mayberedirectstdio(self): @@ -346,7 +345,7 @@ data[arg] = val return [data[k] for k in keys] - def forwardpayload(self, fpout): + def getpayload(self): # We initially send an empty response. This tells the client it is # OK to start sending data. If a client sees any other response, it # interprets it as an error. @@ -359,7 +358,7 @@ # 0\n count = int(self._fin.readline()) while count: - fpout.write(self._fin.read(count)) + yield self._fin.read(count) count = int(self._fin.readline()) @contextlib.contextmanager diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py --- a/mercurial/wireprototypes.py +++ b/mercurial/wireprototypes.py @@ -92,10 +92,11 @@ returns a list of values (same order as )""" @abc.abstractmethod - def forwardpayload(self, fp): - """Read the raw payload and forward to a file. + def getpayload(self): + """Provide a generator for the raw payload. - The payload is read in full before the function returns. + The caller is responsible for ensuring that the full payload is + processed. """ @abc.abstractmethod