diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -583,6 +583,9 @@
 coreconfigitem('experimental', 'web.api.http-v2',
     default=False,
 )
+coreconfigitem('experimental', 'web.api.debugreflect',
+    default=False,
+)
 coreconfigitem('experimental', 'xdiff',
     default=False,
 )
diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -2561,6 +2561,18 @@
 
         return data
 
+    def readinto(self, b):
+        """Read up to ``len(b)`` bytes into the writable buffer ``b``.
+
+        Returns the number of bytes read, or None if ``read()`` returned None.
+        """
+        res = self.read(len(b))
+        if res is None:
+            return None
+
+        b[0:len(res)] = res
+        return len(res)
+
 def stringmatcher(pattern, casesensitive=True):
     """
     accepts a string, possibly starting with 're:' or 'literal:' prefix.
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -13,7 +13,9 @@
 
 import struct
 
+from .i18n import _
 from . import (
+    error,
     util,
 )
 
@@ -105,6 +107,52 @@
 
     return makeframe(frametype, finalflags, payload)
 
+def parseheader(data):
+    """Parse a unified framing protocol frame header from a buffer.
+
+    The header is expected to be in the buffer at offset 0 and the
+    buffer is expected to be large enough to hold a full header.
+    """
+    # 24 bits payload length (little endian)
+    # 4 bits frame type
+    # 4 bits frame flags
+    # ... payload
+    framelength = data[0] + 256 * data[1] + 65536 * data[2]
+    typeflags = data[3]
+
+    frametype = (typeflags & 0xf0) >> 4
+    frameflags = typeflags & 0x0f
+
+    return frametype, frameflags, framelength
+
+def readframe(fh):
+    """Read a unified framing protocol frame from a file object.
+
+    Returns a 3-tuple of (type, flags, payload) for the decoded frame or
+    None if no frame is available. May raise if a malformed frame is
+    seen.
+    """
+    header = bytearray(FRAME_HEADER_SIZE)
+
+    readcount = fh.readinto(header)
+
+    # readinto() may return None (not just 0) when nothing could be read.
+    if not readcount:
+        return None
+
+    if readcount != FRAME_HEADER_SIZE:
+        raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
+                          (readcount, header))
+
+    frametype, frameflags, framelength = parseheader(header)
+
+    payload = fh.read(framelength)
+    if len(payload) != framelength:
+        raise error.Abort(_('frame length error: expected %d; got %d') %
+                          (framelength, len(payload)))
+
+    return frametype, frameflags, payload
+
 def createcommandframes(cmd, args, datafh=None):
     """Create frames necessary to transmit a request to run a command.
 
@@ -154,3 +202,199 @@
 
             if done:
                 break
