Details
Details
- Reviewers
- None
- Group Reviewers
- hg-reviewers
- Commits
- rHGed4ebbb98ca0: wireprotov2: raise exception in objects() if future has been resolved
Diff Detail
Diff Detail
- Repository
- rHG Mercurial
- Lint
- Lint Skipped
- Unit
- Unit Tests Skipped
hg-reviewers |
Lint Skipped |
Unit Tests Skipped |
Path | Packages | |||
---|---|---|---|---|
M | mercurial/wireprotov2peer.py (25 lines) |
Commit | Parents | Author | Summary | Date |
---|---|---|---|---|
 | | Gregory Szorc | | Oct 8 2018, 6:19 PM |
# sending us new data and another consuming it. | # sending us new data and another consuming it. | ||||
self._lock = threading.RLock() | self._lock = threading.RLock() | ||||
# An event is set when state of the object changes. This event | # An event is set when state of the object changes. This event | ||||
# is waited on by the generator emitting objects. | # is waited on by the generator emitting objects. | ||||
self._serviceable = threading.Event() | self._serviceable = threading.Event() | ||||
self._pendingevents = [] | self._pendingevents = [] | ||||
self._pendingerror = None | |||||
self._decoder = cborutil.bufferingdecoder() | self._decoder = cborutil.bufferingdecoder() | ||||
self._seeninitial = False | self._seeninitial = False | ||||
self._redirect = None | self._redirect = None | ||||
def _oninputcomplete(self): | def _oninputcomplete(self): | ||||
with self._lock: | with self._lock: | ||||
self._inputcomplete = True | self._inputcomplete = True | ||||
self._serviceable.set() | self._serviceable.set() | ||||
raise error.Abort(_('received unexpected response data ' | raise error.Abort(_('received unexpected response data ' | ||||
'after content redirect; the remote is ' | 'after content redirect; the remote is ' | ||||
'buggy')) | 'buggy')) | ||||
self._pendingevents.append(o) | self._pendingevents.append(o) | ||||
self._serviceable.set() | self._serviceable.set() | ||||
def _onerror(self, e): | |||||
self._pendingerror = e | |||||
with self._lock: | |||||
self._serviceable.set() | |||||
def _handleinitial(self, o): | def _handleinitial(self, o): | ||||
self._seeninitial = True | self._seeninitial = True | ||||
if o[b'status'] == b'ok': | if o[b'status'] == b'ok': | ||||
return | return | ||||
elif o[b'status'] == b'redirect': | elif o[b'status'] == b'redirect': | ||||
l = o[b'location'] | l = o[b'location'] | ||||
self._redirect = wireprototypes.alternatelocationresponse( | self._redirect = wireprototypes.alternatelocationresponse( | ||||
""" | """ | ||||
while True: | while True: | ||||
# TODO this can infinite loop if self._inputcomplete is never | # TODO this can infinite loop if self._inputcomplete is never | ||||
# set. We likely want to tie the lifetime of this object/state | # set. We likely want to tie the lifetime of this object/state | ||||
# to that of the background thread receiving frames and updating | # to that of the background thread receiving frames and updating | ||||
# our state. | # our state. | ||||
self._serviceable.wait(1.0) | self._serviceable.wait(1.0) | ||||
if self._pendingerror: | |||||
raise self._pendingerror | |||||
with self._lock: | with self._lock: | ||||
self._serviceable.clear() | self._serviceable.clear() | ||||
# Make copies because objects could be mutated during | # Make copies because objects could be mutated during | ||||
# iteration. | # iteration. | ||||
stop = self._inputcomplete | stop = self._inputcomplete | ||||
pending = list(self._pendingevents) | pending = list(self._pendingevents) | ||||
self._pendingevents[:] = [] | self._pendingevents[:] = [] | ||||
response = self._responses[frame.requestid] | response = self._responses[frame.requestid] | ||||
if action == 'responsedata': | if action == 'responsedata': | ||||
# Any failures processing this frame should bubble up to the | # Any failures processing this frame should bubble up to the | ||||
# future tracking the request. | # future tracking the request. | ||||
try: | try: | ||||
self._processresponsedata(frame, meta, response) | self._processresponsedata(frame, meta, response) | ||||
except BaseException as e: | except BaseException as e: | ||||
# If an exception occurs before the future is resolved, | |||||
# fail the future. Otherwise, we stuff the exception on | |||||
# the response object so it can be raised during objects() | |||||
# iteration. If nothing is consuming objects(), we could | |||||
# silently swallow this exception. That's a risk we'll have to | |||||
# take. | |||||
if frame.requestid in self._futures: | |||||
self._futures[frame.requestid].set_exception(e) | self._futures[frame.requestid].set_exception(e) | ||||
del self._futures[frame.requestid] | del self._futures[frame.requestid] | ||||
response._oninputcomplete() | response._oninputcomplete() | ||||
else: | else: | ||||
response._onerror(e) | |||||
else: | |||||
raise error.ProgrammingError( | raise error.ProgrammingError( | ||||
'unhandled action from clientreactor: %s' % action) | 'unhandled action from clientreactor: %s' % action) | ||||
def _processresponsedata(self, frame, meta, response): | def _processresponsedata(self, frame, meta, response): | ||||
# This can raise. The caller can handle it. | # This can raise. The caller can handle it. | ||||
response._onresponsedata(meta['data']) | response._onresponsedata(meta['data']) | ||||
# If we got a content redirect response, we want to fetch it and | # If we got a content redirect response, we want to fetch it and |