diff --git a/infinitepush/__init__.py b/infinitepush/__init__.py --- a/infinitepush/__init__.py +++ b/infinitepush/__init__.py @@ -73,6 +73,10 @@ # in infinitepush index for nodes that are ancestor of the bookmark. fillmetadatabranchpattern = '' + # Instructs infinitepush to forward all received bundle2 parts to the + # bundle for storage. Defaults to False. + storeallparts = True + [remotenames] # Client-side option # This option should be set only if remotenames extension is enabled. @@ -268,6 +272,8 @@ _lookupwrap(wireproto.commands['lookup'][0]), 'key') wrapfunction(exchange, 'getbundlechunks', getbundlechunks) + wrapfunction(bundle2, 'processparts', processparts) + def clientextsetup(ui): entry = wrapcommand(commands.table, 'push', _push) # Don't add the 'to' arg if it already exists @@ -859,6 +865,11 @@ pushop.cgresult = 0 return + # This parameter tells the server that the following bundle is an + # infinitepush. This let's it switch the part processing to our infinitepush + # code path. + bundler.addparam("infinitepush", "True") + nonforwardmove = pushop.force or pushop.ui.configbool(experimental, confignonforwardmove) scratchpart = getscratchbranchpart(pushop.repo, @@ -936,6 +947,90 @@ logger = logger[0] return logger +def processparts(orig, repo, op, unbundler): + if unbundler.params.get('infinitepush') != 'True': + return orig(repo, op, unbundler) + + handleallparts = repo.ui.configbool('infinitepush', 'storeallparts') + + bundler = bundle2.bundle20(repo.ui) + cgparams = None + scratchbookpart = None + with bundle2.partiterator(repo, op, unbundler) as parts: + for part in parts: + bundlepart = None + if part.type == 'replycaps': + # This configures the current operation to allow reply parts. + bundle2._processpart(op, part) + elif part.type == scratchbranchparttype: + # Scratch branch parts need to be converted to normal + # changegroup parts, and the extra parameters stored for later + # when we upload to the store. Eventually those parameters will + # be put on the actual bundle instead of this part, then we can + # send a vanilla changegroup instead of the scratchbranch part. + cgversion = part.params.get('cgversion', '01') + bundlepart = bundle2.bundlepart('changegroup', data=part.read()) + bundlepart.addparam('version', cgversion) + cgparams = part.params + + # If we're not dumping all parts into the new bundle, we need to + # alert the future pushkey handler to skip the part. + if not handleallparts: + op.records.add(scratchbranchparttype + '_skippushkey', True) + elif part.type == scratchbookmarksparttype: + # Save this for later processing. Details below. + scratchbookpart = part + else: + if handleallparts: + # Ideally we would not process any parts, and instead just + # forward them to the bundle for storage, but since this + # differs from previous behavior, we need to put it behind a + # config flag for incremental rollout. + bundlepart = bundle2.bundlepart(part.type, data=part.read()) + for key, value in part.params.iteritems(): + bundlepart.addparam(key, value) + + # Certain parts require a response + if part.type == 'pushkey': + if op.reply is not None: + rpart = op.reply.newpart('reply:pushkey') + rpart.addparam('in-reply-to', str(part.id), + mandatory=False) + rpart.addparam('return', '1', mandatory=False) + else: + bundle2._processpart(op, part) + + if handleallparts: + op.records.add(part.type, { + 'return': 1, + }) + if bundlepart: + bundler.addpart(bundlepart) + + # If commits were sent, store them + if cgparams: + buf = util.chunkbuffer(bundler.getchunks()) + fd, bundlefile = tempfile.mkstemp() + try: + try: + fp = os.fdopen(fd, 'wb') + fp.write(buf.read()) + finally: + fp.close() + storebundle(op, cgparams, bundlefile) + finally: + try: + os.unlink(bundlefile) + except Exception: + # we would rather see the original exception + pass + + # The scratch bookmark part is sent as part of a push backup. It needs to be + # processed after the main bundle has been stored, so that any commits it + # references are available in the store. + if scratchbookpart: + bundle2._processpart(op, part) + def storebundle(op, params, bundlefile): log = _getorcreateinfinitepushlogger(op) parthandlerstart = time.time() diff --git a/infinitepush/backupcommands.py b/infinitepush/backupcommands.py --- a/infinitepush/backupcommands.py +++ b/infinitepush/backupcommands.py @@ -446,6 +446,7 @@ wrapfunction(changegroup.cg2packer, 'deltaparent', _deltaparent) try: bundler = _createbundler(ui, repo, other) + bundler.addparam("infinitepush", "True") backup = False if outgoing and outgoing.missing: backup = True