+
+class serverreactor(object):
+    """Holds state of a server handling frame-based protocol requests.
+
+    This class is the "brain" of the unified frame-based protocol server
+    component. While the protocol is stateless from the perspective of
+    requests/commands, something needs to track which frames have been
+    received, what frames to expect, etc. This class is that thing.
+
+    Instances are modeled as a state machine of sorts. Instances are also
+    reactionary to external events. The point of this class is to encapsulate
+    the state of the connection and the exchange of frames, not to perform
+    work. Instead, callers tell this class when something occurs, like a
+    frame arriving. If that activity is worthy of a follow-up action (say
+    *run a command*), the return value of that handler will say so.
+
+    I/O and CPU intensive operations are purposefully delegated outside of
+    this class.
+
+    Consumers are expected to tell instances when events occur. They do so by
+    calling the various ``on*`` methods. These methods return a 2-tuple
+    describing any follow-up action(s) to take. The first element is the
+    name of an action to perform. The second is a data structure (usually
+    a dict) specific to that action that contains more information. e.g.
+    if the server wants to send frames back to the client, the data structure
+    will contain a reference to those frames.
+
+    Valid actions that consumers can be instructed to take are:
+
+    error
+       Indicates that an error occurred. Consumer should probably abort.
+
+    runcommand
+       Indicates that the consumer should run a wire protocol command. Details
+       of the command to run are given in the data structure.
+
+    wantframe
+       Indicates that nothing of interest happened and the server is waiting on
+       more frames from the client before anything interesting can be done.
+    """
+
+    def __init__(self):
+        self._state = 'idle'
+        self._activecommand = None
+        self._activeargs = None
+        self._activedata = None
+        self._expectingargs = None
+        self._expectingdata = None
+        self._activeargname = None
+        self._activeargchunks = None
+
+    def onframerecv(self, frametype, frameflags, payload):
+        """Process a frame that has been received off the wire.
+
+        Returns a 2-tuple of (action, data) describing what action, if
+        any, the consumer should take next.
+        """
+        handlers = {
+            'idle': self._onframeidle,
+            'command-receiving-args': self._onframereceivingargs,
+            'command-receiving-data': self._onframereceivingdata,
+            'errored': self._onframeerrored,
+        }
+
+        meth = handlers.get(self._state)
+        if not meth:
+            raise error.ProgrammingError('unhandled state: %s' % self._state)
+
+        return meth(frametype, frameflags, payload)
+
+    def _makeerrorresult(self, msg):
+        return 'error', {
+            'message': msg,
+        }
+
+    def _makeruncommandresult(self):
+        return 'runcommand', {
+            'command': self._activecommand,
+            'args': self._activeargs,
+            'data': self._activedata.getvalue() if self._activedata else None,
+        }
+
+    def _makewantframeresult(self):
+        return 'wantframe', {
+            'state': self._state,
+        }
+
+    def _onframeidle(self, frametype, frameflags, payload):
+        # The only frame type that should be received in this state is a
+        # command request.
+        if frametype != FRAME_TYPE_COMMAND_NAME:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('expected command frame; got %d') % frametype)
+
+        self._activecommand = payload
+        self._activeargs = {}
+        self._activedata = None
+
+        if frameflags & FLAG_COMMAND_NAME_EOS:
+            return self._makeruncommandresult()
+
+        self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
+        self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
+
+        if self._expectingargs:
+            self._state = 'command-receiving-args'
+            return self._makewantframeresult()
+        elif self._expectingdata:
+            self._activedata = util.bytesio()
+            self._state = 'command-receiving-data'
+            return self._makewantframeresult()
+        else:
+            self._state = 'errored'
+            return self._makeerrorresult(_('missing frame flags on '
+                                           'command frame'))
+
+    def _onframereceivingargs(self, frametype, frameflags, payload):
+        if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
+            self._state = 'errored'
+            return self._makeerrorresult(_('expected command argument '
+                                           'frame; got %d') % frametype)
+
+        # The fixed-size argument header MUST fit inside the frame.
+        if len(payload) < ARGUMENT_FRAME_HEADER.size:
+            self._state = 'errored'
+            return self._makeerrorresult(_('malformed argument frame: '
+                                           'partial argument header'))
+
+        offset = 0
+        namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
+        offset += ARGUMENT_FRAME_HEADER.size
+
+        # The argument name MUST fit inside the frame.
+        argname = bytes(payload[offset:offset + namesize])
+        offset += namesize
+
+        if len(argname) != namesize:
+            self._state = 'errored'
+            return self._makeerrorresult(_('malformed argument frame: '
+                                           'partial argument name'))
+
+        argvalue = bytes(payload[offset:])
+
+        # Argument value spans multiple frames. This isn't supported yet.
+        # TODO record the partial value and wait for the next frame.
+        if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
+            self._state = 'errored'
+            return self._makeerrorresult(_('argument continuation frames '
+                                           'not yet supported'))
+
+        # Common case: the argument value is completely contained in this
+        # frame.
+
+        if len(argvalue) != valuesize:
+            self._state = 'errored'
+            return self._makeerrorresult(_('malformed argument frame: '
+                                           'partial argument value'))
+
+        self._activeargs[argname] = argvalue
+
+        if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
+            if self._expectingdata:
+                self._state = 'command-receiving-data'
+                self._activedata = util.bytesio()
+                # TODO signal request to run a command once we don't
+                # buffer data frames.
+                return self._makewantframeresult()
+            else:
+                self._state = 'idle'
+                return self._makeruncommandresult()
+        else:
+            return self._makewantframeresult()
+
+    def _onframereceivingdata(self, frametype, frameflags, payload):
+        if frametype != FRAME_TYPE_COMMAND_DATA:
+            self._state = 'errored'
+            return self._makeerrorresult(_('expected command data frame; '
+                                           'got %d') % frametype)
+
+        # TODO support streaming data instead of buffering it.
+        self._activedata.write(payload)
+
+        if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
+            return self._makewantframeresult()
+        elif frameflags & FLAG_COMMAND_DATA_EOS:
+            self._activedata.seek(0)
+            self._state = 'idle'
+            return self._makeruncommandresult()
+        else:
+            self._state = 'errored'
+            return self._makeerrorresult(_('command data frame without '
+                                           'flags'))
+
+    def _onframeerrored(self, frametype, frameflags, payload):
+        return self._makeerrorresult(_('server already errored'))
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -19,6 +19,7 @@
     pycompat,
     util,
     wireproto,
