diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -511,6 +511,98 @@
                             flags=0,
                             payload=payload)

+class bufferingcommandresponseemitter(object):
+    """Helper object to emit command response frames intelligently.
+
+    Raw command response data is likely emitted in chunks much smaller
+    than what can fit in a single frame. This class exists to buffer
+    chunks until enough data is available to fit in a single frame.
+
+    TODO we'll need something like this when compression is supported.
+    So it might make sense to implement this functionality at the stream
+    level.
+    """
+    def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
+        self._stream = stream
+        self._requestid = requestid
+        self._maxsize = maxframesize
+        self._chunks = []
+        self._chunkssize = 0
+
+    def send(self, data):
+        """Send new data for emission.
+
+        Is a generator of new frames that were derived from the new input.
+
+        If the special input ``None`` is received, flushes all buffered
+        data to frames.
+        """
+
+        if data is None:
+            for frame in self._flush():
+                yield frame
+            return
+
+        # There is a ton of potential to do more complicated things here.
+        # Our immediate goal is to coalesce small chunks into big frames,
+        # not achieve the fewest number of frames possible. So we go with
+        # a simple implementation:
+        #
+        # * If a chunk is too large for a frame, we flush and emit frames
+        #   for the new chunk.
+        # * If a chunk can be buffered without total buffered size limits
+        #   being exceeded, we do that.
+        # * If a chunk causes us to go over our buffering limit, we flush
+        #   and then buffer the new chunk.
+
+        if len(data) > self._maxsize:
+            for frame in self._flush():
+                yield frame
+
+            # Now emit frames for the big chunk.
+            offset = 0
+            while True:
+                chunk = data[offset:offset + self._maxsize]
+                offset += len(chunk)
+
+                yield self._stream.makeframe(
+                    self._requestid,
+                    typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                    flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+                    payload=chunk)
+
+                if offset == len(data):
+                    return
+
+        # If we don't have enough to constitute a full frame, buffer and
+        # return.
+        if len(data) + self._chunkssize < self._maxsize:
+            self._chunks.append(data)
+            self._chunkssize += len(data)
+            return
+
+        # Else flush what we have and buffer the new chunk. We could do
+        # something more intelligent here, like break the chunk. Let's
+        # keep things simple for now.
+        for frame in self._flush():
+            yield frame
+
+        self._chunks.append(data)
+        self._chunkssize = len(data)
+
+    def _flush(self):
+        payload = b''.join(self._chunks)
+        assert len(payload) <= self._maxsize
+
+        self._chunks[:] = []
+        self._chunkssize = 0
+
+        yield self._stream.makeframe(
+            self._requestid,
+            typeid=FRAME_TYPE_COMMAND_RESPONSE,
+            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+            payload=payload)
+
 class stream(object):
     """Represents a logical unidirectional series of frames."""

@@ -716,10 +808,14 @@
         def sendframes():
             emitted = False
+            emitter = bufferingcommandresponseemitter(stream, requestid)
             while True:
                 try:
                     o = next(objs)
                 except StopIteration:
+                    for frame in emitter.send(None):
+                        yield frame
+
                     if emitted:
                         yield createcommandresponseeosframe(stream, requestid)
                     break

@@ -743,11 +839,10 @@
                         yield createcommandresponseokframe(stream, requestid)
                         emitted = True

-                    # TODO buffer chunks so emitted frame payloads can be
-                    # larger.
-                    for frame in createbytesresponseframesfromgen(
-                            stream, requestid, cborutil.streamencode(o)):
-                        yield frame
+                    for chunk in cborutil.streamencode(o):
+                        for frame in emitter.send(chunk):
+                            yield frame
+
                 except Exception as e:
                     for frame in createerrorframe(stream, requestid,
                                                   '%s' % e,
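
To make the buffering rules in send() concrete, here is a minimal, self-contained sketch of the same coalescing strategy. The names simpleemitter and makefakeframe, the 32-byte frame limit, and the driver at the bottom are invented for illustration and are not part of mercurial.wireprotoframing; unlike the patch above, the sketch also skips empty flushes so it never emits an empty frame.

# Standalone sketch of the coalescing strategy used by
# bufferingcommandresponseemitter. simpleemitter and makefakeframe are
# invented for illustration only.

def makefakeframe(requestid, payload):
    # Stand-in for stream.makeframe(): just record what would be framed.
    return (requestid, payload)

class simpleemitter(object):
    def __init__(self, requestid, maxframesize=32):
        self._requestid = requestid
        self._maxsize = maxframesize
        self._chunks = []
        self._chunkssize = 0

    def send(self, data):
        # ``None`` is the "flush whatever is buffered" sentinel.
        if data is None:
            for frame in self._flush():
                yield frame
            return

        # Oversized chunk: flush the buffer, then slice the chunk into
        # frame-sized payloads.
        if len(data) > self._maxsize:
            for frame in self._flush():
                yield frame

            for offset in range(0, len(data), self._maxsize):
                yield makefakeframe(self._requestid,
                                    data[offset:offset + self._maxsize])
            return

        # Small chunk that still fits in the buffer: accumulate it.
        if len(data) + self._chunkssize < self._maxsize:
            self._chunks.append(data)
            self._chunkssize += len(data)
            return

        # Buffer would overflow: flush it, then buffer the new chunk.
        for frame in self._flush():
            yield frame

        self._chunks.append(data)
        self._chunkssize = len(data)

    def _flush(self):
        payload = b''.join(self._chunks)
        self._chunks[:] = []
        self._chunkssize = 0

        # Skip empty flushes so the example never emits an empty frame.
        if payload:
            yield makefakeframe(self._requestid, payload)

# Three 10-byte chunks coalesce into a single 30-byte frame.
emitter = simpleemitter(requestid=1)
frames = []
for chunk in (b'a' * 10, b'b' * 10, b'c' * 10, None):
    frames.extend(emitter.send(chunk))
assert frames == [(1, b'a' * 10 + b'b' * 10 + b'c' * 10)]

# A 70-byte chunk is sliced into 32 + 32 + 6 byte payloads.
frames = list(simpleemitter(requestid=2).send(b'x' * 70))
assert [len(payload) for _, payload in frames] == [32, 32, 6]

The trade-off illustrated here is the same one the patch makes: coalescing reduces the number of frames on the wire at the cost of holding response data in memory until the buffer fills or a flush is requested.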