diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py --- a/mercurial/httppeer.py +++ b/mercurial/httppeer.py @@ -515,11 +515,16 @@ # TODO this should be part of a generic peer for the frame-based # protocol. - stream = wireprotoframing.stream(1) - frames = wireprotoframing.createcommandframes(stream, 1, - name, args) + reactor = wireprotoframing.clientreactor(hasmultiplesend=False, + buffersends=True) - body = b''.join(map(bytes, frames)) + request, action, meta = reactor.callcommand(name, args) + assert action == 'noop' + + action, meta = reactor.flushcommands() + assert action == 'sendframes' + + body = b''.join(map(bytes, meta['framegen'])) req = self._requestbuilder(pycompat.strurl(url), body, headers) req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -11,6 +11,7 @@ from __future__ import absolute_import +import collections import struct from .i18n import _ @@ -876,3 +877,133 @@ def _onframeerrored(self, frame): return self._makeerrorresult(_('server already errored')) + +class commandrequest(object): + """Represents a request to run a command.""" + + def __init__(self, requestid, name, args, datafh=None): + self.requestid = requestid + self.name = name + self.args = args + self.datafh = datafh + self.state = 'pending' + +class clientreactor(object): + """Holds state of a client issuing frame-based protocol requests. + + This is like ``serverreactor`` but for client-side state. + + Each instance is bound to the lifetime of a connection. For persistent + connection transports using e.g. TCP sockets and speaking the raw + framing protocol, there will be a single instance for the lifetime of + the TCP socket. For transports where there are multiple discrete + interactions (say tunneled within in HTTP request), there will be a + separate instance for each distinct interaction. + """ + def __init__(self, hasmultiplesend=False, buffersends=True): + """Create a new instance. + + ``hasmultiplesend`` indicates whether multiple sends are supported + by the transport. When True, it is possible to send commands immediately + instead of buffering until the caller signals an intent to finish a + send operation. + + ``buffercommands`` indicates whether sends should be buffered until the + last request has been issued. + """ + self._hasmultiplesend = hasmultiplesend + self._buffersends = buffersends + + self._canissuecommands = True + self._cansend = True + + self._nextrequestid = 1 + # We only support a single outgoing stream for now. + self._outgoingstream = stream(1) + self._pendingrequests = collections.deque() + self._activerequests = {} + + def callcommand(self, name, args, datafh=None): + """Request that a command be executed. + + Receives the command name, a dict of arguments to pass to the command, + and an optional file object containing the raw data for the command. + + Returns a 3-tuple of (request, action, action data). + """ + if not self._canissuecommands: + raise error.ProgrammingError('cannot issue new commands') + + requestid = self._nextrequestid + self._nextrequestid += 2 + + request = commandrequest(requestid, name, args, datafh=datafh) + + if self._buffersends: + self._pendingrequests.append(request) + return request, 'noop', {} + else: + if not self._cansend: + raise error.ProgrammingError('sends cannot be performed on ' + 'this instance') + + if not self._hasmultiplesend: + self._cansend = False + self._canissuecommands = False + + return request, 'sendframes', { + 'framegen': self._makecommandframes(request), + } + + def flushcommands(self): + """Request that all queued commands be sent. + + If any commands are buffered, this will instruct the caller to send + them over the wire. If no commands are buffered it instructs the client + to no-op. + + If instances aren't configured for multiple sends, no new command + requests are allowed after this is called. + """ + if not self._pendingrequests: + return 'noop', {} + + if not self._cansend: + raise error.ProgrammingError('sends cannot be performed on this ' + 'instance') + + # If the instance only allows sending once, mark that we have fired + # our one shot. + if not self._hasmultiplesend: + self._canissuecommands = False + self._cansend = False + + def makeframes(): + while self._pendingrequests: + request = self._pendingrequests.popleft() + for frame in self._makecommandframes(request): + yield frame + + return 'sendframes', { + 'framegen': makeframes(), + } + + def _makecommandframes(self, request): + """Emit frames to issue a command request. + + As a side-effect, update request accounting to reflect its changed + state. + """ + self._activerequests[request.requestid] = request + request.state = 'sending' + + res = createcommandframes(self._outgoingstream, + request.requestid, + request.name, + request.args, + request.datafh) + + for frame in res: + yield frame + + request.state = 'sent' diff --git a/tests/test-wireproto-clientreactor.py b/tests/test-wireproto-clientreactor.py new file mode 100644 --- /dev/null +++ b/tests/test-wireproto-clientreactor.py @@ -0,0 +1,66 @@ +from __future__ import absolute_import + +import unittest + +from mercurial import ( + error, + wireprotoframing as framing, +) + +class SingleSendTests(unittest.TestCase): + """A reactor that can only send once rejects subsequent sends.""" + def testbasic(self): + reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True) + + request, action, meta = reactor.callcommand(b'foo', {}) + self.assertEqual(request.state, 'pending') + self.assertEqual(action, 'noop') + + action, meta = reactor.flushcommands() + self.assertEqual(action, 'sendframes') + + for frame in meta['framegen']: + self.assertEqual(request.state, 'sending') + + self.assertEqual(request.state, 'sent') + + with self.assertRaisesRegexp(error.ProgrammingError, + 'cannot issue new commands'): + reactor.callcommand(b'foo', {}) + + with self.assertRaisesRegexp(error.ProgrammingError, + 'cannot issue new commands'): + reactor.callcommand(b'foo', {}) + +class NoBufferTests(unittest.TestCase): + """A reactor without send buffering sends requests immediately.""" + def testbasic(self): + reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False) + + request, action, meta = reactor.callcommand(b'command1', {}) + self.assertEqual(request.requestid, 1) + self.assertEqual(action, 'sendframes') + + self.assertEqual(request.state, 'pending') + + for frame in meta['framegen']: + self.assertEqual(request.state, 'sending') + + self.assertEqual(request.state, 'sent') + + action, meta = reactor.flushcommands() + self.assertEqual(action, 'noop') + + # And we can send another command. + request, action, meta = reactor.callcommand(b'command2', {}) + self.assertEqual(request.requestid, 3) + self.assertEqual(action, 'sendframes') + + for frame in meta['framegen']: + self.assertEqual(request.state, 'sending') + + self.assertEqual(request.state, 'sent') + +if __name__ == '__main__': + import silenttestrunner + silenttestrunner.main(__name__)