diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -551,18 +551,19 @@
 
             self.ui.note(_('received %r\n') % frame)
 
-            if frame.typeid == wireprotoframing.FRAME_TYPE_BYTES_RESPONSE:
-                if frame.flags & wireprotoframing.FLAG_BYTES_RESPONSE_CBOR:
-                    payload = util.bytesio(frame.payload)
+            action, meta = reactor.onframerecv(frame)
+
+            if action == 'responsedata':
+                if meta['cbor']:
+                    payload = util.bytesio(meta['data'])
 
                     decoder = cbor.CBORDecoder(payload)
-                    while payload.tell() + 1 < len(frame.payload):
+                    while payload.tell() + 1 < len(meta['data']):
                         results.append(decoder.decode())
                 else:
-                    results.append(frame.payload)
+                    results.append(meta['data'])
             else:
-                error.ProgrammingError('unhandled frame type: %d' %
-                                       frame.typeid)
+                raise error.ProgrammingError('unhandled action: %s' % action)
 
         return results
 
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -922,6 +922,7 @@
         self._outgoingstream = stream(1)
         self._pendingrequests = collections.deque()
         self._activerequests = {}
+        self._incomingstreams = {}
 
     def callcommand(self, name, args, datafh=None):
         """Request that a command be executed.
@@ -1007,3 +1008,77 @@
             yield frame
 
         request.state = 'sent'
+
+    def onframerecv(self, frame):
+        """Process a frame that has been received off the wire.
+
+        Returns a 2-tuple of (action, meta) describing further action the
+        caller needs to take as a result of receiving this frame.
+
+        An ``error`` action carries a ``message`` key in ``meta``. Raises
+        ``error.ProgrammingError`` for stream encodings and frame types
+        this reactor does not handle.
+        """
+        if frame.streamid % 2:
+            return 'error', {
+                'message': (
+                    _('received frame with odd numbered stream ID: %d') %
+                    frame.streamid),
+            }
+
+        if frame.streamid not in self._incomingstreams:
+            if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
+                return 'error', {
+                    'message': _('received frame on unknown stream '
+                                 'without beginning of stream flag set'),
+                }
+
+            # Track the newly opened stream so that later frames on it are
+            # accepted and the end-of-stream cleanup below has an entry to
+            # remove.
+            self._incomingstreams[frame.streamid] = stream(frame.streamid)
+
+        if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
+            raise error.ProgrammingError('support for decoding stream '
+                                         'payloads not yet implemented')
+
+        if frame.streamflags & STREAM_FLAG_END_STREAM:
+            del self._incomingstreams[frame.streamid]
+
+        if frame.requestid not in self._activerequests:
+            return 'error', {
+                'message': (_('received frame for inactive request ID: %d') %
+                            frame.requestid),
+            }
+
+        request = self._activerequests[frame.requestid]
+        request.state = 'receiving'
+
+        handlers = {
+            FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
+        }
+
+        meth = handlers.get(frame.typeid)
+        if not meth:
+            raise error.ProgrammingError('unhandled frame type: %d' %
+                                         frame.typeid)
+
+        return meth(request, frame)
+
+    def _onbytesresponseframe(self, request, frame):
+        """Translate a bytes response frame into a ``responsedata`` action.
+
+        The request is marked received and dropped from the active set once
+        the EOS flag is seen on a frame.
+        """
+        if frame.flags & FLAG_BYTES_RESPONSE_EOS:
+            request.state = 'received'
+            del self._activerequests[request.requestid]
+
+        return 'responsedata', {
+            'request': request,
+            'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION,
+            'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS,
+            'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR,
+            'data': frame.payload,
+        }
diff --git a/tests/test-wireproto-clientreactor.py b/tests/test-wireproto-clientreactor.py
--- a/tests/test-wireproto-clientreactor.py
+++ b/tests/test-wireproto-clientreactor.py
@@ -7,6 +7,21 @@
     wireprotoframing as framing,
 )
 
+ffs = framing.makeframefromhumanstring
+
+def sendframe(reactor, frame):
+    """Send a frame bytearray to a reactor."""
+    header = framing.parseheader(frame)
+    payload = frame[framing.FRAME_HEADER_SIZE:]
+    assert len(payload) == header.length
+
+    return reactor.onframerecv(framing.frame(header.requestid,
+                                             header.streamid,
+                                             header.streamflags,
+                                             header.typeid,
+                                             header.flags,
+                                             payload))
+
 class SingleSendTests(unittest.TestCase):
     """A reactor that can only send once rejects subsequent sends."""
     def testbasic(self):
@@ -61,6 +76,35 @@
 
         self.assertEqual(request.state, 'sent')
 
+class BadFrameRecvTests(unittest.TestCase):
+    def testoddstream(self):
+        reactor = framing.clientreactor()
+
+        action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
+        self.assertEqual(action, 'error')
+        self.assertEqual(meta['message'],
+                         'received frame with odd numbered stream ID: 1')
+
+    def testunknownstream(self):
+        reactor = framing.clientreactor()
+
+        action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
+        self.assertEqual(action, 'error')
+        self.assertEqual(meta['message'],
+                         'received frame on unknown stream without beginning '
+                         'of stream flag set')
+
+    def testunhandledframetype(self):
+        reactor = framing.clientreactor(buffersends=False)
+
+        request, action, meta = reactor.callcommand(b'foo', {})
+        for frame in meta['framegen']:
+            pass
+
+        with self.assertRaisesRegexp(error.ProgrammingError,
+                                     'unhandled frame type'):
+            sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
+
 if __name__ == '__main__':
     import silenttestrunner
     silenttestrunner.main(__name__)