All code in the actual server uses oncommandresponsereadyobjects().
Test code was ported to that method. This resulted in a handful of
subtle test changes.
hg-reviewers |
All code in the actual server uses oncommandresponsereadyobjects().
Test code was ported to that method. This resulted in a handful of
subtle test changes.
Lint Skipped |
Unit Tests Skipped |
Path | Packages | |||
---|---|---|---|---|
M | mercurial/wireprotoframing.py (97 lines) | |||
M | tests/test-wireproto-serverreactor.py (67 lines) |
Commit | Parents | Author | Summary | Date |
---|---|---|---|---|
Gregory Szorc | Oct 5 2018, 1:29 PM |
yield stream.makeframe(requestid=requestid, | yield stream.makeframe(requestid=requestid, | ||||
typeid=FRAME_TYPE_COMMAND_DATA, | typeid=FRAME_TYPE_COMMAND_DATA, | ||||
flags=flags, | flags=flags, | ||||
payload=data) | payload=data) | ||||
if done: | if done: | ||||
break | break | ||||
def createcommandresponseframesfrombytes(stream, requestid, data, | |||||
maxframesize=DEFAULT_MAX_FRAME_SIZE): | |||||
"""Create a raw frame to send a bytes response from static bytes input. | |||||
Returns a generator of bytearrays. | |||||
""" | |||||
# Automatically send the overall CBOR response map. | |||||
overall = b''.join(cborutil.streamencode({b'status': b'ok'})) | |||||
if len(overall) > maxframesize: | |||||
raise error.ProgrammingError('not yet implemented') | |||||
# Simple case where we can fit the full response in a single frame. | |||||
if len(overall) + len(data) <= maxframesize: | |||||
flags = FLAG_COMMAND_RESPONSE_EOS | |||||
yield stream.makeframe(requestid=requestid, | |||||
typeid=FRAME_TYPE_COMMAND_RESPONSE, | |||||
flags=flags, | |||||
payload=overall + data) | |||||
return | |||||
# It's easier to send the overall CBOR map in its own frame than to track | |||||
# offsets. | |||||
yield stream.makeframe(requestid=requestid, | |||||
typeid=FRAME_TYPE_COMMAND_RESPONSE, | |||||
flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | |||||
payload=overall) | |||||
offset = 0 | |||||
while True: | |||||
chunk = data[offset:offset + maxframesize] | |||||
offset += len(chunk) | |||||
done = offset == len(data) | |||||
if done: | |||||
flags = FLAG_COMMAND_RESPONSE_EOS | |||||
else: | |||||
flags = FLAG_COMMAND_RESPONSE_CONTINUATION | |||||
yield stream.makeframe(requestid=requestid, | |||||
typeid=FRAME_TYPE_COMMAND_RESPONSE, | |||||
flags=flags, | |||||
payload=chunk) | |||||
if done: | |||||
break | |||||
def createbytesresponseframesfromgen(stream, requestid, gen, | |||||
maxframesize=DEFAULT_MAX_FRAME_SIZE): | |||||
"""Generator of frames from a generator of byte chunks. | |||||
This assumes that another frame will follow whatever this emits. i.e. | |||||
this always emits the continuation flag and never emits the end-of-stream | |||||
flag. | |||||
""" | |||||
cb = util.chunkbuffer(gen) | |||||
flags = FLAG_COMMAND_RESPONSE_CONTINUATION | |||||
while True: | |||||
chunk = cb.read(maxframesize) | |||||
if not chunk: | |||||
break | |||||
yield stream.makeframe(requestid=requestid, | |||||
typeid=FRAME_TYPE_COMMAND_RESPONSE, | |||||
flags=flags, | |||||
payload=chunk) | |||||
flags |= FLAG_COMMAND_RESPONSE_CONTINUATION | |||||
def createcommandresponseokframe(stream, requestid): | def createcommandresponseokframe(stream, requestid): | ||||
overall = b''.join(cborutil.streamencode({b'status': b'ok'})) | overall = b''.join(cborutil.streamencode({b'status': b'ok'})) | ||||
return stream.makeframe(requestid=requestid, | return stream.makeframe(requestid=requestid, | ||||
typeid=FRAME_TYPE_COMMAND_RESPONSE, | typeid=FRAME_TYPE_COMMAND_RESPONSE, | ||||
flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | ||||
payload=overall) | payload=overall) | ||||
} | } | ||||
meth = handlers.get(self._state) | meth = handlers.get(self._state) | ||||
if not meth: | if not meth: | ||||
raise error.ProgrammingError('unhandled state: %s' % self._state) | raise error.ProgrammingError('unhandled state: %s' % self._state) | ||||
return meth(frame) | return meth(frame) | ||||
def oncommandresponseready(self, stream, requestid, data): | |||||
"""Signal that a bytes response is ready to be sent to the client. | |||||
The raw bytes response is passed as an argument. | |||||
""" | |||||
ensureserverstream(stream) | |||||
def sendframes(): | |||||
for frame in createcommandresponseframesfrombytes(stream, requestid, | |||||
data): | |||||
yield frame | |||||
self._activecommands.remove(requestid) | |||||
result = sendframes() | |||||
if self._deferoutput: | |||||
self._bufferedframegens.append(result) | |||||
return 'noop', {} | |||||
else: | |||||
return 'sendframes', { | |||||
'framegen': result, | |||||
} | |||||
def oncommandresponsereadyobjects(self, stream, requestid, objs): | def oncommandresponsereadyobjects(self, stream, requestid, objs): | ||||
"""Signal that objects are ready to be sent to the client. | """Signal that objects are ready to be sent to the client. | ||||
``objs`` is an iterable of objects (typically a generator) that will | ``objs`` is an iterable of objects (typically a generator) that will | ||||
be encoded via CBOR and added to frames, which will be sent to the | be encoded via CBOR and added to frames, which will be sent to the | ||||
client. | client. | ||||
""" | """ | ||||
ensureserverstream(stream) | ensureserverstream(stream) | ||||
# A more robust solution would be to check for objs.{next,__next__}. | |||||
if isinstance(objs, list): | |||||
objs = iter(objs) | |||||
# We need to take care over exception handling. Uncaught exceptions | # We need to take care over exception handling. Uncaught exceptions | ||||
# when generating frames could lead to premature end of the frame | # when generating frames could lead to premature end of the frame | ||||
# stream and the possibility of the server or client process getting | # stream and the possibility of the server or client process getting | ||||
# in a bad state. | # in a bad state. | ||||
# | # | ||||
# Keep in mind that if ``objs`` is a generator, advancing it could | # Keep in mind that if ``objs`` is a generator, advancing it could | ||||
# raise exceptions that originated in e.g. wire protocol command | # raise exceptions that originated in e.g. wire protocol command | ||||
# functions. That is why we differentiate between exceptions raised | # functions. That is why we differentiate between exceptions raised |
def testconflictingrequestidallowed(self): | def testconflictingrequestidallowed(self): | ||||
"""Multiple fully serviced commands with same request ID is allowed.""" | """Multiple fully serviced commands with same request ID is allowed.""" | ||||
reactor = makereactor() | reactor = makereactor() | ||||
results = [] | results = [] | ||||
outstream = reactor.makeoutputstream() | outstream = reactor.makeoutputstream() | ||||
results.append(self._sendsingleframe( | results.append(self._sendsingleframe( | ||||
reactor, ffs(b'1 1 stream-begin command-request new ' | reactor, ffs(b'1 1 stream-begin command-request new ' | ||||
b"cbor:{b'name': b'command'}"))) | b"cbor:{b'name': b'command'}"))) | ||||
result = reactor.oncommandresponseready(outstream, 1, b'response1') | result = reactor.oncommandresponsereadyobjects( | ||||
outstream, 1, [b'response1']) | |||||
self.assertaction(result, b'sendframes') | self.assertaction(result, b'sendframes') | ||||
list(result[1][b'framegen']) | list(result[1][b'framegen']) | ||||
results.append(self._sendsingleframe( | results.append(self._sendsingleframe( | ||||
reactor, ffs(b'1 1 stream-begin command-request new ' | reactor, ffs(b'1 1 stream-begin command-request new ' | ||||
b"cbor:{b'name': b'command'}"))) | b"cbor:{b'name': b'command'}"))) | ||||
result = reactor.oncommandresponseready(outstream, 1, b'response2') | result = reactor.oncommandresponsereadyobjects( | ||||
outstream, 1, [b'response2']) | |||||
self.assertaction(result, b'sendframes') | self.assertaction(result, b'sendframes') | ||||
list(result[1][b'framegen']) | list(result[1][b'framegen']) | ||||
results.append(self._sendsingleframe( | results.append(self._sendsingleframe( | ||||
reactor, ffs(b'1 1 stream-begin command-request new ' | reactor, ffs(b'1 1 stream-begin command-request new ' | ||||
b"cbor:{b'name': b'command'}"))) | b"cbor:{b'name': b'command'}"))) | ||||
result = reactor.oncommandresponseready(outstream, 1, b'response3') | result = reactor.oncommandresponsereadyobjects( | ||||
outstream, 1, [b'response3']) | |||||
self.assertaction(result, b'sendframes') | self.assertaction(result, b'sendframes') | ||||
list(result[1][b'framegen']) | list(result[1][b'framegen']) | ||||
for i in range(3): | for i in range(3): | ||||
self.assertaction(results[i], b'runcommand') | self.assertaction(results[i], b'runcommand') | ||||
self.assertEqual(results[i][1], { | self.assertEqual(results[i][1], { | ||||
b'requestid': 1, | b'requestid': 1, | ||||
b'command': b'command', | b'command': b'command', | ||||
def testsimpleresponse(self): | def testsimpleresponse(self): | ||||
"""Bytes response to command sends result frames.""" | """Bytes response to command sends result frames.""" | ||||
reactor = makereactor() | reactor = makereactor() | ||||
instream = framing.stream(1) | instream = framing.stream(1) | ||||
list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | ||||
outstream = reactor.makeoutputstream() | outstream = reactor.makeoutputstream() | ||||
result = reactor.oncommandresponseready(outstream, 1, b'response') | result = reactor.oncommandresponsereadyobjects( | ||||
outstream, 1, [b'response']) | |||||
self.assertaction(result, b'sendframes') | self.assertaction(result, b'sendframes') | ||||
self.assertframesequal(result[1][b'framegen'], [ | self.assertframesequal(result[1][b'framegen'], [ | ||||
b'1 2 stream-begin command-response eos %sresponse' % OK, | b'1 2 stream-begin command-response continuation %s' % OK, | ||||
b'1 2 0 command-response continuation cbor:b"response"', | |||||
b'1 2 0 command-response eos ', | |||||
]) | ]) | ||||
def testmultiframeresponse(self): | def testmultiframeresponse(self): | ||||
"""Bytes response spanning multiple frames is handled.""" | """Bytes response spanning multiple frames is handled.""" | ||||
first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE | first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE | ||||
second = b'y' * 100 | second = b'y' * 100 | ||||
reactor = makereactor() | reactor = makereactor() | ||||
instream = framing.stream(1) | instream = framing.stream(1) | ||||
list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | ||||
outstream = reactor.makeoutputstream() | outstream = reactor.makeoutputstream() | ||||
result = reactor.oncommandresponseready(outstream, 1, first + second) | result = reactor.oncommandresponsereadyobjects( | ||||
outstream, 1, [first + second]) | |||||
self.assertaction(result, b'sendframes') | self.assertaction(result, b'sendframes') | ||||
self.assertframesequal(result[1][b'framegen'], [ | self.assertframesequal(result[1][b'framegen'], [ | ||||
b'1 2 stream-begin command-response continuation %s' % OK, | b'1 2 stream-begin command-response continuation %s' % OK, | ||||
b'1 2 0 command-response continuation Y\x80d', | |||||
b'1 2 0 command-response continuation %s' % first, | b'1 2 0 command-response continuation %s' % first, | ||||
b'1 2 0 command-response eos %s' % second, | b'1 2 0 command-response continuation %s' % second, | ||||
b'1 2 0 command-response continuation ', | |||||
b'1 2 0 command-response eos ' | |||||
]) | ]) | ||||
def testservererror(self): | def testservererror(self): | ||||
reactor = makereactor() | reactor = makereactor() | ||||
instream = framing.stream(1) | instream = framing.stream(1) | ||||
list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | ||||
outstream = reactor.makeoutputstream() | outstream = reactor.makeoutputstream() | ||||
reactor = makereactor(deferoutput=True) | reactor = makereactor(deferoutput=True) | ||||
instream = framing.stream(1) | instream = framing.stream(1) | ||||
results = list(sendcommandframes(reactor, instream, 1, b'mycommand', | results = list(sendcommandframes(reactor, instream, 1, b'mycommand', | ||||
{})) | {})) | ||||
self.assertEqual(len(results), 1) | self.assertEqual(len(results), 1) | ||||
self.assertaction(results[0], b'runcommand') | self.assertaction(results[0], b'runcommand') | ||||
outstream = reactor.makeoutputstream() | outstream = reactor.makeoutputstream() | ||||
result = reactor.oncommandresponseready(outstream, 1, b'response') | result = reactor.oncommandresponsereadyobjects( | ||||
outstream, 1, [b'response']) | |||||
self.assertaction(result, b'noop') | self.assertaction(result, b'noop') | ||||
result = reactor.oninputeof() | result = reactor.oninputeof() | ||||
self.assertaction(result, b'sendframes') | self.assertaction(result, b'sendframes') | ||||
self.assertframesequal(result[1][b'framegen'], [ | self.assertframesequal(result[1][b'framegen'], [ | ||||
b'1 2 stream-begin command-response eos %sresponse' % OK, | b'1 2 stream-begin command-response continuation %s' % OK, | ||||
b'1 2 0 command-response continuation cbor:b"response"', | |||||
b'1 2 0 command-response eos ', | |||||
]) | ]) | ||||
def testmultiplecommanddeferresponse(self): | def testmultiplecommanddeferresponse(self): | ||||
reactor = makereactor(deferoutput=True) | reactor = makereactor(deferoutput=True) | ||||
instream = framing.stream(1) | instream = framing.stream(1) | ||||
list(sendcommandframes(reactor, instream, 1, b'command1', {})) | list(sendcommandframes(reactor, instream, 1, b'command1', {})) | ||||
list(sendcommandframes(reactor, instream, 3, b'command2', {})) | list(sendcommandframes(reactor, instream, 3, b'command2', {})) | ||||
outstream = reactor.makeoutputstream() | outstream = reactor.makeoutputstream() | ||||
result = reactor.oncommandresponseready(outstream, 1, b'response1') | result = reactor.oncommandresponsereadyobjects( | ||||
outstream, 1, [b'response1']) | |||||
self.assertaction(result, b'noop') | self.assertaction(result, b'noop') | ||||
result = reactor.oncommandresponseready(outstream, 3, b'response2') | result = reactor.oncommandresponsereadyobjects( | ||||
outstream, 3, [b'response2']) | |||||
self.assertaction(result, b'noop') | self.assertaction(result, b'noop') | ||||
result = reactor.oninputeof() | result = reactor.oninputeof() | ||||
self.assertaction(result, b'sendframes') | self.assertaction(result, b'sendframes') | ||||
self.assertframesequal(result[1][b'framegen'], [ | self.assertframesequal(result[1][b'framegen'], [ | ||||
b'1 2 stream-begin command-response eos %sresponse1' % OK, | b'1 2 stream-begin command-response continuation %s' % OK, | ||||
b'3 2 0 command-response eos %sresponse2' % OK, | b'1 2 0 command-response continuation cbor:b"response1"', | ||||
b'1 2 0 command-response eos ', | |||||
b'3 2 0 command-response continuation %s' % OK, | |||||
b'3 2 0 command-response continuation cbor:b"response2"', | |||||
b'3 2 0 command-response eos ', | |||||
]) | ]) | ||||
def testrequestidtracking(self): | def testrequestidtracking(self): | ||||
reactor = makereactor(deferoutput=True) | reactor = makereactor(deferoutput=True) | ||||
instream = framing.stream(1) | instream = framing.stream(1) | ||||
list(sendcommandframes(reactor, instream, 1, b'command1', {})) | list(sendcommandframes(reactor, instream, 1, b'command1', {})) | ||||
list(sendcommandframes(reactor, instream, 3, b'command2', {})) | list(sendcommandframes(reactor, instream, 3, b'command2', {})) | ||||
list(sendcommandframes(reactor, instream, 5, b'command3', {})) | list(sendcommandframes(reactor, instream, 5, b'command3', {})) | ||||
# Register results for commands out of order. | # Register results for commands out of order. | ||||
outstream = reactor.makeoutputstream() | outstream = reactor.makeoutputstream() | ||||
reactor.oncommandresponseready(outstream, 3, b'response3') | reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3']) | ||||
reactor.oncommandresponseready(outstream, 1, b'response1') | reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1']) | ||||
reactor.oncommandresponseready(outstream, 5, b'response5') | reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5']) | ||||
result = reactor.oninputeof() | result = reactor.oninputeof() | ||||
self.assertaction(result, b'sendframes') | self.assertaction(result, b'sendframes') | ||||
self.assertframesequal(result[1][b'framegen'], [ | self.assertframesequal(result[1][b'framegen'], [ | ||||
b'3 2 stream-begin command-response eos %sresponse3' % OK, | b'3 2 stream-begin command-response continuation %s' % OK, | ||||
b'1 2 0 command-response eos %sresponse1' % OK, | b'3 2 0 command-response continuation cbor:b"response3"', | ||||
b'5 2 0 command-response eos %sresponse5' % OK, | b'3 2 0 command-response eos ', | ||||
b'1 2 0 command-response continuation %s' % OK, | |||||
b'1 2 0 command-response continuation cbor:b"response1"', | |||||
b'1 2 0 command-response eos ', | |||||
b'5 2 0 command-response continuation %s' % OK, | |||||
b'5 2 0 command-response continuation cbor:b"response5"', | |||||
b'5 2 0 command-response eos ', | |||||
]) | ]) | ||||
def testduplicaterequestonactivecommand(self): | def testduplicaterequestonactivecommand(self): | ||||
"""Receiving a request ID that matches a request that isn't finished.""" | """Receiving a request ID that matches a request that isn't finished.""" | ||||
reactor = makereactor() | reactor = makereactor() | ||||
stream = framing.stream(1) | stream = framing.stream(1) | ||||
list(sendcommandframes(reactor, stream, 1, b'command1', {})) | list(sendcommandframes(reactor, stream, 1, b'command1', {})) | ||||
results = list(sendcommandframes(reactor, stream, 1, b'command1', {})) | results = list(sendcommandframes(reactor, stream, 1, b'command1', {})) | ||||
self.assertaction(results[0], b'error') | self.assertaction(results[0], b'error') | ||||
self.assertEqual(results[0][1], { | self.assertEqual(results[0][1], { | ||||
b'message': b'request with ID 1 is already active', | b'message': b'request with ID 1 is already active', | ||||
}) | }) | ||||
def testduplicaterequestonactivecommandnosend(self): | def testduplicaterequestonactivecommandnosend(self): | ||||
"""Same as above but we've registered a response but haven't sent it.""" | """Same as above but we've registered a response but haven't sent it.""" | ||||
reactor = makereactor() | reactor = makereactor() | ||||
instream = framing.stream(1) | instream = framing.stream(1) | ||||
list(sendcommandframes(reactor, instream, 1, b'command1', {})) | list(sendcommandframes(reactor, instream, 1, b'command1', {})) | ||||
outstream = reactor.makeoutputstream() | outstream = reactor.makeoutputstream() | ||||
reactor.oncommandresponseready(outstream, 1, b'response') | reactor.oncommandresponsereadyobjects(outstream, 1, [b'response']) | ||||
# We've registered the response but haven't sent it. From the | # We've registered the response but haven't sent it. From the | ||||
# perspective of the reactor, the command is still active. | # perspective of the reactor, the command is still active. | ||||
results = list(sendcommandframes(reactor, instream, 1, b'command1', {})) | results = list(sendcommandframes(reactor, instream, 1, b'command1', {})) | ||||
self.assertaction(results[0], b'error') | self.assertaction(results[0], b'error') | ||||
self.assertEqual(results[0][1], { | self.assertEqual(results[0][1], { | ||||
b'message': b'request with ID 1 is already active', | b'message': b'request with ID 1 is already active', | ||||
}) | }) | ||||
def testduplicaterequestaftersend(self): | def testduplicaterequestaftersend(self): | ||||
"""We can use a duplicate request ID after we've sent the response.""" | """We can use a duplicate request ID after we've sent the response.""" | ||||
reactor = makereactor() | reactor = makereactor() | ||||
instream = framing.stream(1) | instream = framing.stream(1) | ||||
list(sendcommandframes(reactor, instream, 1, b'command1', {})) | list(sendcommandframes(reactor, instream, 1, b'command1', {})) | ||||
outstream = reactor.makeoutputstream() | outstream = reactor.makeoutputstream() | ||||
res = reactor.oncommandresponseready(outstream, 1, b'response') | res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response']) | ||||
list(res[1][b'framegen']) | list(res[1][b'framegen']) | ||||
results = list(sendcommandframes(reactor, instream, 1, b'command1', {})) | results = list(sendcommandframes(reactor, instream, 1, b'command1', {})) | ||||
self.assertaction(results[0], b'runcommand') | self.assertaction(results[0], b'runcommand') | ||||
def testprotocolsettingsnoflags(self): | def testprotocolsettingsnoflags(self): | ||||
result = self._sendsingleframe( | result = self._sendsingleframe( | ||||
makereactor(), | makereactor(), |