diff --git a/mercurial/debugcommands.py b/mercurial/debugcommands.py --- a/mercurial/debugcommands.py +++ b/mercurial/debugcommands.py @@ -88,7 +88,6 @@ url as urlmod, util, vfs as vfsmod, - wireprotoframing, wireprotoserver, ) from .interfaces import repository @@ -4455,12 +4454,6 @@ The content of the file defined as the value to this argument will be transferred verbatim as the HTTP request body. - ``frame `` - Send a unified protocol frame as part of the request body. - - All frames will be collected and sent as the body to the HTTP - request. - close ----- @@ -4500,34 +4493,6 @@ --------- ``read()`` N bytes from the server's stderr pipe, if available. - - Specifying Unified Frame-Based Protocol Frames - ---------------------------------------------- - - It is possible to emit a *Unified Frame-Based Protocol* by using special - syntax. - - A frame is composed as a type, flags, and payload. These can be parsed - from a string of the form: - - - - ``request-id`` and ``stream-id`` are integers defining the request and - stream identifiers. - - ``type`` can be an integer value for the frame type or the string name - of the type. The strings are defined in ``wireprotoframing.py``. e.g. - ``command-name``. - - ``stream-flags`` and ``flags`` are a ``|`` delimited list of flag - components. Each component (and there can be just one) can be an integer - or a flag name for stream flags or frame flags, respectively. Values are - resolved to integers and then bitwise OR'd together. - - ``payload`` represents the raw frame payload. If it begins with - ``cbor:``, the following string is evaluated as Python code and the - resulting object is fed into a CBOR encoder. Otherwise it is interpreted - as a Python byte string literal. """ opts = pycompat.byteskwargs(opts) @@ -4785,7 +4750,7 @@ method, httppath = request[1:] headers = {} body = None - frames = [] + for line in lines: line = line.lstrip() m = re.match(b'^([a-zA-Z0-9_-]+): (.*)$', line) @@ -4799,12 +4764,6 @@ if line.startswith(b'BODYFILE '): with open(line.split(b' ', 1), b'rb') as fh: body = fh.read() - elif line.startswith(b'frame '): - frame = wireprotoframing.makeframefromhumanstring( - line[len(b'frame ') :] - ) - - frames.append(frame) else: raise error.Abort( _(b'unknown argument to httprequest: %s') % line @@ -4812,9 +4771,6 @@ url = path + httppath - if frames: - body = b''.join(bytes(f) for f in frames) - req = urlmod.urlreq.request(pycompat.strurl(url), body, headers) # urllib.Request insists on using has_data() as a proxy for diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py deleted file mode 100644 --- a/mercurial/wireprotoframing.py +++ /dev/null @@ -1,2089 +0,0 @@ -# wireprotoframing.py - unified framing protocol for wire protocol -# -# Copyright 2018 Gregory Szorc -# -# This software may be used and distributed according to the terms of the -# GNU General Public License version 2 or any later version. - -# This file contains functionality to support the unified frame-based wire -# protocol. For details about the protocol, see -# `hg help internals.wireprotocol`. - - -import collections -import struct - -from .i18n import _ -from .pycompat import getattr -from .thirdparty import attr -from . import ( - encoding, - error, - pycompat, - util, - wireprototypes, -) -from .utils import ( - cborutil, - stringutil, -) - -FRAME_HEADER_SIZE = 8 -DEFAULT_MAX_FRAME_SIZE = 32768 - -STREAM_FLAG_BEGIN_STREAM = 0x01 -STREAM_FLAG_END_STREAM = 0x02 -STREAM_FLAG_ENCODING_APPLIED = 0x04 - -STREAM_FLAGS = { - b'stream-begin': STREAM_FLAG_BEGIN_STREAM, - b'stream-end': STREAM_FLAG_END_STREAM, - b'encoded': STREAM_FLAG_ENCODING_APPLIED, -} - -FRAME_TYPE_COMMAND_REQUEST = 0x01 -FRAME_TYPE_COMMAND_DATA = 0x02 -FRAME_TYPE_COMMAND_RESPONSE = 0x03 -FRAME_TYPE_ERROR_RESPONSE = 0x05 -FRAME_TYPE_TEXT_OUTPUT = 0x06 -FRAME_TYPE_PROGRESS = 0x07 -FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08 -FRAME_TYPE_STREAM_SETTINGS = 0x09 - -FRAME_TYPES = { - b'command-request': FRAME_TYPE_COMMAND_REQUEST, - b'command-data': FRAME_TYPE_COMMAND_DATA, - b'command-response': FRAME_TYPE_COMMAND_RESPONSE, - b'error-response': FRAME_TYPE_ERROR_RESPONSE, - b'text-output': FRAME_TYPE_TEXT_OUTPUT, - b'progress': FRAME_TYPE_PROGRESS, - b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS, - b'stream-settings': FRAME_TYPE_STREAM_SETTINGS, -} - -FLAG_COMMAND_REQUEST_NEW = 0x01 -FLAG_COMMAND_REQUEST_CONTINUATION = 0x02 -FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04 -FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08 - -FLAGS_COMMAND_REQUEST = { - b'new': FLAG_COMMAND_REQUEST_NEW, - b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION, - b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES, - b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA, -} - -FLAG_COMMAND_DATA_CONTINUATION = 0x01 -FLAG_COMMAND_DATA_EOS = 0x02 - -FLAGS_COMMAND_DATA = { - b'continuation': FLAG_COMMAND_DATA_CONTINUATION, - b'eos': FLAG_COMMAND_DATA_EOS, -} - -FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01 -FLAG_COMMAND_RESPONSE_EOS = 0x02 - -FLAGS_COMMAND_RESPONSE = { - b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION, - b'eos': FLAG_COMMAND_RESPONSE_EOS, -} - -FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01 -FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02 - -FLAGS_SENDER_PROTOCOL_SETTINGS = { - b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION, - b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS, -} - -FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01 -FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02 - -FLAGS_STREAM_ENCODING_SETTINGS = { - b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION, - b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS, -} - -# Maps frame types to their available flags. -FRAME_TYPE_FLAGS = { - FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST, - FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA, - FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE, - FRAME_TYPE_ERROR_RESPONSE: {}, - FRAME_TYPE_TEXT_OUTPUT: {}, - FRAME_TYPE_PROGRESS: {}, - FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS, - FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS, -} - -ARGUMENT_RECORD_HEADER = struct.Struct('= val: - if value & val: - flags.append(namemap.get(val, b'' % val)) - val <<= 1 - - return b'|'.join(flags) - - -@attr.s(slots=True) -class frameheader: - """Represents the data in a frame header.""" - - length = attr.ib() - requestid = attr.ib() - streamid = attr.ib() - streamflags = attr.ib() - typeid = attr.ib() - flags = attr.ib() - - -@attr.s(slots=True, repr=False) -class frame: - """Represents a parsed frame.""" - - requestid = attr.ib() - streamid = attr.ib() - streamflags = attr.ib() - typeid = attr.ib() - flags = attr.ib() - payload = attr.ib() - - @encoding.strmethod - def __repr__(self): - typename = b'' % self.typeid - for name, value in FRAME_TYPES.items(): - if value == self.typeid: - typename = name - break - - return ( - b'frame(size=%d; request=%d; stream=%d; streamflags=%s; ' - b'type=%s; flags=%s)' - % ( - len(self.payload), - self.requestid, - self.streamid, - humanflags(STREAM_FLAGS, self.streamflags), - typename, - humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags), - ) - ) - - -def makeframe(requestid, streamid, streamflags, typeid, flags, payload): - """Assemble a frame into a byte array.""" - # TODO assert size of payload. - frame = bytearray(FRAME_HEADER_SIZE + len(payload)) - - # 24 bits length - # 16 bits request id - # 8 bits stream id - # 8 bits stream flags - # 4 bits type - # 4 bits flags - - l = struct.pack(' - - This can be used by user-facing applications and tests for creating - frames easily without having to type out a bunch of constants. - - Request ID and stream IDs are integers. - - Stream flags, frame type, and flags can be specified by integer or - named constant. - - Flags can be delimited by `|` to bitwise OR them together. - - If the payload begins with ``cbor:``, the following string will be - evaluated as Python literal and the resulting object will be fed into - a CBOR encoder. Otherwise, the payload is interpreted as a Python - byte string literal. - """ - fields = s.split(b' ', 5) - requestid, streamid, streamflags, frametype, frameflags, payload = fields - - requestid = int(requestid) - streamid = int(streamid) - - finalstreamflags = 0 - for flag in streamflags.split(b'|'): - if flag in STREAM_FLAGS: - finalstreamflags |= STREAM_FLAGS[flag] - else: - finalstreamflags |= int(flag) - - if frametype in FRAME_TYPES: - frametype = FRAME_TYPES[frametype] - else: - frametype = int(frametype) - - finalflags = 0 - validflags = FRAME_TYPE_FLAGS[frametype] - for flag in frameflags.split(b'|'): - if flag in validflags: - finalflags |= validflags[flag] - else: - finalflags |= int(flag) - - if payload.startswith(b'cbor:'): - payload = b''.join( - cborutil.streamencode(stringutil.evalpythonliteral(payload[5:])) - ) - - else: - payload = stringutil.unescapestr(payload) - - return makeframe( - requestid=requestid, - streamid=streamid, - streamflags=finalstreamflags, - typeid=frametype, - flags=finalflags, - payload=payload, - ) - - -def parseheader(data): - """Parse a unified framing protocol frame header from a buffer. - - The header is expected to be in the buffer at offset 0 and the - buffer is expected to be large enough to hold a full header. - """ - # 24 bits payload length (little endian) - # 16 bits request ID - # 8 bits stream ID - # 8 bits stream flags - # 4 bits frame type - # 4 bits frame flags - # ... payload - framelength = data[0] + 256 * data[1] + 16384 * data[2] - requestid, streamid, streamflags = struct.unpack_from('> 4 - frameflags = typeflags & 0x0F - - return frameheader( - framelength, requestid, streamid, streamflags, frametype, frameflags - ) - - -def readframe(fh): - """Read a unified framing protocol frame from a file object. - - Returns a 3-tuple of (type, flags, payload) for the decoded frame or - None if no frame is available. May raise if a malformed frame is - seen. - """ - header = bytearray(FRAME_HEADER_SIZE) - - readcount = fh.readinto(header) - - if readcount == 0: - return None - - if readcount != FRAME_HEADER_SIZE: - raise error.Abort( - _(b'received incomplete frame: got %d bytes: %s') - % (readcount, header) - ) - - h = parseheader(header) - - payload = fh.read(h.length) - if len(payload) != h.length: - raise error.Abort( - _(b'frame length error: expected %d; got %d') - % (h.length, len(payload)) - ) - - return frame( - h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload - ) - - -def createcommandframes( - stream, - requestid, - cmd, - args, - datafh=None, - maxframesize=DEFAULT_MAX_FRAME_SIZE, - redirect=None, -): - """Create frames necessary to transmit a request to run a command. - - This is a generator of bytearrays. Each item represents a frame - ready to be sent over the wire to a peer. - """ - data = {b'name': cmd} - if args: - data[b'args'] = args - - if redirect: - data[b'redirect'] = redirect - - data = b''.join(cborutil.streamencode(data)) - - offset = 0 - - while True: - flags = 0 - - # Must set new or continuation flag. - if not offset: - flags |= FLAG_COMMAND_REQUEST_NEW - else: - flags |= FLAG_COMMAND_REQUEST_CONTINUATION - - # Data frames is set on all frames. - if datafh: - flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA - - payload = data[offset : offset + maxframesize] - offset += len(payload) - - if len(payload) == maxframesize and offset < len(data): - flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES - - yield stream.makeframe( - requestid=requestid, - typeid=FRAME_TYPE_COMMAND_REQUEST, - flags=flags, - payload=payload, - ) - - if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES): - break - - if datafh: - while True: - data = datafh.read(DEFAULT_MAX_FRAME_SIZE) - - done = False - if len(data) == DEFAULT_MAX_FRAME_SIZE: - flags = FLAG_COMMAND_DATA_CONTINUATION - else: - flags = FLAG_COMMAND_DATA_EOS - assert datafh.read(1) == b'' - done = True - - yield stream.makeframe( - requestid=requestid, - typeid=FRAME_TYPE_COMMAND_DATA, - flags=flags, - payload=data, - ) - - if done: - break - - -def createcommandresponseokframe(stream, requestid): - overall = b''.join(cborutil.streamencode({b'status': b'ok'})) - - if stream.streamsettingssent: - overall = stream.encode(overall) - encoded = True - - if not overall: - return None - else: - encoded = False - - return stream.makeframe( - requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=overall, - encoded=encoded, - ) - - -def createcommandresponseeosframes( - stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE -): - """Create an empty payload frame representing command end-of-stream.""" - payload = stream.flush() - - offset = 0 - while True: - chunk = payload[offset : offset + maxframesize] - offset += len(chunk) - - done = offset == len(payload) - - if done: - flags = FLAG_COMMAND_RESPONSE_EOS - else: - flags = FLAG_COMMAND_RESPONSE_CONTINUATION - - yield stream.makeframe( - requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=flags, - payload=chunk, - encoded=payload != b'', - ) - - if done: - break - - -def createalternatelocationresponseframe(stream, requestid, location): - data = { - b'status': b'redirect', - b'location': { - b'url': location.url, - b'mediatype': location.mediatype, - }, - } - - for a in ( - 'size', - 'fullhashes', - 'fullhashseed', - 'serverdercerts', - 'servercadercerts', - ): - value = getattr(location, a) - if value is not None: - data[b'location'][pycompat.bytestr(a)] = value - - payload = b''.join(cborutil.streamencode(data)) - - if stream.streamsettingssent: - payload = stream.encode(payload) - encoded = True - else: - encoded = False - - return stream.makeframe( - requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=payload, - encoded=encoded, - ) - - -def createcommanderrorresponse(stream, requestid, message, args=None): - # TODO should this be using a list of {'msg': ..., 'args': {}} so atom - # formatting works consistently? - m = { - b'status': b'error', - b'error': { - b'message': message, - }, - } - - if args: - m[b'error'][b'args'] = args - - overall = b''.join(cborutil.streamencode(m)) - - yield stream.makeframe( - requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=FLAG_COMMAND_RESPONSE_EOS, - payload=overall, - ) - - -def createerrorframe(stream, requestid, msg, errtype): - # TODO properly handle frame size limits. - assert len(msg) <= DEFAULT_MAX_FRAME_SIZE - - payload = b''.join( - cborutil.streamencode( - { - b'type': errtype, - b'message': [{b'msg': msg}], - } - ) - ) - - yield stream.makeframe( - requestid=requestid, - typeid=FRAME_TYPE_ERROR_RESPONSE, - flags=0, - payload=payload, - ) - - -def createtextoutputframe( - stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE -): - """Create a text output frame to render text to people. - - ``atoms`` is a 3-tuple of (formatting string, args, labels). - - The formatting string contains ``%s`` tokens to be replaced by the - corresponding indexed entry in ``args``. ``labels`` is an iterable of - formatters to be applied at rendering time. In terms of the ``ui`` - class, each atom corresponds to a ``ui.write()``. - """ - atomdicts = [] - - for (formatting, args, labels) in atoms: - # TODO look for localstr, other types here? - - if not isinstance(formatting, bytes): - raise ValueError(b'must use bytes formatting strings') - for arg in args: - if not isinstance(arg, bytes): - raise ValueError(b'must use bytes for arguments') - for label in labels: - if not isinstance(label, bytes): - raise ValueError(b'must use bytes for labels') - - # Formatting string must be ASCII. - formatting = formatting.decode('ascii', 'replace').encode('ascii') - - # Arguments must be UTF-8. - args = [a.decode('utf-8', 'replace').encode('utf-8') for a in args] - - # Labels must be ASCII. - labels = [l.decode('ascii', 'strict').encode('ascii') for l in labels] - - atom = {b'msg': formatting} - if args: - atom[b'args'] = args - if labels: - atom[b'labels'] = labels - - atomdicts.append(atom) - - payload = b''.join(cborutil.streamencode(atomdicts)) - - if len(payload) > maxframesize: - raise ValueError(b'cannot encode data in a single frame') - - yield stream.makeframe( - requestid=requestid, - typeid=FRAME_TYPE_TEXT_OUTPUT, - flags=0, - payload=payload, - ) - - -class bufferingcommandresponseemitter: - """Helper object to emit command response frames intelligently. - - Raw command response data is likely emitted in chunks much smaller - than what can fit in a single frame. This class exists to buffer - chunks until enough data is available to fit in a single frame. - - TODO we'll need something like this when compression is supported. - So it might make sense to implement this functionality at the stream - level. - """ - - def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE): - self._stream = stream - self._requestid = requestid - self._maxsize = maxframesize - self._chunks = [] - self._chunkssize = 0 - - def send(self, data): - """Send new data for emission. - - Is a generator of new frames that were derived from the new input. - - If the special input ``None`` is received, flushes all buffered - data to frames. - """ - - if data is None: - for frame in self._flush(): - yield frame - return - - data = self._stream.encode(data) - - # There is a ton of potential to do more complicated things here. - # Our immediate goal is to coalesce small chunks into big frames, - # not achieve the fewest number of frames possible. So we go with - # a simple implementation: - # - # * If a chunk is too large for a frame, we flush and emit frames - # for the new chunk. - # * If a chunk can be buffered without total buffered size limits - # being exceeded, we do that. - # * If a chunk causes us to go over our buffering limit, we flush - # and then buffer the new chunk. - - if not data: - return - - if len(data) > self._maxsize: - for frame in self._flush(): - yield frame - - # Now emit frames for the big chunk. - offset = 0 - while True: - chunk = data[offset : offset + self._maxsize] - offset += len(chunk) - - yield self._stream.makeframe( - self._requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=chunk, - encoded=True, - ) - - if offset == len(data): - return - - # If we don't have enough to constitute a full frame, buffer and - # return. - if len(data) + self._chunkssize < self._maxsize: - self._chunks.append(data) - self._chunkssize += len(data) - return - - # Else flush what we have and buffer the new chunk. We could do - # something more intelligent here, like break the chunk. Let's - # keep things simple for now. - for frame in self._flush(): - yield frame - - self._chunks.append(data) - self._chunkssize = len(data) - - def _flush(self): - payload = b''.join(self._chunks) - assert len(payload) <= self._maxsize - - self._chunks[:] = [] - self._chunkssize = 0 - - if not payload: - return - - yield self._stream.makeframe( - self._requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=payload, - encoded=True, - ) - - -# TODO consider defining encoders/decoders using the util.compressionengine -# mechanism. - - -class identityencoder: - """Encoder for the "identity" stream encoding profile.""" - - def __init__(self, ui): - pass - - def encode(self, data): - return data - - def flush(self): - return b'' - - def finish(self): - return b'' - - -class identitydecoder: - """Decoder for the "identity" stream encoding profile.""" - - def __init__(self, ui, extraobjs): - if extraobjs: - raise error.Abort( - _(b'identity decoder received unexpected additional values') - ) - - def decode(self, data): - return data - - -class zlibencoder: - def __init__(self, ui): - import zlib - - self._zlib = zlib - self._compressor = zlib.compressobj() - - def encode(self, data): - return self._compressor.compress(data) - - def flush(self): - # Z_SYNC_FLUSH doesn't reset compression context, which is - # what we want. - return self._compressor.flush(self._zlib.Z_SYNC_FLUSH) - - def finish(self): - res = self._compressor.flush(self._zlib.Z_FINISH) - self._compressor = None - return res - - -class zlibdecoder: - def __init__(self, ui, extraobjs): - import zlib - - if extraobjs: - raise error.Abort( - _(b'zlib decoder received unexpected additional values') - ) - - self._decompressor = zlib.decompressobj() - - def decode(self, data): - return self._decompressor.decompress(data) - - -class zstdbaseencoder: - def __init__(self, level): - from . import zstd - - self._zstd = zstd - cctx = zstd.ZstdCompressor(level=level) - self._compressor = cctx.compressobj() - - def encode(self, data): - return self._compressor.compress(data) - - def flush(self): - # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the - # compressor and allows a decompressor to access all encoded data - # up to this point. - return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK) - - def finish(self): - res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH) - self._compressor = None - return res - - -class zstd8mbencoder(zstdbaseencoder): - def __init__(self, ui): - super(zstd8mbencoder, self).__init__(3) - - -class zstdbasedecoder: - def __init__(self, maxwindowsize): - from . import zstd - - dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize) - self._decompressor = dctx.decompressobj() - - def decode(self, data): - return self._decompressor.decompress(data) - - -class zstd8mbdecoder(zstdbasedecoder): - def __init__(self, ui, extraobjs): - if extraobjs: - raise error.Abort( - _(b'zstd8mb decoder received unexpected additional values') - ) - - super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576) - - -# We lazily populate this to avoid excessive module imports when importing -# this module. -STREAM_ENCODERS = {} -STREAM_ENCODERS_ORDER = [] - - -def populatestreamencoders(): - if STREAM_ENCODERS: - return - - try: - from . import zstd - - zstd.__version__ - except ImportError: - zstd = None - - # zstandard is fastest and is preferred. - if zstd: - STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder) - STREAM_ENCODERS_ORDER.append(b'zstd-8mb') - - STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder) - STREAM_ENCODERS_ORDER.append(b'zlib') - - STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder) - STREAM_ENCODERS_ORDER.append(b'identity') - - -class stream: - """Represents a logical unidirectional series of frames.""" - - def __init__(self, streamid, active=False): - self.streamid = streamid - self._active = active - - def makeframe(self, requestid, typeid, flags, payload): - """Create a frame to be sent out over this stream. - - Only returns the frame instance. Does not actually send it. - """ - streamflags = 0 - if not self._active: - streamflags |= STREAM_FLAG_BEGIN_STREAM - self._active = True - - return makeframe( - requestid, self.streamid, streamflags, typeid, flags, payload - ) - - -class inputstream(stream): - """Represents a stream used for receiving data.""" - - def __init__(self, streamid, active=False): - super(inputstream, self).__init__(streamid, active=active) - self._decoder = None - - def setdecoder(self, ui, name, extraobjs): - """Set the decoder for this stream. - - Receives the stream profile name and any additional CBOR objects - decoded from the stream encoding settings frame payloads. - """ - if name not in STREAM_ENCODERS: - raise error.Abort(_(b'unknown stream decoder: %s') % name) - - self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs) - - def decode(self, data): - # Default is identity decoder. We don't bother instantiating one - # because it is trivial. - if not self._decoder: - return data - - return self._decoder.decode(data) - - def flush(self): - if not self._decoder: - return b'' - - return self._decoder.flush() - - -class outputstream(stream): - """Represents a stream used for sending data.""" - - def __init__(self, streamid, active=False): - super(outputstream, self).__init__(streamid, active=active) - self.streamsettingssent = False - self._encoder = None - self._encodername = None - - def setencoder(self, ui, name): - """Set the encoder for this stream. - - Receives the stream profile name. - """ - if name not in STREAM_ENCODERS: - raise error.Abort(_(b'unknown stream encoder: %s') % name) - - self._encoder = STREAM_ENCODERS[name][0](ui) - self._encodername = name - - def encode(self, data): - if not self._encoder: - return data - - return self._encoder.encode(data) - - def flush(self): - if not self._encoder: - return b'' - - return self._encoder.flush() - - def finish(self): - if not self._encoder: - return b'' - - self._encoder.finish() - - def makeframe(self, requestid, typeid, flags, payload, encoded=False): - """Create a frame to be sent out over this stream. - - Only returns the frame instance. Does not actually send it. - """ - streamflags = 0 - if not self._active: - streamflags |= STREAM_FLAG_BEGIN_STREAM - self._active = True - - if encoded: - if not self.streamsettingssent: - raise error.ProgrammingError( - b'attempting to send encoded frame without sending stream ' - b'settings' - ) - - streamflags |= STREAM_FLAG_ENCODING_APPLIED - - if ( - typeid == FRAME_TYPE_STREAM_SETTINGS - and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS - ): - self.streamsettingssent = True - - return makeframe( - requestid, self.streamid, streamflags, typeid, flags, payload - ) - - def makestreamsettingsframe(self, requestid): - """Create a stream settings frame for this stream. - - Returns frame data or None if no stream settings frame is needed or has - already been sent. - """ - if not self._encoder or self.streamsettingssent: - return None - - payload = b''.join(cborutil.streamencode(self._encodername)) - return self.makeframe( - requestid, - FRAME_TYPE_STREAM_SETTINGS, - FLAG_STREAM_ENCODING_SETTINGS_EOS, - payload, - ) - - -def ensureserverstream(stream): - if stream.streamid % 2: - raise error.ProgrammingError( - b'server should only write to even ' - b'numbered streams; %d is not even' % stream.streamid - ) - - -DEFAULT_PROTOCOL_SETTINGS = { - b'contentencodings': [b'identity'], -} - - -class serverreactor: - """Holds state of a server handling frame-based protocol requests. - - This class is the "brain" of the unified frame-based protocol server - component. While the protocol is stateless from the perspective of - requests/commands, something needs to track which frames have been - received, what frames to expect, etc. This class is that thing. - - Instances are modeled as a state machine of sorts. Instances are also - reactionary to external events. The point of this class is to encapsulate - the state of the connection and the exchange of frames, not to perform - work. Instead, callers tell this class when something occurs, like a - frame arriving. If that activity is worthy of a follow-up action (say - *run a command*), the return value of that handler will say so. - - I/O and CPU intensive operations are purposefully delegated outside of - this class. - - Consumers are expected to tell instances when events occur. They do so by - calling the various ``on*`` methods. These methods return a 2-tuple - describing any follow-up action(s) to take. The first element is the - name of an action to perform. The second is a data structure (usually - a dict) specific to that action that contains more information. e.g. - if the server wants to send frames back to the client, the data structure - will contain a reference to those frames. - - Valid actions that consumers can be instructed to take are: - - sendframes - Indicates that frames should be sent to the client. The ``framegen`` - key contains a generator of frames that should be sent. The server - assumes that all frames are sent to the client. - - error - Indicates that an error occurred. Consumer should probably abort. - - runcommand - Indicates that the consumer should run a wire protocol command. Details - of the command to run are given in the data structure. - - wantframe - Indicates that nothing of interest happened and the server is waiting on - more frames from the client before anything interesting can be done. - - noop - Indicates no additional action is required. - - Known Issues - ------------ - - There are no limits to the number of partially received commands or their - size. A malicious client could stream command request data and exhaust the - server's memory. - - Partially received commands are not acted upon when end of input is - reached. Should the server error if it receives a partial request? - Should the client send a message to abort a partially transmitted request - to facilitate graceful shutdown? - - Active requests that haven't been responded to aren't tracked. This means - that if we receive a command and instruct its dispatch, another command - with its request ID can come in over the wire and there will be a race - between who responds to what. - """ - - def __init__(self, ui, deferoutput=False): - """Construct a new server reactor. - - ``deferoutput`` can be used to indicate that no output frames should be - instructed to be sent until input has been exhausted. In this mode, - events that would normally generate output frames (such as a command - response being ready) will instead defer instructing the consumer to - send those frames. This is useful for half-duplex transports where the - sender cannot receive until all data has been transmitted. - """ - self._ui = ui - self._deferoutput = deferoutput - self._state = b'initial' - self._nextoutgoingstreamid = 2 - self._bufferedframegens = [] - # stream id -> stream instance for all active streams from the client. - self._incomingstreams = {} - self._outgoingstreams = {} - # request id -> dict of commands that are actively being received. - self._receivingcommands = {} - # Request IDs that have been received and are actively being processed. - # Once all output for a request has been sent, it is removed from this - # set. - self._activecommands = set() - - self._protocolsettingsdecoder = None - - # Sender protocol settings are optional. Set implied default values. - self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS) - - populatestreamencoders() - - def onframerecv(self, frame): - """Process a frame that has been received off the wire. - - Returns a dict with an ``action`` key that details what action, - if any, the consumer should take next. - """ - if not frame.streamid % 2: - self._state = b'errored' - return self._makeerrorresult( - _(b'received frame with even numbered stream ID: %d') - % frame.streamid - ) - - if frame.streamid not in self._incomingstreams: - if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: - self._state = b'errored' - return self._makeerrorresult( - _( - b'received frame on unknown inactive stream without ' - b'beginning of stream flag set' - ) - ) - - self._incomingstreams[frame.streamid] = inputstream(frame.streamid) - - if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: - # TODO handle decoding frames - self._state = b'errored' - raise error.ProgrammingError( - b'support for decoding stream payloads not yet implemented' - ) - - if frame.streamflags & STREAM_FLAG_END_STREAM: - del self._incomingstreams[frame.streamid] - - handlers = { - b'initial': self._onframeinitial, - b'protocol-settings-receiving': self._onframeprotocolsettings, - b'idle': self._onframeidle, - b'command-receiving': self._onframecommandreceiving, - b'errored': self._onframeerrored, - } - - meth = handlers.get(self._state) - if not meth: - raise error.ProgrammingError(b'unhandled state: %s' % self._state) - - return meth(frame) - - def oncommandresponsereadyobjects(self, stream, requestid, objs): - """Signal that objects are ready to be sent to the client. - - ``objs`` is an iterable of objects (typically a generator) that will - be encoded via CBOR and added to frames, which will be sent to the - client. - """ - ensureserverstream(stream) - - # A more robust solution would be to check for objs.{next,__next__}. - if isinstance(objs, list): - objs = iter(objs) - - # We need to take care over exception handling. Uncaught exceptions - # when generating frames could lead to premature end of the frame - # stream and the possibility of the server or client process getting - # in a bad state. - # - # Keep in mind that if ``objs`` is a generator, advancing it could - # raise exceptions that originated in e.g. wire protocol command - # functions. That is why we differentiate between exceptions raised - # when iterating versus other exceptions that occur. - # - # In all cases, when the function finishes, the request is fully - # handled and no new frames for it should be seen. - - def sendframes(): - emitted = False - alternatelocationsent = False - emitter = bufferingcommandresponseemitter(stream, requestid) - while True: - try: - o = next(objs) - except StopIteration: - for frame in emitter.send(None): - yield frame - - if emitted: - for frame in createcommandresponseeosframes( - stream, requestid - ): - yield frame - break - - except error.WireprotoCommandError as e: - for frame in createcommanderrorresponse( - stream, requestid, e.message, e.messageargs - ): - yield frame - break - - except Exception as e: - for frame in createerrorframe( - stream, - requestid, - b'%s' % stringutil.forcebytestr(e), - errtype=b'server', - ): - - yield frame - - break - - try: - # Alternate location responses can only be the first and - # only object in the output stream. - if isinstance(o, wireprototypes.alternatelocationresponse): - if emitted: - raise error.ProgrammingError( - b'alternatelocationresponse seen after initial ' - b'output object' - ) - - frame = stream.makestreamsettingsframe(requestid) - if frame: - yield frame - - yield createalternatelocationresponseframe( - stream, requestid, o - ) - - alternatelocationsent = True - emitted = True - continue - - if alternatelocationsent: - raise error.ProgrammingError( - b'object follows alternatelocationresponse' - ) - - if not emitted: - # Frame is optional. - frame = stream.makestreamsettingsframe(requestid) - if frame: - yield frame - - # May be None if empty frame (due to encoding). - frame = createcommandresponseokframe(stream, requestid) - if frame: - yield frame - - emitted = True - - # Objects emitted by command functions can be serializable - # data structures or special types. - # TODO consider extracting the content normalization to a - # standalone function, as it may be useful for e.g. cachers. - - # A pre-encoded object is sent directly to the emitter. - if isinstance(o, wireprototypes.encodedresponse): - for frame in emitter.send(o.data): - yield frame - - elif isinstance( - o, wireprototypes.indefinitebytestringresponse - ): - for chunk in cborutil.streamencodebytestringfromiter( - o.chunks - ): - - for frame in emitter.send(chunk): - yield frame - - # A regular object is CBOR encoded. - else: - for chunk in cborutil.streamencode(o): - for frame in emitter.send(chunk): - yield frame - - except Exception as e: - for frame in createerrorframe( - stream, requestid, b'%s' % e, errtype=b'server' - ): - yield frame - - break - - self._activecommands.remove(requestid) - - return self._handlesendframes(sendframes()) - - def oninputeof(self): - """Signals that end of input has been received. - - No more frames will be received. All pending activity should be - completed. - """ - # TODO should we do anything about in-flight commands? - - if not self._deferoutput or not self._bufferedframegens: - return b'noop', {} - - # If we buffered all our responses, emit those. - def makegen(): - for gen in self._bufferedframegens: - for frame in gen: - yield frame - - return b'sendframes', { - b'framegen': makegen(), - } - - def _handlesendframes(self, framegen): - if self._deferoutput: - self._bufferedframegens.append(framegen) - return b'noop', {} - else: - return b'sendframes', { - b'framegen': framegen, - } - - def onservererror(self, stream, requestid, msg): - ensureserverstream(stream) - - def sendframes(): - for frame in createerrorframe( - stream, requestid, msg, errtype=b'server' - ): - yield frame - - self._activecommands.remove(requestid) - - return self._handlesendframes(sendframes()) - - def oncommanderror(self, stream, requestid, message, args=None): - """Called when a command encountered an error before sending output.""" - ensureserverstream(stream) - - def sendframes(): - for frame in createcommanderrorresponse( - stream, requestid, message, args - ): - yield frame - - self._activecommands.remove(requestid) - - return self._handlesendframes(sendframes()) - - def makeoutputstream(self): - """Create a stream to be used for sending data to the client. - - If this is called before protocol settings frames are received, we - don't know what stream encodings are supported by the client and - we will default to identity. - """ - streamid = self._nextoutgoingstreamid - self._nextoutgoingstreamid += 2 - - s = outputstream(streamid) - self._outgoingstreams[streamid] = s - - # Always use the *server's* preferred encoder over the client's, - # as servers have more to lose from sub-optimal encoders being used. - for name in STREAM_ENCODERS_ORDER: - if name in self._sendersettings[b'contentencodings']: - s.setencoder(self._ui, name) - break - - return s - - def _makeerrorresult(self, msg): - return b'error', { - b'message': msg, - } - - def _makeruncommandresult(self, requestid): - entry = self._receivingcommands[requestid] - - if not entry[b'requestdone']: - self._state = b'errored' - raise error.ProgrammingError( - b'should not be called without requestdone set' - ) - - del self._receivingcommands[requestid] - - if self._receivingcommands: - self._state = b'command-receiving' - else: - self._state = b'idle' - - # Decode the payloads as CBOR. - entry[b'payload'].seek(0) - request = cborutil.decodeall(entry[b'payload'].getvalue())[0] - - if b'name' not in request: - self._state = b'errored' - return self._makeerrorresult( - _(b'command request missing "name" field') - ) - - if b'args' not in request: - request[b'args'] = {} - - assert requestid not in self._activecommands - self._activecommands.add(requestid) - - return ( - b'runcommand', - { - b'requestid': requestid, - b'command': request[b'name'], - b'args': request[b'args'], - b'redirect': request.get(b'redirect'), - b'data': entry[b'data'].getvalue() if entry[b'data'] else None, - }, - ) - - def _makewantframeresult(self): - return b'wantframe', { - b'state': self._state, - } - - def _validatecommandrequestframe(self, frame): - new = frame.flags & FLAG_COMMAND_REQUEST_NEW - continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION - - if new and continuation: - self._state = b'errored' - return self._makeerrorresult( - _( - b'received command request frame with both new and ' - b'continuation flags set' - ) - ) - - if not new and not continuation: - self._state = b'errored' - return self._makeerrorresult( - _( - b'received command request frame with neither new nor ' - b'continuation flags set' - ) - ) - - def _onframeinitial(self, frame): - # Called when we receive a frame when in the "initial" state. - if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: - self._state = b'protocol-settings-receiving' - self._protocolsettingsdecoder = cborutil.bufferingdecoder() - return self._onframeprotocolsettings(frame) - - elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST: - self._state = b'idle' - return self._onframeidle(frame) - - else: - self._state = b'errored' - return self._makeerrorresult( - _( - b'expected sender protocol settings or command request ' - b'frame; got %d' - ) - % frame.typeid - ) - - def _onframeprotocolsettings(self, frame): - assert self._state == b'protocol-settings-receiving' - assert self._protocolsettingsdecoder is not None - - if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: - self._state = b'errored' - return self._makeerrorresult( - _(b'expected sender protocol settings frame; got %d') - % frame.typeid - ) - - more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION - eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS - - if more and eos: - self._state = b'errored' - return self._makeerrorresult( - _( - b'sender protocol settings frame cannot have both ' - b'continuation and end of stream flags set' - ) - ) - - if not more and not eos: - self._state = b'errored' - return self._makeerrorresult( - _( - b'sender protocol settings frame must have continuation or ' - b'end of stream flag set' - ) - ) - - # TODO establish limits for maximum amount of data that can be - # buffered. - try: - self._protocolsettingsdecoder.decode(frame.payload) - except Exception as e: - self._state = b'errored' - return self._makeerrorresult( - _( - b'error decoding CBOR from sender protocol settings frame: %s' - ) - % stringutil.forcebytestr(e) - ) - - if more: - return self._makewantframeresult() - - assert eos - - decoded = self._protocolsettingsdecoder.getavailable() - self._protocolsettingsdecoder = None - - if not decoded: - self._state = b'errored' - return self._makeerrorresult( - _(b'sender protocol settings frame did not contain CBOR data') - ) - elif len(decoded) > 1: - self._state = b'errored' - return self._makeerrorresult( - _( - b'sender protocol settings frame contained multiple CBOR ' - b'values' - ) - ) - - d = decoded[0] - - if b'contentencodings' in d: - self._sendersettings[b'contentencodings'] = d[b'contentencodings'] - - self._state = b'idle' - - return self._makewantframeresult() - - def _onframeidle(self, frame): - # The only frame type that should be received in this state is a - # command request. - if frame.typeid != FRAME_TYPE_COMMAND_REQUEST: - self._state = b'errored' - return self._makeerrorresult( - _(b'expected command request frame; got %d') % frame.typeid - ) - - res = self._validatecommandrequestframe(frame) - if res: - return res - - if frame.requestid in self._receivingcommands: - self._state = b'errored' - return self._makeerrorresult( - _(b'request with ID %d already received') % frame.requestid - ) - - if frame.requestid in self._activecommands: - self._state = b'errored' - return self._makeerrorresult( - _(b'request with ID %d is already active') % frame.requestid - ) - - new = frame.flags & FLAG_COMMAND_REQUEST_NEW - moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES - expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA - - if not new: - self._state = b'errored' - return self._makeerrorresult( - _(b'received command request frame without new flag set') - ) - - payload = util.bytesio() - payload.write(frame.payload) - - self._receivingcommands[frame.requestid] = { - b'payload': payload, - b'data': None, - b'requestdone': not moreframes, - b'expectingdata': bool(expectingdata), - } - - # This is the final frame for this request. Dispatch it. - if not moreframes and not expectingdata: - return self._makeruncommandresult(frame.requestid) - - assert moreframes or expectingdata - self._state = b'command-receiving' - return self._makewantframeresult() - - def _onframecommandreceiving(self, frame): - if frame.typeid == FRAME_TYPE_COMMAND_REQUEST: - # Process new command requests as such. - if frame.flags & FLAG_COMMAND_REQUEST_NEW: - return self._onframeidle(frame) - - res = self._validatecommandrequestframe(frame) - if res: - return res - - # All other frames should be related to a command that is currently - # receiving but is not active. - if frame.requestid in self._activecommands: - self._state = b'errored' - return self._makeerrorresult( - _(b'received frame for request that is still active: %d') - % frame.requestid - ) - - if frame.requestid not in self._receivingcommands: - self._state = b'errored' - return self._makeerrorresult( - _(b'received frame for request that is not receiving: %d') - % frame.requestid - ) - - entry = self._receivingcommands[frame.requestid] - - if frame.typeid == FRAME_TYPE_COMMAND_REQUEST: - moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES - expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA) - - if entry[b'requestdone']: - self._state = b'errored' - return self._makeerrorresult( - _( - b'received command request frame when request frames ' - b'were supposedly done' - ) - ) - - if expectingdata != entry[b'expectingdata']: - self._state = b'errored' - return self._makeerrorresult( - _(b'mismatch between expect data flag and previous frame') - ) - - entry[b'payload'].write(frame.payload) - - if not moreframes: - entry[b'requestdone'] = True - - if not moreframes and not expectingdata: - return self._makeruncommandresult(frame.requestid) - - return self._makewantframeresult() - - elif frame.typeid == FRAME_TYPE_COMMAND_DATA: - if not entry[b'expectingdata']: - self._state = b'errored' - return self._makeerrorresult( - _( - b'received command data frame for request that is not ' - b'expecting data: %d' - ) - % frame.requestid - ) - - if entry[b'data'] is None: - entry[b'data'] = util.bytesio() - - return self._handlecommanddataframe(frame, entry) - else: - self._state = b'errored' - return self._makeerrorresult( - _(b'received unexpected frame type: %d') % frame.typeid - ) - - def _handlecommanddataframe(self, frame, entry): - assert frame.typeid == FRAME_TYPE_COMMAND_DATA - - # TODO support streaming data instead of buffering it. - entry[b'data'].write(frame.payload) - - if frame.flags & FLAG_COMMAND_DATA_CONTINUATION: - return self._makewantframeresult() - elif frame.flags & FLAG_COMMAND_DATA_EOS: - entry[b'data'].seek(0) - return self._makeruncommandresult(frame.requestid) - else: - self._state = b'errored' - return self._makeerrorresult(_(b'command data frame without flags')) - - def _onframeerrored(self, frame): - return self._makeerrorresult(_(b'server already errored')) - - -class commandrequest: - """Represents a request to run a command.""" - - def __init__(self, requestid, name, args, datafh=None, redirect=None): - self.requestid = requestid - self.name = name - self.args = args - self.datafh = datafh - self.redirect = redirect - self.state = b'pending' - - -class clientreactor: - """Holds state of a client issuing frame-based protocol requests. - - This is like ``serverreactor`` but for client-side state. - - Each instance is bound to the lifetime of a connection. For persistent - connection transports using e.g. TCP sockets and speaking the raw - framing protocol, there will be a single instance for the lifetime of - the TCP socket. For transports where there are multiple discrete - interactions (say tunneled within in HTTP request), there will be a - separate instance for each distinct interaction. - - Consumers are expected to tell instances when events occur by calling - various methods. These methods return a 2-tuple describing any follow-up - action(s) to take. The first element is the name of an action to - perform. The second is a data structure (usually a dict) specific to - that action that contains more information. e.g. if the reactor wants - to send frames to the server, the data structure will contain a reference - to those frames. - - Valid actions that consumers can be instructed to take are: - - noop - Indicates no additional action is required. - - sendframes - Indicates that frames should be sent to the server. The ``framegen`` - key contains a generator of frames that should be sent. The reactor - assumes that all frames in this generator are sent to the server. - - error - Indicates that an error occurred. The ``message`` key contains an - error message describing the failure. - - responsedata - Indicates a response to a previously-issued command was received. - - The ``request`` key contains the ``commandrequest`` instance that - represents the request this data is for. - - The ``data`` key contains the decoded data from the server. - - ``expectmore`` and ``eos`` evaluate to True when more response data - is expected to follow or we're at the end of the response stream, - respectively. - """ - - def __init__( - self, - ui, - hasmultiplesend=False, - buffersends=True, - clientcontentencoders=None, - ): - """Create a new instance. - - ``hasmultiplesend`` indicates whether multiple sends are supported - by the transport. When True, it is possible to send commands immediately - instead of buffering until the caller signals an intent to finish a - send operation. - - ``buffercommands`` indicates whether sends should be buffered until the - last request has been issued. - - ``clientcontentencoders`` is an iterable of content encoders the client - will advertise to the server and that the server can use for encoding - data. If not defined, the client will not advertise content encoders - to the server. - """ - self._ui = ui - self._hasmultiplesend = hasmultiplesend - self._buffersends = buffersends - self._clientcontentencoders = clientcontentencoders - - self._canissuecommands = True - self._cansend = True - self._protocolsettingssent = False - - self._nextrequestid = 1 - # We only support a single outgoing stream for now. - self._outgoingstream = outputstream(1) - self._pendingrequests = collections.deque() - self._activerequests = {} - self._incomingstreams = {} - self._streamsettingsdecoders = {} - - populatestreamencoders() - - def callcommand(self, name, args, datafh=None, redirect=None): - """Request that a command be executed. - - Receives the command name, a dict of arguments to pass to the command, - and an optional file object containing the raw data for the command. - - Returns a 3-tuple of (request, action, action data). - """ - if not self._canissuecommands: - raise error.ProgrammingError(b'cannot issue new commands') - - requestid = self._nextrequestid - self._nextrequestid += 2 - - request = commandrequest( - requestid, name, args, datafh=datafh, redirect=redirect - ) - - if self._buffersends: - self._pendingrequests.append(request) - return request, b'noop', {} - else: - if not self._cansend: - raise error.ProgrammingError( - b'sends cannot be performed on this instance' - ) - - if not self._hasmultiplesend: - self._cansend = False - self._canissuecommands = False - - return ( - request, - b'sendframes', - { - b'framegen': self._makecommandframes(request), - }, - ) - - def flushcommands(self): - """Request that all queued commands be sent. - - If any commands are buffered, this will instruct the caller to send - them over the wire. If no commands are buffered it instructs the client - to no-op. - - If instances aren't configured for multiple sends, no new command - requests are allowed after this is called. - """ - if not self._pendingrequests: - return b'noop', {} - - if not self._cansend: - raise error.ProgrammingError( - b'sends cannot be performed on this instance' - ) - - # If the instance only allows sending once, mark that we have fired - # our one shot. - if not self._hasmultiplesend: - self._canissuecommands = False - self._cansend = False - - def makeframes(): - while self._pendingrequests: - request = self._pendingrequests.popleft() - for frame in self._makecommandframes(request): - yield frame - - return b'sendframes', { - b'framegen': makeframes(), - } - - def _makecommandframes(self, request): - """Emit frames to issue a command request. - - As a side-effect, update request accounting to reflect its changed - state. - """ - self._activerequests[request.requestid] = request - request.state = b'sending' - - if not self._protocolsettingssent and self._clientcontentencoders: - self._protocolsettingssent = True - - payload = b''.join( - cborutil.streamencode( - { - b'contentencodings': self._clientcontentencoders, - } - ) - ) - - yield self._outgoingstream.makeframe( - requestid=request.requestid, - typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS, - flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS, - payload=payload, - ) - - res = createcommandframes( - self._outgoingstream, - request.requestid, - request.name, - request.args, - datafh=request.datafh, - redirect=request.redirect, - ) - - for frame in res: - yield frame - - request.state = b'sent' - - def onframerecv(self, frame): - """Process a frame that has been received off the wire. - - Returns a 2-tuple of (action, meta) describing further action the - caller needs to take as a result of receiving this frame. - """ - if frame.streamid % 2: - return ( - b'error', - { - b'message': ( - _(b'received frame with odd numbered stream ID: %d') - % frame.streamid - ), - }, - ) - - if frame.streamid not in self._incomingstreams: - if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: - return ( - b'error', - { - b'message': _( - b'received frame on unknown stream ' - b'without beginning of stream flag set' - ), - }, - ) - - self._incomingstreams[frame.streamid] = inputstream(frame.streamid) - - stream = self._incomingstreams[frame.streamid] - - # If the payload is encoded, ask the stream to decode it. We - # merely substitute the decoded result into the frame payload as - # if it had been transferred all along. - if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: - frame.payload = stream.decode(frame.payload) - - if frame.streamflags & STREAM_FLAG_END_STREAM: - del self._incomingstreams[frame.streamid] - - if frame.typeid == FRAME_TYPE_STREAM_SETTINGS: - return self._onstreamsettingsframe(frame) - - if frame.requestid not in self._activerequests: - return ( - b'error', - { - b'message': ( - _(b'received frame for inactive request ID: %d') - % frame.requestid - ), - }, - ) - - request = self._activerequests[frame.requestid] - request.state = b'receiving' - - handlers = { - FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe, - FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe, - } - - meth = handlers.get(frame.typeid) - if not meth: - raise error.ProgrammingError( - b'unhandled frame type: %d' % frame.typeid - ) - - return meth(request, frame) - - def _onstreamsettingsframe(self, frame): - assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS - - more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION - eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS - - if more and eos: - return ( - b'error', - { - b'message': ( - _( - b'stream encoding settings frame cannot have both ' - b'continuation and end of stream flags set' - ) - ), - }, - ) - - if not more and not eos: - return ( - b'error', - { - b'message': _( - b'stream encoding settings frame must have ' - b'continuation or end of stream flag set' - ), - }, - ) - - if frame.streamid not in self._streamsettingsdecoders: - decoder = cborutil.bufferingdecoder() - self._streamsettingsdecoders[frame.streamid] = decoder - - decoder = self._streamsettingsdecoders[frame.streamid] - - try: - decoder.decode(frame.payload) - except Exception as e: - return ( - b'error', - { - b'message': ( - _( - b'error decoding CBOR from stream encoding ' - b'settings frame: %s' - ) - % stringutil.forcebytestr(e) - ), - }, - ) - - if more: - return b'noop', {} - - assert eos - - decoded = decoder.getavailable() - del self._streamsettingsdecoders[frame.streamid] - - if not decoded: - return ( - b'error', - { - b'message': _( - b'stream encoding settings frame did not contain ' - b'CBOR data' - ), - }, - ) - - try: - self._incomingstreams[frame.streamid].setdecoder( - self._ui, decoded[0], decoded[1:] - ) - except Exception as e: - return ( - b'error', - { - b'message': ( - _(b'error setting stream decoder: %s') - % stringutil.forcebytestr(e) - ), - }, - ) - - return b'noop', {} - - def _oncommandresponseframe(self, request, frame): - if frame.flags & FLAG_COMMAND_RESPONSE_EOS: - request.state = b'received' - del self._activerequests[request.requestid] - - return ( - b'responsedata', - { - b'request': request, - b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION, - b'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS, - b'data': frame.payload, - }, - ) - - def _onerrorresponseframe(self, request, frame): - request.state = b'errored' - del self._activerequests[request.requestid] - - # The payload should be a CBOR map. - m = cborutil.decodeall(frame.payload)[0] - - return ( - b'error', - { - b'request': request, - b'type': m[b'type'], - b'message': m[b'message'], - }, - ) diff --git a/tests/test-check-pytype.t b/tests/test-check-pytype.t --- a/tests/test-check-pytype.t +++ b/tests/test-check-pytype.t @@ -32,7 +32,6 @@ mercurial/unionrepo.py # ui, svfs, unfiltered [attribute-error] mercurial/utils/memorytop.py # not 3.6 compatible mercurial/win32.py # [not-callable] -mercurial/wireprotoframing.py # [unsupported-operands], [attribute-error], [import-error] mercurial/wireprotov1peer.py # [attribute-error] mercurial/wireprotov1server.py # BUG?: BundleValueError handler accesses subclass's attrs @@ -64,7 +63,6 @@ > -x mercurial/unionrepo.py \ > -x mercurial/utils/memorytop.py \ > -x mercurial/win32.py \ - > -x mercurial/wireprotoframing.py \ > -x mercurial/wireprotov1peer.py \ > -x mercurial/wireprotov1server.py \ > > $TESTTMP/pytype-output.txt || cat $TESTTMP/pytype-output.txt diff --git a/tests/test-wireproto-clientreactor.py b/tests/test-wireproto-clientreactor.py deleted file mode 100644 --- a/tests/test-wireproto-clientreactor.py +++ /dev/null @@ -1,765 +0,0 @@ -import sys -import unittest -import zlib - -from mercurial import ( - error, - ui as uimod, - wireprotoframing as framing, -) -from mercurial.utils import cborutil - -try: - from mercurial import zstd - - zstd.__version__ -except ImportError: - zstd = None - -ffs = framing.makeframefromhumanstring - -globalui = uimod.ui() - - -def sendframe(reactor, frame): - """Send a frame bytearray to a reactor.""" - header = framing.parseheader(frame) - payload = frame[framing.FRAME_HEADER_SIZE :] - assert len(payload) == header.length - - return reactor.onframerecv( - framing.frame( - header.requestid, - header.streamid, - header.streamflags, - header.typeid, - header.flags, - payload, - ) - ) - - -class SingleSendTests(unittest.TestCase): - """A reactor that can only send once rejects subsequent sends.""" - - if not getattr(unittest.TestCase, 'assertRaisesRegex', False): - # Python 3.7 deprecates the regex*p* version, but 2.7 lacks - # the regex version. - assertRaisesRegex = ( # camelcase-required - unittest.TestCase.assertRaisesRegexp - ) - - def testbasic(self): - reactor = framing.clientreactor( - globalui, hasmultiplesend=False, buffersends=True - ) - - request, action, meta = reactor.callcommand(b'foo', {}) - self.assertEqual(request.state, b'pending') - self.assertEqual(action, b'noop') - - action, meta = reactor.flushcommands() - self.assertEqual(action, b'sendframes') - - for frame in meta[b'framegen']: - self.assertEqual(request.state, b'sending') - - self.assertEqual(request.state, b'sent') - - with self.assertRaisesRegex( - error.ProgrammingError, 'cannot issue new commands' - ): - reactor.callcommand(b'foo', {}) - - with self.assertRaisesRegex( - error.ProgrammingError, 'cannot issue new commands' - ): - reactor.callcommand(b'foo', {}) - - -class NoBufferTests(unittest.TestCase): - """A reactor without send buffering sends requests immediately.""" - - def testbasic(self): - reactor = framing.clientreactor( - globalui, hasmultiplesend=True, buffersends=False - ) - - request, action, meta = reactor.callcommand(b'command1', {}) - self.assertEqual(request.requestid, 1) - self.assertEqual(action, b'sendframes') - - self.assertEqual(request.state, b'pending') - - for frame in meta[b'framegen']: - self.assertEqual(request.state, b'sending') - - self.assertEqual(request.state, b'sent') - - action, meta = reactor.flushcommands() - self.assertEqual(action, b'noop') - - # And we can send another command. - request, action, meta = reactor.callcommand(b'command2', {}) - self.assertEqual(request.requestid, 3) - self.assertEqual(action, b'sendframes') - - for frame in meta[b'framegen']: - self.assertEqual(request.state, b'sending') - - self.assertEqual(request.state, b'sent') - - -class BadFrameRecvTests(unittest.TestCase): - if not getattr(unittest.TestCase, 'assertRaisesRegex', False): - # Python 3.7 deprecates the regex*p* version, but 2.7 lacks - # the regex version. - assertRaisesRegex = ( # camelcase-required - unittest.TestCase.assertRaisesRegexp - ) - - def testoddstream(self): - reactor = framing.clientreactor(globalui) - - action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo')) - self.assertEqual(action, b'error') - self.assertEqual( - meta[b'message'], b'received frame with odd numbered stream ID: 1' - ) - - def testunknownstream(self): - reactor = framing.clientreactor(globalui) - - action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo')) - self.assertEqual(action, b'error') - self.assertEqual( - meta[b'message'], - b'received frame on unknown stream without beginning ' - b'of stream flag set', - ) - - def testunhandledframetype(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for frame in meta[b'framegen']: - pass - - with self.assertRaisesRegex( - error.ProgrammingError, 'unhandled frame type' - ): - sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo')) - - -class StreamTests(unittest.TestCase): - def testmultipleresponseframes(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - - self.assertEqual(action, b'sendframes') - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, - ffs( - b'%d 0 stream-begin command-response 0 foo' % request.requestid - ), - ) - self.assertEqual(action, b'responsedata') - - action, meta = sendframe( - reactor, ffs(b'%d 0 0 command-response eos bar' % request.requestid) - ) - self.assertEqual(action, b'responsedata') - - -class RedirectTests(unittest.TestCase): - def testredirect(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - redirect = { - b'targets': [b'a', b'b'], - b'hashes': [b'sha256'], - } - - request, action, meta = reactor.callcommand( - b'foo', {}, redirect=redirect - ) - - self.assertEqual(action, b'sendframes') - - frames = list(meta[b'framegen']) - self.assertEqual(len(frames), 1) - - self.assertEqual( - frames[0], - ffs( - b'1 1 stream-begin command-request new ' - b"cbor:{b'name': b'foo', " - b"b'redirect': {b'targets': [b'a', b'b'], " - b"b'hashes': [b'sha256']}}" - ), - ) - - -class StreamSettingsTests(unittest.TestCase): - def testnoflags(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, ffs(b'1 2 stream-begin stream-settings 0 ') - ) - - self.assertEqual(action, b'error') - self.assertEqual( - meta, - { - b'message': b'stream encoding settings frame must have ' - b'continuation or end of stream flag set', - }, - ) - - def testconflictflags(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, ffs(b'1 2 stream-begin stream-settings continuation|eos ') - ) - - self.assertEqual(action, b'error') - self.assertEqual( - meta, - { - b'message': b'stream encoding settings frame cannot have both ' - b'continuation and end of stream flags set', - }, - ) - - def testemptypayload(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, ffs(b'1 2 stream-begin stream-settings eos ') - ) - - self.assertEqual(action, b'error') - self.assertEqual( - meta, - { - b'message': b'stream encoding settings frame did not contain ' - b'CBOR data' - }, - ) - - def testbadcbor(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, ffs(b'1 2 stream-begin stream-settings eos badvalue') - ) - - self.assertEqual(action, b'error') - - def testsingleobject(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, - ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'), - ) - - self.assertEqual(action, b'noop') - self.assertEqual(meta, {}) - - def testmultipleobjects(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - data = b''.join( - [ - b''.join(cborutil.streamencode(b'identity')), - b''.join(cborutil.streamencode({b'foo', b'bar'})), - ] - ) - - action, meta = sendframe( - reactor, ffs(b'1 2 stream-begin stream-settings eos %s' % data) - ) - - self.assertEqual(action, b'error') - self.assertEqual( - meta, - { - b'message': b'error setting stream decoder: identity decoder ' - b'received unexpected additional values', - }, - ) - - def testmultipleframes(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - data = b''.join(cborutil.streamencode(b'identity')) - - action, meta = sendframe( - reactor, - ffs( - b'1 2 stream-begin stream-settings continuation %s' % data[0:3] - ), - ) - - self.assertEqual(action, b'noop') - self.assertEqual(meta, {}) - - action, meta = sendframe( - reactor, ffs(b'1 2 0 stream-settings eos %s' % data[3:]) - ) - - self.assertEqual(action, b'noop') - self.assertEqual(meta, {}) - - def testinvalidencoder(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, - ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'), - ) - - self.assertEqual(action, b'error') - self.assertEqual( - meta, - { - b'message': b'error setting stream decoder: unknown stream ' - b'decoder: badvalue', - }, - ) - - def testzlibencoding(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, - ffs( - b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' - % request.requestid - ), - ) - - self.assertEqual(action, b'noop') - self.assertEqual(meta, {}) - - result = { - b'status': b'ok', - } - encoded = b''.join(cborutil.streamencode(result)) - - compressed = zlib.compress(encoded) - self.assertEqual(zlib.decompress(compressed), encoded) - - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response eos %s' - % (request.requestid, compressed) - ), - ) - - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], encoded) - - def testzlibencodingsinglebyteframes(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, - ffs( - b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' - % request.requestid - ), - ) - - self.assertEqual(action, b'noop') - self.assertEqual(meta, {}) - - result = { - b'status': b'ok', - } - encoded = b''.join(cborutil.streamencode(result)) - - compressed = zlib.compress(encoded) - self.assertEqual(zlib.decompress(compressed), encoded) - - chunks = [] - - for i in range(len(compressed)): - char = compressed[i : i + 1] - if char == b'\\': - char = b'\\\\' - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response continuation %s' - % (request.requestid, char) - ), - ) - - self.assertEqual(action, b'responsedata') - chunks.append(meta[b'data']) - self.assertTrue(meta[b'expectmore']) - self.assertFalse(meta[b'eos']) - - # zlib will have the full data decoded at this point, even though - # we haven't flushed. - self.assertEqual(b''.join(chunks), encoded) - - # End the stream for good measure. - action, meta = sendframe( - reactor, - ffs(b'%d 2 stream-end command-response eos ' % request.requestid), - ) - - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], b'') - self.assertFalse(meta[b'expectmore']) - self.assertTrue(meta[b'eos']) - - def testzlibmultipleresponses(self): - # We feed in zlib compressed data on the same stream but belonging to - # 2 different requests. This tests our flushing behavior. - reactor = framing.clientreactor( - globalui, buffersends=False, hasmultiplesend=True - ) - - request1, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - request2, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - outstream = framing.outputstream(2) - outstream.setencoder(globalui, b'zlib') - - response1 = b''.join( - cborutil.streamencode( - { - b'status': b'ok', - b'extra': b'response1' * 10, - } - ) - ) - - response2 = b''.join( - cborutil.streamencode( - { - b'status': b'error', - b'extra': b'response2' * 10, - } - ) - ) - - action, meta = sendframe( - reactor, - ffs( - b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' - % request1.requestid - ), - ) - - self.assertEqual(action, b'noop') - self.assertEqual(meta, {}) - - # Feeding partial data in won't get anything useful out. - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response continuation %s' - % (request1.requestid, outstream.encode(response1)) - ), - ) - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], b'') - - # But flushing data at both ends will get our original data. - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response eos %s' - % (request1.requestid, outstream.flush()) - ), - ) - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], response1) - - # We should be able to reuse the compressor/decompressor for the - # 2nd response. - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response continuation %s' - % (request2.requestid, outstream.encode(response2)) - ), - ) - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], b'') - - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response eos %s' - % (request2.requestid, outstream.flush()) - ), - ) - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], response2) - - @unittest.skipUnless(zstd, 'zstd not available') - def testzstd8mbencoding(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, - ffs( - b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' - % request.requestid - ), - ) - - self.assertEqual(action, b'noop') - self.assertEqual(meta, {}) - - result = { - b'status': b'ok', - } - encoded = b''.join(cborutil.streamencode(result)) - - encoder = framing.zstd8mbencoder(globalui) - compressed = encoder.encode(encoded) + encoder.finish() - self.assertEqual( - zstd.ZstdDecompressor().decompress( - compressed, max_output_size=len(encoded) - ), - encoded, - ) - - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response eos %s' - % (request.requestid, compressed) - ), - ) - - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], encoded) - - @unittest.skipUnless(zstd, 'zstd not available') - def testzstd8mbencodingsinglebyteframes(self): - reactor = framing.clientreactor(globalui, buffersends=False) - - request, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - action, meta = sendframe( - reactor, - ffs( - b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' - % request.requestid - ), - ) - - self.assertEqual(action, b'noop') - self.assertEqual(meta, {}) - - result = { - b'status': b'ok', - } - encoded = b''.join(cborutil.streamencode(result)) - - compressed = zstd.ZstdCompressor().compress(encoded) - self.assertEqual( - zstd.ZstdDecompressor().decompress(compressed), encoded - ) - - chunks = [] - - for i in range(len(compressed)): - char = compressed[i : i + 1] - if char == b'\\': - char = b'\\\\' - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response continuation %s' - % (request.requestid, char) - ), - ) - - self.assertEqual(action, b'responsedata') - chunks.append(meta[b'data']) - self.assertTrue(meta[b'expectmore']) - self.assertFalse(meta[b'eos']) - - # zstd decompressor will flush at frame boundaries. - self.assertEqual(b''.join(chunks), encoded) - - # End the stream for good measure. - action, meta = sendframe( - reactor, - ffs(b'%d 2 stream-end command-response eos ' % request.requestid), - ) - - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], b'') - self.assertFalse(meta[b'expectmore']) - self.assertTrue(meta[b'eos']) - - @unittest.skipUnless(zstd, 'zstd not available') - def testzstd8mbmultipleresponses(self): - # We feed in zstd compressed data on the same stream but belonging to - # 2 different requests. This tests our flushing behavior. - reactor = framing.clientreactor( - globalui, buffersends=False, hasmultiplesend=True - ) - - request1, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - request2, action, meta = reactor.callcommand(b'foo', {}) - for f in meta[b'framegen']: - pass - - outstream = framing.outputstream(2) - outstream.setencoder(globalui, b'zstd-8mb') - - response1 = b''.join( - cborutil.streamencode( - { - b'status': b'ok', - b'extra': b'response1' * 10, - } - ) - ) - - response2 = b''.join( - cborutil.streamencode( - { - b'status': b'error', - b'extra': b'response2' * 10, - } - ) - ) - - action, meta = sendframe( - reactor, - ffs( - b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' - % request1.requestid - ), - ) - - self.assertEqual(action, b'noop') - self.assertEqual(meta, {}) - - # Feeding partial data in won't get anything useful out. - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response continuation %s' - % (request1.requestid, outstream.encode(response1)) - ), - ) - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], b'') - - # But flushing data at both ends will get our original data. - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response eos %s' - % (request1.requestid, outstream.flush()) - ), - ) - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], response1) - - # We should be able to reuse the compressor/decompressor for the - # 2nd response. - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response continuation %s' - % (request2.requestid, outstream.encode(response2)) - ), - ) - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], b'') - - action, meta = sendframe( - reactor, - ffs( - b'%d 2 encoded command-response eos %s' - % (request2.requestid, outstream.flush()) - ), - ) - self.assertEqual(action, b'responsedata') - self.assertEqual(meta[b'data'], response2) - - -if __name__ == '__main__': - if (3, 6, 0) <= sys.version_info < (3, 6, 4): - # Python 3.6.0 through 3.6.3 inclusive shipped with - # https://bugs.python.org/issue31825 and we can't run these - # tests on those specific versions of Python. Sigh. - sys.exit(80) - import silenttestrunner - - silenttestrunner.main(__name__) diff --git a/tests/test-wireproto-framing.py b/tests/test-wireproto-framing.py deleted file mode 100644 --- a/tests/test-wireproto-framing.py +++ /dev/null @@ -1,305 +0,0 @@ -import unittest - -from mercurial import ( - util, - wireprotoframing as framing, -) - -ffs = framing.makeframefromhumanstring - - -class FrameHumanStringTests(unittest.TestCase): - def testbasic(self): - self.assertEqual( - ffs(b'1 1 0 1 0 '), b'\x00\x00\x00\x01\x00\x01\x00\x10' - ) - - self.assertEqual( - ffs(b'2 4 0 1 0 '), b'\x00\x00\x00\x02\x00\x04\x00\x10' - ) - - self.assertEqual( - ffs(b'2 4 0 1 0 foo'), b'\x03\x00\x00\x02\x00\x04\x00\x10foo' - ) - - def testcborint(self): - self.assertEqual( - ffs(b'1 1 0 1 0 cbor:15'), b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f' - ) - - self.assertEqual( - ffs(b'1 1 0 1 0 cbor:42'), b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*' - ) - - self.assertEqual( - ffs(b'1 1 0 1 0 cbor:1048576'), - b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a' b'\x00\x10\x00\x00', - ) - - self.assertEqual( - ffs(b'1 1 0 1 0 cbor:0'), b'\x01\x00\x00\x01\x00\x01\x00\x10\x00' - ) - - self.assertEqual( - ffs(b'1 1 0 1 0 cbor:-1'), b'\x01\x00\x00\x01\x00\x01\x00\x10 ' - ) - - self.assertEqual( - ffs(b'1 1 0 1 0 cbor:-342542'), - b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r', - ) - - def testcborstrings(self): - self.assertEqual( - ffs(b"1 1 0 1 0 cbor:b'foo'"), - b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo', - ) - - def testcborlists(self): - self.assertEqual( - ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"), - b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4' b'\x18*Cfoo', - ) - - def testcbordicts(self): - self.assertEqual( - ffs(b"1 1 0 1 0 " b"cbor:{b'foo': b'val1', b'bar': b'val2'}"), - b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2' b'CbarDval2CfooDval1', - ) - - -class FrameTests(unittest.TestCase): - def testdataexactframesize(self): - data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE) - - stream = framing.stream(1) - frames = list( - framing.createcommandframes(stream, 1, b'command', {}, data) - ) - self.assertEqual( - frames, - [ - ffs( - b'1 1 stream-begin command-request new|have-data ' - b"cbor:{b'name': b'command'}" - ), - ffs(b'1 1 0 command-data continuation %s' % data.getvalue()), - ffs(b'1 1 0 command-data eos '), - ], - ) - - def testdatamultipleframes(self): - data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1)) - - stream = framing.stream(1) - frames = list( - framing.createcommandframes(stream, 1, b'command', {}, data) - ) - self.assertEqual( - frames, - [ - ffs( - b'1 1 stream-begin command-request new|have-data ' - b"cbor:{b'name': b'command'}" - ), - ffs( - b'1 1 0 command-data continuation %s' - % (b'x' * framing.DEFAULT_MAX_FRAME_SIZE) - ), - ffs(b'1 1 0 command-data eos x'), - ], - ) - - def testargsanddata(self): - data = util.bytesio(b'x' * 100) - - stream = framing.stream(1) - frames = list( - framing.createcommandframes( - stream, - 1, - b'command', - { - b'key1': b'key1value', - b'key2': b'key2value', - b'key3': b'key3value', - }, - data, - ) - ) - - self.assertEqual( - frames, - [ - ffs( - b'1 1 stream-begin command-request new|have-data ' - b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', " - b"b'key2': b'key2value', b'key3': b'key3value'}}" - ), - ffs(b'1 1 0 command-data eos %s' % data.getvalue()), - ], - ) - - if not getattr(unittest.TestCase, 'assertRaisesRegex', False): - # Python 3.7 deprecates the regex*p* version, but 2.7 lacks - # the regex version. - assertRaisesRegex = ( # camelcase-required - unittest.TestCase.assertRaisesRegexp - ) - - def testtextoutputformattingstringtype(self): - """Formatting string must be bytes.""" - with self.assertRaisesRegex(ValueError, 'must use bytes formatting '): - list( - framing.createtextoutputframe( - None, 1, [(b'foo'.decode('ascii'), [], [])] - ) - ) - - def testtextoutputargumentbytes(self): - with self.assertRaisesRegex(ValueError, 'must use bytes for argument'): - list( - framing.createtextoutputframe( - None, 1, [(b'foo', [b'foo'.decode('ascii')], [])] - ) - ) - - def testtextoutputlabelbytes(self): - with self.assertRaisesRegex(ValueError, 'must use bytes for labels'): - list( - framing.createtextoutputframe( - None, 1, [(b'foo', [], [b'foo'.decode('ascii')])] - ) - ) - - def testtextoutput1simpleatom(self): - stream = framing.stream(1) - val = list(framing.createtextoutputframe(stream, 1, [(b'foo', [], [])])) - - self.assertEqual( - val, - [ - ffs( - b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo'}]" - ), - ], - ) - - def testtextoutput2simpleatoms(self): - stream = framing.stream(1) - val = list( - framing.createtextoutputframe( - stream, - 1, - [ - (b'foo', [], []), - (b'bar', [], []), - ], - ) - ) - - self.assertEqual( - val, - [ - ffs( - b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]" - ) - ], - ) - - def testtextoutput1arg(self): - stream = framing.stream(1) - val = list( - framing.createtextoutputframe( - stream, - 1, - [ - (b'foo %s', [b'val1'], []), - ], - ) - ) - - self.assertEqual( - val, - [ - ffs( - b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]" - ) - ], - ) - - def testtextoutput2arg(self): - stream = framing.stream(1) - val = list( - framing.createtextoutputframe( - stream, - 1, - [ - (b'foo %s %s', [b'val', b'value'], []), - ], - ) - ) - - self.assertEqual( - val, - [ - ffs( - b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]" - ) - ], - ) - - def testtextoutput1label(self): - stream = framing.stream(1) - val = list( - framing.createtextoutputframe( - stream, - 1, - [ - (b'foo', [], [b'label']), - ], - ) - ) - - self.assertEqual( - val, - [ - ffs( - b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]" - ) - ], - ) - - def testargandlabel(self): - stream = framing.stream(1) - val = list( - framing.createtextoutputframe( - stream, - 1, - [ - (b'foo %s', [b'arg'], [b'label']), - ], - ) - ) - - self.assertEqual( - val, - [ - ffs( - b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], " - b"b'labels': [b'label']}]" - ) - ], - ) - - -if __name__ == '__main__': - import silenttestrunner - - silenttestrunner.main(__name__) diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py deleted file mode 100644 --- a/tests/test-wireproto-serverreactor.py +++ /dev/null @@ -1,845 +0,0 @@ -import unittest - -from mercurial import ( - ui as uimod, - util, - wireprotoframing as framing, -) -from mercurial.utils import cborutil - -ffs = framing.makeframefromhumanstring - -OK = b''.join(cborutil.streamencode({b'status': b'ok'})) - - -def makereactor(deferoutput=False): - ui = uimod.ui() - return framing.serverreactor(ui, deferoutput=deferoutput) - - -def sendframes(reactor, gen): - """Send a generator of frame bytearray to a reactor. - - Emits a generator of results from ``onframerecv()`` calls. - """ - for frame in gen: - header = framing.parseheader(frame) - payload = frame[framing.FRAME_HEADER_SIZE :] - assert len(payload) == header.length - - yield reactor.onframerecv( - framing.frame( - header.requestid, - header.streamid, - header.streamflags, - header.typeid, - header.flags, - payload, - ) - ) - - -def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None): - """Generate frames to run a command and send them to a reactor.""" - return sendframes( - reactor, framing.createcommandframes(stream, rid, cmd, args, datafh) - ) - - -class ServerReactorTests(unittest.TestCase): - def _sendsingleframe(self, reactor, f): - results = list(sendframes(reactor, [f])) - self.assertEqual(len(results), 1) - - return results[0] - - def assertaction(self, res, expected): - self.assertIsInstance(res, tuple) - self.assertEqual(len(res), 2) - self.assertIsInstance(res[1], dict) - self.assertEqual(res[0], expected) - - def assertframesequal(self, frames, framestrings): - expected = [ffs(s) for s in framestrings] - self.assertEqual(list(frames), expected) - - def test1framecommand(self): - """Receiving a command in a single frame yields request to run it.""" - reactor = makereactor() - stream = framing.stream(1) - results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {})) - self.assertEqual(len(results), 1) - self.assertaction(results[0], b'runcommand') - self.assertEqual( - results[0][1], - { - b'requestid': 1, - b'command': b'mycommand', - b'args': {}, - b'redirect': None, - b'data': None, - }, - ) - - result = reactor.oninputeof() - self.assertaction(result, b'noop') - - def test1argument(self): - reactor = makereactor() - stream = framing.stream(1) - results = list( - sendcommandframes( - reactor, stream, 41, b'mycommand', {b'foo': b'bar'} - ) - ) - self.assertEqual(len(results), 1) - self.assertaction(results[0], b'runcommand') - self.assertEqual( - results[0][1], - { - b'requestid': 41, - b'command': b'mycommand', - b'args': {b'foo': b'bar'}, - b'redirect': None, - b'data': None, - }, - ) - - def testmultiarguments(self): - reactor = makereactor() - stream = framing.stream(1) - results = list( - sendcommandframes( - reactor, - stream, - 1, - b'mycommand', - {b'foo': b'bar', b'biz': b'baz'}, - ) - ) - self.assertEqual(len(results), 1) - self.assertaction(results[0], b'runcommand') - self.assertEqual( - results[0][1], - { - b'requestid': 1, - b'command': b'mycommand', - b'args': {b'foo': b'bar', b'biz': b'baz'}, - b'redirect': None, - b'data': None, - }, - ) - - def testsimplecommanddata(self): - reactor = makereactor() - stream = framing.stream(1) - results = list( - sendcommandframes( - reactor, stream, 1, b'mycommand', {}, util.bytesio(b'data!') - ) - ) - self.assertEqual(len(results), 2) - self.assertaction(results[0], b'wantframe') - self.assertaction(results[1], b'runcommand') - self.assertEqual( - results[1][1], - { - b'requestid': 1, - b'command': b'mycommand', - b'args': {}, - b'redirect': None, - b'data': b'data!', - }, - ) - - def testmultipledataframes(self): - frames = [ - ffs( - b'1 1 stream-begin command-request new|have-data ' - b"cbor:{b'name': b'mycommand'}" - ), - ffs(b'1 1 0 command-data continuation data1'), - ffs(b'1 1 0 command-data continuation data2'), - ffs(b'1 1 0 command-data eos data3'), - ] - - reactor = makereactor() - results = list(sendframes(reactor, frames)) - self.assertEqual(len(results), 4) - for i in range(3): - self.assertaction(results[i], b'wantframe') - self.assertaction(results[3], b'runcommand') - self.assertEqual( - results[3][1], - { - b'requestid': 1, - b'command': b'mycommand', - b'args': {}, - b'redirect': None, - b'data': b'data1data2data3', - }, - ) - - def testargumentanddata(self): - frames = [ - ffs( - b'1 1 stream-begin command-request new|have-data ' - b"cbor:{b'name': b'command', b'args': {b'key': b'val'," - b"b'foo': b'bar'}}" - ), - ffs(b'1 1 0 command-data continuation value1'), - ffs(b'1 1 0 command-data eos value2'), - ] - - reactor = makereactor() - results = list(sendframes(reactor, frames)) - - self.assertaction(results[-1], b'runcommand') - self.assertEqual( - results[-1][1], - { - b'requestid': 1, - b'command': b'command', - b'args': { - b'key': b'val', - b'foo': b'bar', - }, - b'redirect': None, - b'data': b'value1value2', - }, - ) - - def testnewandcontinuation(self): - result = self._sendsingleframe( - makereactor(), - ffs(b'1 1 stream-begin command-request new|continuation '), - ) - self.assertaction(result, b'error') - self.assertEqual( - result[1], - { - b'message': b'received command request frame with both new and ' - b'continuation flags set', - }, - ) - - def testneithernewnorcontinuation(self): - result = self._sendsingleframe( - makereactor(), ffs(b'1 1 stream-begin command-request 0 ') - ) - self.assertaction(result, b'error') - self.assertEqual( - result[1], - { - b'message': b'received command request frame with neither new nor ' - b'continuation flags set', - }, - ) - - def testunexpectedcommanddata(self): - """Command data frame when not running a command is an error.""" - result = self._sendsingleframe( - makereactor(), ffs(b'1 1 stream-begin command-data 0 ignored') - ) - self.assertaction(result, b'error') - self.assertEqual( - result[1], - { - b'message': b'expected sender protocol settings or command request ' - b'frame; got 2', - }, - ) - - def testunexpectedcommanddatareceiving(self): - """Same as above except the command is receiving.""" - results = list( - sendframes( - makereactor(), - [ - ffs( - b'1 1 stream-begin command-request new|more ' - b"cbor:{b'name': b'ignored'}" - ), - ffs(b'1 1 0 command-data eos ignored'), - ], - ) - ) - - self.assertaction(results[0], b'wantframe') - self.assertaction(results[1], b'error') - self.assertEqual( - results[1][1], - { - b'message': b'received command data frame for request that is not ' - b'expecting data: 1', - }, - ) - - def testconflictingrequestidallowed(self): - """Multiple fully serviced commands with same request ID is allowed.""" - reactor = makereactor() - results = [] - outstream = reactor.makeoutputstream() - results.append( - self._sendsingleframe( - reactor, - ffs( - b'1 1 stream-begin command-request new ' - b"cbor:{b'name': b'command'}" - ), - ) - ) - result = reactor.oncommandresponsereadyobjects( - outstream, 1, [b'response1'] - ) - self.assertaction(result, b'sendframes') - list(result[1][b'framegen']) - results.append( - self._sendsingleframe( - reactor, - ffs( - b'1 1 stream-begin command-request new ' - b"cbor:{b'name': b'command'}" - ), - ) - ) - result = reactor.oncommandresponsereadyobjects( - outstream, 1, [b'response2'] - ) - self.assertaction(result, b'sendframes') - list(result[1][b'framegen']) - results.append( - self._sendsingleframe( - reactor, - ffs( - b'1 1 stream-begin command-request new ' - b"cbor:{b'name': b'command'}" - ), - ) - ) - result = reactor.oncommandresponsereadyobjects( - outstream, 1, [b'response3'] - ) - self.assertaction(result, b'sendframes') - list(result[1][b'framegen']) - - for i in range(3): - self.assertaction(results[i], b'runcommand') - self.assertEqual( - results[i][1], - { - b'requestid': 1, - b'command': b'command', - b'args': {}, - b'redirect': None, - b'data': None, - }, - ) - - def testconflictingrequestid(self): - """Request ID for new command matching in-flight command is illegal.""" - results = list( - sendframes( - makereactor(), - [ - ffs( - b'1 1 stream-begin command-request new|more ' - b"cbor:{b'name': b'command'}" - ), - ffs( - b'1 1 0 command-request new ' - b"cbor:{b'name': b'command1'}" - ), - ], - ) - ) - - self.assertaction(results[0], b'wantframe') - self.assertaction(results[1], b'error') - self.assertEqual( - results[1][1], - { - b'message': b'request with ID 1 already received', - }, - ) - - def testinterleavedcommands(self): - cbor1 = b''.join( - cborutil.streamencode( - { - b'name': b'command1', - b'args': { - b'foo': b'bar', - b'key1': b'val', - }, - } - ) - ) - cbor3 = b''.join( - cborutil.streamencode( - { - b'name': b'command3', - b'args': { - b'biz': b'baz', - b'key': b'val', - }, - } - ) - ) - - results = list( - sendframes( - makereactor(), - [ - ffs( - b'1 1 stream-begin command-request new|more %s' - % cbor1[0:6] - ), - ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]), - ffs( - b'1 1 0 command-request continuation|more %s' - % cbor1[6:9] - ), - ffs( - b'3 1 0 command-request continuation|more %s' - % cbor3[10:13] - ), - ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]), - ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]), - ], - ) - ) - - self.assertEqual( - [t[0] for t in results], - [ - b'wantframe', - b'wantframe', - b'wantframe', - b'wantframe', - b'runcommand', - b'runcommand', - ], - ) - - self.assertEqual( - results[4][1], - { - b'requestid': 3, - b'command': b'command3', - b'args': {b'biz': b'baz', b'key': b'val'}, - b'redirect': None, - b'data': None, - }, - ) - self.assertEqual( - results[5][1], - { - b'requestid': 1, - b'command': b'command1', - b'args': {b'foo': b'bar', b'key1': b'val'}, - b'redirect': None, - b'data': None, - }, - ) - - def testmissingcommanddataframe(self): - # The reactor doesn't currently handle partially received commands. - # So this test is failing to do anything with request 1. - frames = [ - ffs( - b'1 1 stream-begin command-request new|have-data ' - b"cbor:{b'name': b'command1'}" - ), - ffs(b'3 1 0 command-request new ' b"cbor:{b'name': b'command2'}"), - ] - results = list(sendframes(makereactor(), frames)) - self.assertEqual(len(results), 2) - self.assertaction(results[0], b'wantframe') - self.assertaction(results[1], b'runcommand') - - def testmissingcommanddataframeflags(self): - frames = [ - ffs( - b'1 1 stream-begin command-request new|have-data ' - b"cbor:{b'name': b'command1'}" - ), - ffs(b'1 1 0 command-data 0 data'), - ] - results = list(sendframes(makereactor(), frames)) - self.assertEqual(len(results), 2) - self.assertaction(results[0], b'wantframe') - self.assertaction(results[1], b'error') - self.assertEqual( - results[1][1], - { - b'message': b'command data frame without flags', - }, - ) - - def testframefornonreceivingrequest(self): - """Receiving a frame for a command that is not receiving is illegal.""" - results = list( - sendframes( - makereactor(), - [ - ffs( - b'1 1 stream-begin command-request new ' - b"cbor:{b'name': b'command1'}" - ), - ffs( - b'3 1 0 command-request new|have-data ' - b"cbor:{b'name': b'command3'}" - ), - ffs(b'5 1 0 command-data eos ignored'), - ], - ) - ) - self.assertaction(results[2], b'error') - self.assertEqual( - results[2][1], - { - b'message': b'received frame for request that is not receiving: 5', - }, - ) - - def testsimpleresponse(self): - """Bytes response to command sends result frames.""" - reactor = makereactor() - instream = framing.stream(1) - list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) - - outstream = reactor.makeoutputstream() - result = reactor.oncommandresponsereadyobjects( - outstream, 1, [b'response'] - ) - self.assertaction(result, b'sendframes') - self.assertframesequal( - result[1][b'framegen'], - [ - b'1 2 stream-begin stream-settings eos cbor:b"identity"', - b'1 2 encoded command-response continuation %s' % OK, - b'1 2 encoded command-response continuation cbor:b"response"', - b'1 2 0 command-response eos ', - ], - ) - - def testmultiframeresponse(self): - """Bytes response spanning multiple frames is handled.""" - first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE - second = b'y' * 100 - - reactor = makereactor() - instream = framing.stream(1) - list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) - - outstream = reactor.makeoutputstream() - result = reactor.oncommandresponsereadyobjects( - outstream, 1, [first + second] - ) - self.assertaction(result, b'sendframes') - self.assertframesequal( - result[1][b'framegen'], - [ - b'1 2 stream-begin stream-settings eos cbor:b"identity"', - b'1 2 encoded command-response continuation %s' % OK, - b'1 2 encoded command-response continuation Y\x80d', - b'1 2 encoded command-response continuation %s' % first, - b'1 2 encoded command-response continuation %s' % second, - b'1 2 0 command-response eos ', - ], - ) - - def testservererror(self): - reactor = makereactor() - instream = framing.stream(1) - list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) - - outstream = reactor.makeoutputstream() - result = reactor.onservererror(outstream, 1, b'some message') - self.assertaction(result, b'sendframes') - self.assertframesequal( - result[1][b'framegen'], - [ - b"1 2 stream-begin error-response 0 " - b"cbor:{b'type': b'server', " - b"b'message': [{b'msg': b'some message'}]}", - ], - ) - - def test1commanddeferresponse(self): - """Responses when in deferred output mode are delayed until EOF.""" - reactor = makereactor(deferoutput=True) - instream = framing.stream(1) - results = list( - sendcommandframes(reactor, instream, 1, b'mycommand', {}) - ) - self.assertEqual(len(results), 1) - self.assertaction(results[0], b'runcommand') - - outstream = reactor.makeoutputstream() - result = reactor.oncommandresponsereadyobjects( - outstream, 1, [b'response'] - ) - self.assertaction(result, b'noop') - result = reactor.oninputeof() - self.assertaction(result, b'sendframes') - self.assertframesequal( - result[1][b'framegen'], - [ - b'1 2 stream-begin stream-settings eos cbor:b"identity"', - b'1 2 encoded command-response continuation %s' % OK, - b'1 2 encoded command-response continuation cbor:b"response"', - b'1 2 0 command-response eos ', - ], - ) - - def testmultiplecommanddeferresponse(self): - reactor = makereactor(deferoutput=True) - instream = framing.stream(1) - list(sendcommandframes(reactor, instream, 1, b'command1', {})) - list(sendcommandframes(reactor, instream, 3, b'command2', {})) - - outstream = reactor.makeoutputstream() - result = reactor.oncommandresponsereadyobjects( - outstream, 1, [b'response1'] - ) - self.assertaction(result, b'noop') - result = reactor.oncommandresponsereadyobjects( - outstream, 3, [b'response2'] - ) - self.assertaction(result, b'noop') - result = reactor.oninputeof() - self.assertaction(result, b'sendframes') - self.assertframesequal( - result[1][b'framegen'], - [ - b'1 2 stream-begin stream-settings eos cbor:b"identity"', - b'1 2 encoded command-response continuation %s' % OK, - b'1 2 encoded command-response continuation cbor:b"response1"', - b'1 2 0 command-response eos ', - b'3 2 encoded command-response continuation %s' % OK, - b'3 2 encoded command-response continuation cbor:b"response2"', - b'3 2 0 command-response eos ', - ], - ) - - def testrequestidtracking(self): - reactor = makereactor(deferoutput=True) - instream = framing.stream(1) - list(sendcommandframes(reactor, instream, 1, b'command1', {})) - list(sendcommandframes(reactor, instream, 3, b'command2', {})) - list(sendcommandframes(reactor, instream, 5, b'command3', {})) - - # Register results for commands out of order. - outstream = reactor.makeoutputstream() - reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3']) - reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1']) - reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5']) - - result = reactor.oninputeof() - self.assertaction(result, b'sendframes') - self.assertframesequal( - result[1][b'framegen'], - [ - b'3 2 stream-begin stream-settings eos cbor:b"identity"', - b'3 2 encoded command-response continuation %s' % OK, - b'3 2 encoded command-response continuation cbor:b"response3"', - b'3 2 0 command-response eos ', - b'1 2 encoded command-response continuation %s' % OK, - b'1 2 encoded command-response continuation cbor:b"response1"', - b'1 2 0 command-response eos ', - b'5 2 encoded command-response continuation %s' % OK, - b'5 2 encoded command-response continuation cbor:b"response5"', - b'5 2 0 command-response eos ', - ], - ) - - def testduplicaterequestonactivecommand(self): - """Receiving a request ID that matches a request that isn't finished.""" - reactor = makereactor() - stream = framing.stream(1) - list(sendcommandframes(reactor, stream, 1, b'command1', {})) - results = list(sendcommandframes(reactor, stream, 1, b'command1', {})) - - self.assertaction(results[0], b'error') - self.assertEqual( - results[0][1], - { - b'message': b'request with ID 1 is already active', - }, - ) - - def testduplicaterequestonactivecommandnosend(self): - """Same as above but we've registered a response but haven't sent it.""" - reactor = makereactor() - instream = framing.stream(1) - list(sendcommandframes(reactor, instream, 1, b'command1', {})) - outstream = reactor.makeoutputstream() - reactor.oncommandresponsereadyobjects(outstream, 1, [b'response']) - - # We've registered the response but haven't sent it. From the - # perspective of the reactor, the command is still active. - - results = list(sendcommandframes(reactor, instream, 1, b'command1', {})) - self.assertaction(results[0], b'error') - self.assertEqual( - results[0][1], - { - b'message': b'request with ID 1 is already active', - }, - ) - - def testduplicaterequestaftersend(self): - """We can use a duplicate request ID after we've sent the response.""" - reactor = makereactor() - instream = framing.stream(1) - list(sendcommandframes(reactor, instream, 1, b'command1', {})) - outstream = reactor.makeoutputstream() - res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response']) - list(res[1][b'framegen']) - - results = list(sendcommandframes(reactor, instream, 1, b'command1', {})) - self.assertaction(results[0], b'runcommand') - - def testprotocolsettingsnoflags(self): - result = self._sendsingleframe( - makereactor(), ffs(b'0 1 stream-begin sender-protocol-settings 0 ') - ) - self.assertaction(result, b'error') - self.assertEqual( - result[1], - { - b'message': b'sender protocol settings frame must have ' - b'continuation or end of stream flag set', - }, - ) - - def testprotocolsettingsconflictflags(self): - result = self._sendsingleframe( - makereactor(), - ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '), - ) - self.assertaction(result, b'error') - self.assertEqual( - result[1], - { - b'message': b'sender protocol settings frame cannot have both ' - b'continuation and end of stream flags set', - }, - ) - - def testprotocolsettingsemptypayload(self): - result = self._sendsingleframe( - makereactor(), - ffs(b'0 1 stream-begin sender-protocol-settings eos '), - ) - self.assertaction(result, b'error') - self.assertEqual( - result[1], - { - b'message': b'sender protocol settings frame did not contain CBOR ' - b'data', - }, - ) - - def testprotocolsettingsmultipleobjects(self): - result = self._sendsingleframe( - makereactor(), - ffs( - b'0 1 stream-begin sender-protocol-settings eos ' - b'\x46foobar\x43foo' - ), - ) - self.assertaction(result, b'error') - self.assertEqual( - result[1], - { - b'message': b'sender protocol settings frame contained multiple ' - b'CBOR values', - }, - ) - - def testprotocolsettingscontentencodings(self): - reactor = makereactor() - - result = self._sendsingleframe( - reactor, - ffs( - b'0 1 stream-begin sender-protocol-settings eos ' - b'cbor:{b"contentencodings": [b"a", b"b"]}' - ), - ) - self.assertaction(result, b'wantframe') - - self.assertEqual(reactor._state, b'idle') - self.assertEqual( - reactor._sendersettings[b'contentencodings'], [b'a', b'b'] - ) - - def testprotocolsettingsmultipleframes(self): - reactor = makereactor() - - data = b''.join( - cborutil.streamencode( - { - b'contentencodings': [b'value1', b'value2'], - } - ) - ) - - results = list( - sendframes( - reactor, - [ - ffs( - b'0 1 stream-begin sender-protocol-settings continuation %s' - % data[0:5] - ), - ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]), - ], - ) - ) - - self.assertEqual(len(results), 2) - - self.assertaction(results[0], b'wantframe') - self.assertaction(results[1], b'wantframe') - - self.assertEqual(reactor._state, b'idle') - self.assertEqual( - reactor._sendersettings[b'contentencodings'], [b'value1', b'value2'] - ) - - def testprotocolsettingsbadcbor(self): - result = self._sendsingleframe( - makereactor(), - ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'), - ) - self.assertaction(result, b'error') - - def testprotocolsettingsnoninitial(self): - # Cannot have protocol settings frames as non-initial frames. - reactor = makereactor() - - stream = framing.stream(1) - results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {})) - self.assertEqual(len(results), 1) - self.assertaction(results[0], b'runcommand') - - result = self._sendsingleframe( - reactor, ffs(b'0 1 0 sender-protocol-settings eos ') - ) - self.assertaction(result, b'error') - self.assertEqual( - result[1], - { - b'message': b'expected command request frame; got 8', - }, - ) - - -if __name__ == '__main__': - import silenttestrunner - - silenttestrunner.main(__name__)