+    wireprotoframing,
     wireprototypes,
 )
 
@@ -319,6 +320,11 @@
         res.setbodybytes('permission denied')
         return
 
+    # We have a special endpoint to reflect the request back at the client.
+    if command == b'debugreflect':
+        _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
+        return
+
     if command not in wireproto.commands:
         res.status = b'404 Not Found'
         res.headers[b'Content-Type'] = b'text/plain'
@@ -343,8 +349,7 @@
                            % FRAMINGTYPE)
         return
 
-    if (b'Content-Type' in req.headers
-        and req.headers[b'Content-Type'] != FRAMINGTYPE):
+    if req.headers.get(b'Content-Type') != FRAMINGTYPE:
         res.status = b'415 Unsupported Media Type'
         # TODO we should send a response with appropriate media type,
         # since client does Accept it.
@@ -358,6 +363,49 @@
     res.headers[b'Content-Type'] = b'text/plain'
     res.setbodybytes(b'/'.join(urlparts) + b'\n')
 
+def _processhttpv2reflectrequest(ui, repo, req, res):
+    """Reads unified frame protocol request and dumps out state to client.
+
+    This special endpoint can be used to help debug the wire protocol.
+
+    Instead of routing the request through the normal dispatch mechanism,
+    we instead read all frames, decode them, and feed them into our state
+    tracker. We then dump the log of all that activity back out to the
+    client.
+    """
+    import json
+
+    # Reflection APIs have a history of being abused, accidentally disclosing
+    # sensitive data, etc. So we have a config knob.
+    if not ui.configbool('experimental', 'web.api.debugreflect'):
+        res.status = b'404 Not Found'
+        res.headers[b'Content-Type'] = b'text/plain'
+        res.setbodybytes(_('debugreflect service not available'))
+        return
+
+    # We assume we have a unified framing protocol request body.
+
+    reactor = wireprotoframing.serverreactor()
+    states = []
+
+    while True:
+        frame = wireprotoframing.readframe(req.bodyfh)
+
+        if not frame:
+            states.append(b'received: <no frame>')
+            break
+
+        frametype, frameflags, payload = frame
+        states.append(b'received: %d %d %s' % (frametype, frameflags, payload))
+
+        action, meta = reactor.onframerecv(frametype, frameflags, payload)
+        states.append(json.dumps((action, meta), sort_keys=True,
+                                 separators=(', ', ': ')))
+
+    res.status = b'200 OK'
+    res.headers[b'Content-Type'] = b'text/plain'
+    res.setbodybytes(b'\n'.join(states))
+
 # Maps API name to metadata so custom API can be registered.
 API_HANDLERS = {
     HTTPV2: {
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -261,7 +261,7 @@
   > allow-push = *
   > EOF
 
-  $ hg -R server serve -p $HGPORT -d --pid-file hg.pid
+  $ hg -R server serve -p $HGPORT -d --pid-file hg.pid -E error.log
   $ cat hg.pid > $DAEMON_PIDS
 
 Authorized request for valid read-write command works
@@ -314,3 +314,78 @@
   s> Content-Length: 42\r\n
   s> \r\n
   s> unknown wire protocol command: badcommand\n
+
+debugreflect isn't enabled by default
+
+  $ send << EOF
+  > httprequest POST api/$HTTPV2/ro/debugreflect
+  > user-agent: test
+  > EOF
+  using raw connection to peer
+  s> POST /api/exp-http-v2-0001/ro/debugreflect HTTP/1.1\r\n
+  s> Accept-Encoding: identity\r\n
+  s> user-agent: test\r\n
+  s> host: $LOCALIP:$HGPORT\r\n (glob)
+  s> \r\n
+  s> makefile('rb', None)
+  s> HTTP/1.1 404 Not Found\r\n
+  s> Server: testing stub value\r\n
+  s> Date: $HTTP_DATE$\r\n
+  s> Content-Type: text/plain\r\n
+  s> Content-Length: 34\r\n
+  s> \r\n
+  s> debugreflect service not available
+
+Restart server to get debugreflect endpoint
+
+  $ killdaemons.py
+  $ cat > server/.hg/hgrc << EOF
+  > [experimental]
+  > web.apiserver = true
+  > web.api.debugreflect = true
+  > web.api.http-v2 = true
+  > [web]
+  > push_ssl = false
+  > allow-push = *
+  > EOF
+
+  $ hg -R server serve -p $HGPORT -d --pid-file hg.pid -E error.log
+  $ cat hg.pid > $DAEMON_PIDS
+
+Command frames can be reflected via debugreflect
+
+  $ send << EOF
+  > httprequest POST api/$HTTPV2/ro/debugreflect
+  > accept: $MEDIATYPE
+  > content-type: $MEDIATYPE
+  > user-agent: test
+  > frame command-name have-args command1
+  > frame command-argument 0 \x03\x00\x04\x00fooval1
+  > frame command-argument eoa \x04\x00\x03\x00bar1val
+  > EOF
+  using raw connection to peer
+  s> POST /api/exp-http-v2-0001/ro/debugreflect HTTP/1.1\r\n
+  s> Accept-Encoding: identity\r\n
+  s> accept: application/mercurial-exp-framing-0001\r\n
+  s> content-type: application/mercurial-exp-framing-0001\r\n
+  s> user-agent: test\r\n
+  s> content-length: 42\r\n
+  s> host: $LOCALIP:$HGPORT\r\n (glob)
+  s> \r\n
+  s> \x08\x00\x00\x12command1\x0b\x00\x00 \x03\x00\x04\x00fooval1\x0b\x00\x00"\x04\x00\x03\x00bar1val
+  s> makefile('rb', None)
+  s> HTTP/1.1 200 OK\r\n
+  s> Server: testing stub value\r\n
+  s> Date: $HTTP_DATE$\r\n
+  s> Content-Type: text/plain\r\n
+  s> Content-Length: 291\r\n
+  s> \r\n
+  s> received: 1 2 command1\n
+  s> ["wantframe", {"state": "command-receiving-args"}]\n
+  s> received: 2 0 \x03\x00\x04\x00fooval1\n
+  s> ["wantframe", {"state": "command-receiving-args"}]\n
+  s> received: 2 2 \x04\x00\x03\x00bar1val\n
+  s> ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null}]\n
+  s> received: <no frame>
+
+  $ cat error.log
diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
new file mode 100644
--- /dev/null
+++ b/tests/test-wireproto-serverreactor.py
@@ -0,0 +1,322 @@
+from __future__ import absolute_import, print_function
+
+import unittest
+
+from mercurial import (
+    util,
+    wireprotoframing as framing,
+)
+
+ffs = framing.makeframefromhumanstring
+
+def makereactor():
+    return framing.serverreactor()
+
+def sendframes(reactor, gen):
+    """Send a generator of frame bytearray to a reactor.
+
+    Emits a generator of results from ``onframerecv()`` calls.
+    """
+    for frame in gen:
+        frametype, frameflags, framelength = framing.parseheader(frame)
+        payload = frame[framing.FRAME_HEADER_SIZE:]
+        assert len(payload) == framelength
+
+        yield reactor.onframerecv(frametype, frameflags, payload)
+
+def sendcommandframes(reactor, cmd, args, datafh=None):
+    """Generate frames to run a command and send them to a reactor."""
+    return sendframes(reactor, framing.createcommandframes(cmd, args, datafh))
+
+class FrameTests(unittest.TestCase):
+    def testdataexactframesize(self):
+        data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
+
+        frames = list(framing.createcommandframes(b'command', {}, data))
+        self.assertEqual(frames, [
+            ffs(b'command-name have-data command'),
+            ffs(b'command-data continuation %s' % data.getvalue()),
+            ffs(b'command-data eos ')
+        ])
+
+    def testdatamultipleframes(self):
+        data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
+        frames = list(framing.createcommandframes(b'command', {}, data))
+        self.assertEqual(frames, [
+            ffs(b'command-name have-data command'),
+            ffs(b'command-data continuation %s' % (
+                b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
+            ffs(b'command-data eos x'),
+        ])
+
+    def testargsanddata(self):
+        data = util.bytesio(b'x' * 100)
+
+        frames = list(framing.createcommandframes(b'command', {
+            b'key1': b'key1value',
+            b'key2': b'key2value',
+            b'key3': b'key3value',
+        }, data))
+
+        self.assertEqual(frames, [
+            ffs(b'command-name have-args|have-data command'),
+            ffs(br'command-argument 0 \x04\x00\x09\x00key1key1value'),
+            ffs(br'command-argument 0 \x04\x00\x09\x00key2key2value'),
+            ffs(br'command-argument eoa \x04\x00\x09\x00key3key3value'),
+            ffs(b'command-data eos %s' % data.getvalue()),
+        ])
+
+    def testparseheaderlength(self):
+        # The payload length is a 24-bit little endian integer.
+        header = bytearray(b'\x03\x02\x01\x12')
+        self.assertEqual(framing.parseheader(header), (1, 2, 0x010203))
+
+class ServerReactorTests(unittest.TestCase):
+    def _sendsingleframe(self, reactor, s):
+        results = list(sendframes(reactor, [ffs(s)]))
+        self.assertEqual(len(results), 1)
+
+        return results[0]
+
+    def assertaction(self, res, expected):
+        self.assertIsInstance(res, tuple)
+        self.assertEqual(len(res), 2)
+        self.assertIsInstance(res[1], dict)
+        self.assertEqual(res[0], expected)
+
+    def test1framecommand(self):
+        """Receiving a command in a single frame yields request to run it."""
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand', {}))
+        self.assertEqual(len(results), 1)
+        self.assertaction(results[0], 'runcommand')
+        self.assertEqual(results[0][1], {
+            'command': b'mycommand',
+            'args': {},
+            'data': None,
+        })
+
+    def test1argument(self):
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand',
+                                         {b'foo': b'bar'}))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'runcommand')
+        self.assertEqual(results[1][1], {
+            'command': b'mycommand',
+            'args': {b'foo': b'bar'},
+            'data': None,
+        })
+
+    def testmultiarguments(self):
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand',
+                                         {b'foo': b'bar', b'biz': b'baz'}))
+        self.assertEqual(len(results), 3)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'wantframe')
+        self.assertaction(results[2], 'runcommand')
+        self.assertEqual(results[2][1], {
+            'command': b'mycommand',
+            'args': {b'foo': b'bar', b'biz': b'baz'},
+            'data': None,
+        })
+
+    def testsimplecommanddata(self):
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand', {},
+                                         util.bytesio(b'data!')))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'runcommand')
+        self.assertEqual(results[1][1], {
+            'command': b'mycommand',
+            'args': {},
+            'data': b'data!',
+        })
+
+    def testmultipledataframes(self):
+        frames = [
+            ffs(b'command-name have-data mycommand'),
+            ffs(b'command-data continuation data1'),
+            ffs(b'command-data continuation data2'),
+            ffs(b'command-data eos data3'),
+        ]
+
+        reactor = makereactor()
+        results = list(sendframes(reactor, frames))
+        self.assertEqual(len(results), 4)
+        for i in range(3):
+            self.assertaction(results[i], 'wantframe')
+        self.assertaction(results[3], 'runcommand')
+        self.assertEqual(results[3][1], {
+            'command': b'mycommand',
+            'args': {},
+            'data': b'data1data2data3',
+        })
+
+    def testargumentanddata(self):
+        frames = [
+            ffs(b'command-name have-args|have-data command'),
+            ffs(br'command-argument 0 \x03\x00\x03\x00keyval'),
+            ffs(br'command-argument eoa \x03\x00\x03\x00foobar'),
+            ffs(b'command-data continuation value1'),
+            ffs(b'command-data eos value2'),
+        ]
+
+        reactor = makereactor()
+        results = list(sendframes(reactor, frames))
+
+        self.assertaction(results[-1], 'runcommand')
+        self.assertEqual(results[-1][1], {
+            'command': b'command',
+            'args': {
+                b'key': b'val',
+                b'foo': b'bar',
+            },
+            'data': b'value1value2',
+        })
+
+    def testunexpectedcommandargument(self):
+        """Command argument frame when not running a command is an error."""
+        result = self._sendsingleframe(makereactor(),
+                                       b'command-argument 0 ignored')
+        self.assertaction(result, 'error')
+        self.assertEqual(result[1], {
+            'message': b'expected command frame; got 2',
+        })
+
+    def testunexpectedcommanddata(self):
+        """Command data frame when not running a command is an error."""
+        result = self._sendsingleframe(makereactor(),
+                                       b'command-data 0 ignored')
+        self.assertaction(result, 'error')
+        self.assertEqual(result[1], {
+            'message': b'expected command frame; got 3',
+        })
+
+    def testmissingcommandframeflags(self):
+        """Command name frame must have flags set."""
+        result = self._sendsingleframe(makereactor(),
+                                       b'command-name 0 command')
+        self.assertaction(result, 'error')
+        self.assertEqual(result[1], {
+            'message': b'missing frame flags on command frame',
+        })
+
+    def testmissingargumentframe(self):
+        frames = [
+            ffs(b'command-name have-args command'),
+            ffs(b'command-name 0 ignored'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'expected command argument frame; got 1',
+        })
+
+    def testincompleteargumentname(self):
+        """Argument frame with incomplete name."""
+        frames = [
+            ffs(b'command-name have-args command1'),
+            ffs(br'command-argument eoa \x04\x00\xde\xadfoo'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'malformed argument frame: partial argument name',
+        })
+
+    def testincompleteargumentvalue(self):
+        """Argument frame with incomplete value."""
+        frames = [
+            ffs(b'command-name have-args command'),
+            ffs(br'command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'malformed argument frame: partial argument value',
+        })
+
+    def testmissingcommanddataframe(self):
+        frames = [
+            ffs(b'command-name have-data command1'),
+            ffs(b'command-name eos command2'),
+        ]
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'expected command data frame; got 1',
+        })
+
+    def testmissingcommanddataframeflags(self):
+        frames = [
+            ffs(b'command-name have-data command1'),
+            ffs(b'command-data 0 data'),
+        ]
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'command data frame without flags',
+        })
+
+    def testtruncatedargumentheader(self):
+        """Argument frame too small to hold the argument header."""
+        frames = [
+            ffs(b'command-name have-args command'),
+            ffs(br'command-argument eoa \x03\x00'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'malformed argument frame: partial argument header',
+        })
+
+    def testargumentcontinuationunsupported(self):
+        """Argument continuation frames yield an error action."""
+        frames = [
+            ffs(b'command-name have-args command'),
+            ffs(br'command-argument continuation \x03\x00\x03\x00foobar'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'argument continuation frames not yet supported',
+        })
+
+    def testcommandafterargumentcommand(self):
+        """A new command can follow one that only had arguments."""
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'command1',
+                                         {b'foo': b'bar'}))
+        self.assertaction(results[-1], 'runcommand')
+
+        results = list(sendcommandframes(reactor, b'command2', {}))
+        self.assertEqual(len(results), 1)
+        self.assertaction(results[0], 'runcommand')
+        self.assertEqual(results[0][1]['command'], b'command2')
+
+if __name__ == '__main__':
+    import silenttestrunner
+    silenttestrunner.main(__name__)