diff --git a/mercurial/utils/cborutil.py b/mercurial/utils/cborutil.py new file mode 100644 --- /dev/null +++ b/mercurial/utils/cborutil.py @@ -0,0 +1,78 @@ +# cborutil.py - CBOR extensions +# +# Copyright 2018 Gregory Szorc +# +# This software may be used and distributed according to the terms of the +# GNU General Public License version 2 or any later version. + +from __future__ import absolute_import + +import contextlib +import struct + +def beginindefinitearray(encoder): + # major type = 4, information value 31 to specify indefinite length. + encoder.write(struct.pack(r'>B', 4 << 5 | 31)) + +def beginindefinitemap(encoder): + # major type = 5, information value 31 to specify indefinite length. + encoder.write(struct.pack(r'>B', 5 << 5 | 31)) + +def endindefinite(encoder): + encoder.write(b'\xff') + +@contextlib.contextmanager +def streamarray(encoder): + """Write an array in a streaming manner. + + Used as a context manager, the context manager resolves to a function + that should be called for each item to write to the array. + + When the context manager exits, the indefinite length array is ended. + """ + def writeitem(value): + encoder.encode(value) + + beginindefinitearray(encoder) + yield writeitem + endindefinite(encoder) + +def streamarrayitems(encoder, items): + """Write out an iterable of items to a streaming array.""" + with streamarray(encoder) as fn: + for value in items: + fn(value) + +@contextlib.contextmanager +def streammap(encoder): + """Write a map in a streaming manner. + + Used as a context manager, the context manager resolves to a function + that should be called with a key and value of each map item to write. + + When the context manager exits, the indefinite length map is ended. + + If is possible to nest streaming data structures. If the caller writes + out 2 values, the first value will be interpreted as a key and the second + a value. So a caller could do something like:: + + with streammap(encoder): + encoder.encode(b'mykey') + with streammap(encoder) as fn: + fn(b'innerkey', b'value') + + This would decode to ``{b'mykey': {b'innerkey': b'value'}}``. + """ + def writeitem(key, value): + encoder.encode(key) + encoder.encode(value) + + beginindefinitemap(encoder) + yield writeitem + endindefinite(encoder) + +def streammapitems(encoder, items): + """Write out an iterable of (key, value) items to a streaming map.""" + with streammap(encoder) as fn: + for key, value in items: + fn(key, value) diff --git a/tests/test-cbor.py b/tests/test-cbor.py new file mode 100644 --- /dev/null +++ b/tests/test-cbor.py @@ -0,0 +1,176 @@ +from __future__ import absolute_import + +import io +import unittest + +from mercurial.thirdparty import ( + cbor, +) +from mercurial.utils import ( + cborutil, +) + +class StreamArrayTests(unittest.TestCase): + def testempty(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + with cborutil.streamarray(encoder): + pass + + self.assertEqual(b.getvalue(), '\x9f\xff') + self.assertEqual(cbor.loads(b.getvalue()), []) + + def testone(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + with cborutil.streamarray(encoder) as fn: + fn(b'foo') + + self.assertEqual(cbor.loads(b.getvalue()), [b'foo']) + + def testmultiple(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + with cborutil.streamarray(encoder) as fn: + fn(0) + fn(True) + fn(b'foo') + fn(None) + + self.assertEqual(cbor.loads(b.getvalue()), [0, True, b'foo', None]) + + def testnested(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + with cborutil.streamarray(encoder): + with cborutil.streamarray(encoder) as fn: + fn(b'foo') + fn(b'bar') + + self.assertEqual(cbor.loads(b.getvalue()), [[b'foo', b'bar']]) + + def testitemslist(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + orig = [b'foo', b'bar', None, True, 42] + + cborutil.streamarrayitems(encoder, orig) + self.assertEqual(cbor.loads(b.getvalue()), orig) + + def testitemsgen(self): + def makeitems(): + yield b'foo' + yield b'bar' + yield None + yield 42 + + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + cborutil.streamarrayitems(encoder, makeitems()) + self.assertEqual(cbor.loads(b.getvalue()), [b'foo', b'bar', None, 42]) + +class StreamMapTests(unittest.TestCase): + def testempty(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + with cborutil.streammap(encoder): + pass + + self.assertEqual(b.getvalue(), '\xbf\xff') + self.assertEqual(cbor.loads(b.getvalue()), {}) + + def testone(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + with cborutil.streammap(encoder) as fn: + fn(b'key1', b'value1') + + self.assertEqual(cbor.loads(b.getvalue()), {b'key1': b'value1'}) + + def testmultiple(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + with cborutil.streammap(encoder) as fn: + fn(0, 1) + fn(b'key1', b'value1') + fn(True, None) + + self.assertEqual(cbor.loads(b.getvalue()), { + 0: 1, + b'key1': b'value1', + True: None, + }) + + def testcomplex(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + with cborutil.streammap(encoder) as fn: + fn(b'key1', b'value1') + fn(b'map', {b'inner1key': b'inner1value'}) + fn(b'array', [0, 1, 2]) + + self.assertEqual(cbor.loads(b.getvalue()), { + b'key1': b'value1', + b'map': {b'inner1key': b'inner1value'}, + b'array': [0, 1, 2], + }) + + def testnested(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + with cborutil.streammap(encoder): + encoder.encode(b'streamkey') + with cborutil.streammap(encoder) as fn2: + fn2(b'inner1key', b'inner1value') + fn2(0, 1) + + self.assertEqual(cbor.loads(b.getvalue()), { + b'streamkey': { + b'inner1key': b'inner1value', + 0: 1, + }, + }) + + def testitemsdict(self): + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + orig = [ + (b'foo', b'bar'), + (42, 19), + (None, True), + ] + + cborutil.streammapitems(encoder, orig) + self.assertEqual(cbor.loads(b.getvalue()), dict(orig)) + + def testitemsgen(self): + def makeitems(): + yield b'foo', b'bar' + yield None, True + yield 42, 19 + + b = io.BytesIO() + encoder = cbor.CBOREncoder(b) + + cborutil.streammapitems(encoder, makeitems()) + self.assertEqual(cbor.loads(b.getvalue()), { + b'foo': b'bar', + None: True, + 42: 19, + }) + +if __name__ == '__main__': + import silenttestrunner + silenttestrunner.main(__name__)