diff --git a/mercurial/utils/cborutil.py b/mercurial/utils/cborutil.py --- a/mercurial/utils/cborutil.py +++ b/mercurial/utils/cborutil.py @@ -10,10 +10,6 @@ import struct import sys -from ..thirdparty.cbor.cbor2 import ( - decoder as decodermod, -) - # Very short very of RFC 7049... # # Each item begins with a byte. The 3 high bits of that byte denote the @@ -219,54 +215,6 @@ return fn(v) -def readindefinitebytestringtoiter(fh, expectheader=True): - """Read an indefinite bytestring to a generator. - - Receives an object with a ``read(X)`` method to read N bytes. - - If ``expectheader`` is True, it is expected that the first byte read - will represent an indefinite length bytestring. Otherwise, we - expect the first byte to be part of the first bytestring chunk. - """ - read = fh.read - decodeuint = decodermod.decode_uint - byteasinteger = decodermod.byte_as_integer - - if expectheader: - initial = decodermod.byte_as_integer(read(1)) - - majortype = initial >> 5 - subtype = initial & SUBTYPE_MASK - - if majortype != MAJOR_TYPE_BYTESTRING: - raise decodermod.CBORDecodeError( - 'expected major type %d; got %d' % (MAJOR_TYPE_BYTESTRING, - majortype)) - - if subtype != SUBTYPE_INDEFINITE: - raise decodermod.CBORDecodeError( - 'expected indefinite subtype; got %d' % subtype) - - # The indefinite bytestring is composed of chunks of normal bytestrings. - # Read chunks until we hit a BREAK byte. - - while True: - # We need to sniff for the BREAK byte. - initial = byteasinteger(read(1)) - - if initial == BREAK_INT: - break - - length = decodeuint(fh, initial & SUBTYPE_MASK) - chunk = read(length) - - if len(chunk) != length: - raise decodermod.CBORDecodeError( - 'failed to read bytestring chunk: got %d bytes; expected %d' % ( - len(chunk), length)) - - yield chunk - class CBORDecodeError(Exception): """Represents an error decoding CBOR.""" diff --git a/tests/test-cbor.py b/tests/test-cbor.py --- a/tests/test-cbor.py +++ b/tests/test-cbor.py @@ -1,6 +1,5 @@ from __future__ import absolute_import -import io import unittest from mercurial.thirdparty import ( @@ -118,16 +117,6 @@ self.assertTrue(b[0].isfirst) self.assertTrue(b[0].islast) - def testreadtoiter(self): - source = io.BytesIO(b'\x5f\x44\xaa\xbb\xcc\xdd\x43\xee\xff\x99\xff') - - it = cborutil.readindefinitebytestringtoiter(source) - self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd') - self.assertEqual(next(it), b'\xee\xff\x99') - - with self.assertRaises(StopIteration): - next(it) - def testdecodevariouslengths(self): for i in (0, 1, 22, 23, 24, 25, 254, 255, 256, 65534, 65535, 65536): source = b'x' * i