diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py --- a/mercurial/bundle2.py +++ b/mercurial/bundle2.py @@ -854,7 +854,7 @@ indebug(self.ui, 'start extraction of bundle2 parts') headerblock = self._readpartheader() while headerblock is not None: - part = unbundlepart(self.ui, headerblock, self._fp) + part = seekableunbundlepart(self.ui, headerblock, self._fp) yield part # Seek to the end of the part to force it's consumption so the next # part can be read. But then seek back to the beginning so the @@ -1155,7 +1155,7 @@ if headerblock is None: indebug(self.ui, 'no part found during interruption.') return - part = unbundlepart(self.ui, headerblock, self._fp) + part = seekableunbundlepart(self.ui, headerblock, self._fp) op = interruptoperation(self.ui) hardabort = False try: @@ -1207,10 +1207,8 @@ self.advisoryparams = None self.params = None self.mandatorykeys = () - self._payloadstream = None self._readheader() self._mandatory = None - self._chunkindex = [] #(payload, file) position tuples for chunk starts self._pos = 0 def _fromheader(self, size): @@ -1237,46 +1235,6 @@ self.params.update(self.advisoryparams) self.mandatorykeys = frozenset(p[0] for p in mandatoryparams) - def _payloadchunks(self, chunknum=0): - '''seek to specified chunk and start yielding data''' - if len(self._chunkindex) == 0: - assert chunknum == 0, 'Must start with chunk 0' - self._chunkindex.append((0, self._tellfp())) - else: - assert chunknum < len(self._chunkindex), \ - 'Unknown chunk %d' % chunknum - self._seekfp(self._chunkindex[chunknum][1]) - - pos = self._chunkindex[chunknum][0] - payloadsize = self._unpack(_fpayloadsize)[0] - indebug(self.ui, 'payload chunk size: %i' % payloadsize) - while payloadsize: - if payloadsize == flaginterrupt: - # interruption detection, the handler will now read a - # single part and process it. - interrupthandler(self.ui, self._fp)() - elif payloadsize < 0: - msg = 'negative payload chunk size: %i' % payloadsize - raise error.BundleValueError(msg) - else: - result = self._readexact(payloadsize) - chunknum += 1 - pos += payloadsize - if chunknum == len(self._chunkindex): - self._chunkindex.append((pos, self._tellfp())) - yield result - payloadsize = self._unpack(_fpayloadsize)[0] - indebug(self.ui, 'payload chunk size: %i' % payloadsize) - - def _findchunk(self, pos): - '''for a given payload position, return a chunk number and offset''' - for chunk, (ppos, fpos) in enumerate(self._chunkindex): - if ppos == pos: - return chunk, 0 - elif ppos > pos: - return chunk - 1, pos - self._chunkindex[chunk - 1][0] - raise ValueError('Unknown chunk') - def _readheader(self): """read the header and setup the object""" typesize = self._unpackheader(_fparttypesize)[0] @@ -1328,6 +1286,69 @@ self.consumed = True return data +class seekableunbundlepart(unbundlepart): + """A bundle2 part in a bundle that is seekable. + + Regular ``unbundlepart`` instances can only be read once. This class + extends ``unbundlepart`` to enable bi-directional seeking within the + part. + + Bundle2 part data consists of framed chunks. Offsets when seeking + refer to the decoded data, not the offsets in the underlying bundle2 + stream. + + To facilitate quickly seeking within the decoded data, instances of this + class maintain a mapping between offsets in the underlying stream and + the decoded payload. This mapping will consume memory in proportion + to the number of chunks within the payload (which almost certainly + increases in proportion with the size of the part). + """ + def __init__(self, ui, header, fp): + # (payload, file) offsets for chunk starts. + self._chunkindex = [] + + super(seekableunbundlepart, self).__init__(ui, header, fp) + + def _payloadchunks(self, chunknum=0): + '''seek to specified chunk and start yielding data''' + if len(self._chunkindex) == 0: + assert chunknum == 0, 'Must start with chunk 0' + self._chunkindex.append((0, self._tellfp())) + else: + assert chunknum < len(self._chunkindex), \ + 'Unknown chunk %d' % chunknum + self._seekfp(self._chunkindex[chunknum][1]) + + pos = self._chunkindex[chunknum][0] + payloadsize = self._unpack(_fpayloadsize)[0] + indebug(self.ui, 'payload chunk size: %i' % payloadsize) + while payloadsize: + if payloadsize == flaginterrupt: + # interruption detection, the handler will now read a + # single part and process it. + interrupthandler(self.ui, self._fp)() + elif payloadsize < 0: + msg = 'negative payload chunk size: %i' % payloadsize + raise error.BundleValueError(msg) + else: + result = self._readexact(payloadsize) + chunknum += 1 + pos += payloadsize + if chunknum == len(self._chunkindex): + self._chunkindex.append((pos, self._tellfp())) + yield result + payloadsize = self._unpack(_fpayloadsize)[0] + indebug(self.ui, 'payload chunk size: %i' % payloadsize) + + def _findchunk(self, pos): + '''for a given payload position, return a chunk number and offset''' + for chunk, (ppos, fpos) in enumerate(self._chunkindex): + if ppos == pos: + return chunk, 0 + elif ppos > pos: + return chunk - 1, pos - self._chunkindex[chunk - 1][0] + raise ValueError('Unknown chunk') + def tell(self): return self._pos