diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -472,6 +472,10 @@
         self._bufferedframegens = []
         # request id -> dict of commands that are actively being received.
         self._receivingcommands = {}
+        # Request IDs that have been received and are actively being processed.
+        # Once all output for a request has been sent, it is removed from this
+        # set.
+        self._activecommands = set()
 
     def onframerecv(self, frame):
         """Process a frame that has been received off the wire.
@@ -496,14 +500,22 @@
 
         The raw bytes response is passed as an argument.
         """
-        framegen = createbytesresponseframesfrombytes(requestid, data)
+        def sendframes():
+            for frame in createbytesresponseframesfrombytes(requestid, data):
+                yield frame
+
+            # Only reached once the consumer has drained every frame, so the
+            # request stays active until its full response has been emitted.
+            self._activecommands.remove(requestid)
+
+        result = sendframes()
 
         if self._deferoutput:
-            self._bufferedframegens.append(framegen)
+            self._bufferedframegens.append(result)
             return 'noop', {}
         else:
             return 'sendframes', {
-                'framegen': framegen,
+                'framegen': result,
             }
 
     def oninputeof(self):
@@ -546,6 +558,9 @@
         else:
             self._state = 'idle'
 
+        assert requestid not in self._activecommands
+        self._activecommands.add(requestid)
+
         return 'runcommand', {
             'requestid': requestid,
             'command': entry['command'],
@@ -571,6 +586,11 @@
             return self._makeerrorresult(
                 _('request with ID %d already received') % frame.requestid)
 
+        if frame.requestid in self._activecommands:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('request with ID %d is already active') % frame.requestid)
+
         expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
         expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
 
@@ -599,7 +619,13 @@
             return self._onframeidle(frame)
 
         # All other frames should be related to a command that is currently
-        # receiving.
+        # receiving but is not active.
+        if frame.requestid in self._activecommands:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('received frame for request that is still active: %d') %
+                frame.requestid)
+
         if frame.requestid not in self._receivingcommands:
             self._state = 'errored'
             return self._makeerrorresult(
diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -478,11 +478,11 @@
         results = list(sendframes(makereactor(), [
             ffs(b'1 command-name eos command1'),
             ffs(b'3 command-name have-data command3'),
-            ffs(b'1 command-argument eoa ignored'),
+            ffs(b'5 command-argument eoa ignored'),
         ]))
         self.assertaction(results[2], 'error')
         self.assertEqual(results[2][1], {
-            'message': b'received frame for request that is not receiving: 1',
+            'message': b'received frame for request that is not receiving: 5',
         })
 
     def testsimpleresponse(self):
@@ -571,6 +571,56 @@
             b'5 bytes-response eos response5',
         ])
 
+    def testduplicaterequestonactivecommand(self):
+        """Receiving a request ID that matches a request that isn't finished."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command1', {}))
+        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+
+        self.assertaction(results[0], 'error')
+        self.assertEqual(results[0][1], {
+            'message': b'request with ID 1 is already active',
+        })
+
+    def testduplicaterequestonactivecommandnosend(self):
+        """Same as above but we've registered a response but haven't sent it."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command1', {}))
+        reactor.onbytesresponseready(1, b'response')
+
+        # We've registered the response but haven't sent it. From the
+        # perspective of the reactor, the command is still active.
+
+        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+        self.assertaction(results[0], 'error')
+        self.assertEqual(results[0][1], {
+            'message': b'request with ID 1 is already active',
+        })
+
+    def testduplicaterequestargumentframe(self):
+        """Variant on above except we sent an argument frame instead of name."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command', {}))
+        results = list(sendframes(reactor, [
+            ffs(b'3 command-name have-args command'),
+            ffs(b'1 command-argument 0 ignored'),
+        ]))
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'received frame for request that is still active: 1',
+        })
+
+    def testduplicaterequestaftersend(self):
+        """We can use a duplicate request ID after we've sent the response."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command1', {}))
+        res = reactor.onbytesresponseready(1, b'response')
+        list(res[1]['framegen'])
+
+        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+        self.assertaction(results[0], 'runcommand')
+
 if __name__ == '__main__':
     import silenttestrunner
     silenttestrunner.main(__name__)